EventBridge PipeとStepFunctionsでS3イベント通知を設定してみる

目次
はじめに
こんにちは、CloudBuildersのktakedaです。
S3イベント通知をご存知でしょうか。
S3バケットでオブジェクトの作成や削除、レプリケーションなどオブジェクトに関するイベントが発生したときに通知をすることができる機能です。
S3イベント通知では、SQS, Lambda, SNS, EventBridgeの4つのサービスに対してイベント通知を発行することが可能ですが、JSON形式で発行されるためメールなどでユーザ宛に通知を送りたい場合は可読性を高めるため整形する必要があります。

Lambdaを使用してJSONを整形するようなコードを書けば一発ではありますが本記事ではLambda無しでイベントごとに通知内容を分けて通知する方法を紹介します。
構成

今回は上記のように、EventBridge PipeとStepFunctionsを使った構成にします。また、ユーザへの通知はメールでの通知を想定します。
大まかな流れとしては、はじめにS3からSQSに対してイベント通知を発行しSQSのキューをトリガーにしてEventBridge Pipeを起動します。
続いて、EventBridge Pipeに渡されるデータにはSQSにて付加されたデータも含まれるためS3イベント通知のみを抽出しStepFunctionsに渡します。
最後にStepFunctionsでは受け取ったイベント通知からCreateObjectやDeleteObjectなどのイベントから分岐を発生させイベントに即した通知内容でSNSからメールを送信させ、通知が完了します。
ここからは、選定したAWSサービスについて記載します。
SQSについて
S3イベント通知を受け取るためにSQSを使用しています。前述したようにS3イベント通知ではSQSのほか、3つのサービスに対してイベント通知を発行でき、特にEventBridgeを用いると下記のような構成でも実装でき、また設定も容易です。
S3---(S3イベント通知)--->EventBridge Rule---(入力トランスフォーマーで整形)--->SNS
しかし、EventBridgeではレプリケーションおよびRRS(低冗長化ストレージ)オブジェクト消失イベントといった一部のイベントを受け取ることができませんので(受信できないイベントが必要かはさておき)今回はSQSを使っています。
また、受信するイベントのメッセージ構造も多少差があり、SQSやLambda、SNSの方がイベントの発生者の情報などが詳細に記載されています。
メッセージ構造については下記をご参照ください。(上がEventBridge、下が他3サービスのメッセージ構造になります。)
StepFunctionsについて
S3イベント通知内容からSNS経由でメールを送信する処理の部分にStepFunctionsを使っています。
EventBridge Pipeにて直接ターゲットをSNSとすることも可能ではありますが、EventBridge PIpeの入力トランスフォーマーではJSON形式でしか値を渡すことができず結局そこまで可読性高くないです。
そのため、分岐処理により通知内容もイベントごとに区別できるStepFunctionsを使用します。
設定
では、実際の設定方法になります。前提条件として、S3バケットの作成およびSNSのトピック設定は完了しているものとします。
SQSキュー作成
1. Amazon SQS コンソールにアクセス
2. 「キューの作成」をクリック

3. 任意の名前を入力

4. アクセスポリシーに下記をStatement内に追記します。
※SQSのARN、通知を行うS3バケットの部分は適宜修正してください。
{
"Sid": "s3-event-notification",
"Effect": "Allow",
"Principal": {
"Service": "s3.amazonaws.com"
},
"Action": "SQS:SendMessage",
"Resource": "arn:aws:sqs:{リージョン}:{アカウントID}:{SQSキュー名}",
"Condition": {
"StringEquals": {
"AWS:SourceArn": [
"{任意のS3バケットのARN}"
]
}
}
}
5. その他はデフォルトでも問題ないので保存します。
S3イベント通知設定
1. Amazon S3 コンソールにアクセス
2. 設定したいバケットを開き、プロパティタブからイベント通知を作成をクリック

3. 任意のイベント名を記載。
※通知したいオブジェクトに指定があれば、プレフィックス、サフィックスを指定することも可能です。

4. イベントタイプには今回下記の3つを入れてみます。
- すべてのオブジェクト作成イベント(s3:objectCreated:*)
- すべてのオブジェクト削除イベント(s3:objectRemoved:*)
- オブジェクトのレプリケートに失敗しました(s3:Replication:OperatopmFailedReplication)
5. 送信先にSQSキューを選び、先ほど作成したSQSキュー名を指定して保存

StepFunctions作成
1. Step Functions コンソールにアクセス
2. ステートマシンタブから「ステートマシンの作成」をクリック

3. 任意の名前を記載
4. コードを選択し、下記をステートマシン定義を記載しステートマシンを作成
※SNSトピックのARN部分を適宜修正してください。

{
"Comment": "S3イベントごとに分岐処理するStep Functions",
"StartAt": "ExtractFirstRecord",
"States": {
"ExtractFirstRecord": {
"Type": "Pass",
"Parameters": {
"Record.$": "$[0].Record"
},
"Next": "CheckEventType"
},
"CheckEventType": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.Record.eventName",
"StringMatches": "ObjectCreated:*",
"Next": "HandleObjectCreated"
},
{
"Variable": "$.Record.eventName",
"StringMatches": "ObjectRemoved:*",
"Next": "HandleObjectRemoved"
},
{
"Variable": "$.Record.eventName",
"StringMatches": "Replication:*",
"Next": "HandleReplication"
}
],
"Default": "HandleUnknownEvent"
},
"HandleObjectCreated": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"TopicArn": "{SNSトピックのARN}",
"Subject": "オブジェクト作成通知",
"Message.$": "States.Format('S3バケットイベント通知\n\nイベント詳細:\n- イベント名: {}\n- 発生時刻: {}\n- リージョン: {}\n\nバケット情報:\n- バケット名: {}\n- バケットARN: {}\n\nオブジェクト情報:\n- オブジェクトキー: {}\n- サイズ: {} バイト\n- ETag: {}\n- バージョンID: {}\n\n操作ユーザー: {}\nソースIP: {}', $.Record.eventName, $.Record.eventTime, $.Record.awsRegion, $.Record.s3.bucket.name, $.Record.s3.bucket.arn, $.Record.s3.object.key, $.Record.s3.object.size, $.Record.s3.object.eTag, $.Record.s3.object.versionId, $.Record.userIdentity.principalId, $.Record.requestParameters.sourceIPAddress)"
},
"End": true
},
"HandleObjectRemoved": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"TopicArn": "{SNSトピックのARN}",
"Subject": "オブジェクト削除通知",
"Message.$": "States.Format('S3バケットイベント通知\n\nイベント詳細:\n- イベント名: {}\n- 発生時刻: {}\n- リージョン: {}\n\nバケット情報:\n- バケット名: {}\n- バケットARN: {}\n\nオブジェクト情報:\n- 削除されたオブジェクトキー: {}\n- ETag: {}\n- バージョンID: {}\n\n操作ユーザー: {}\nソースIP: {}', $.Record.eventName, $.Record.eventTime, $.Record.awsRegion, $.Record.s3.bucket.name, $.Record.s3.bucket.arn, $.Record.s3.object.key, $.Record.s3.object.eTag, $.Record.s3.object.versionId, $.Record.userIdentity.principalId, $.Record.requestParameters.sourceIPAddress)"
},
"End": true
},
"HandleReplication": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"TopicArn": "{SNSトピックのARN}",
"Subject": "レプリケーションイベント通知",
"Message.$": "States.Format('S3レプリケーション失敗通知\n\nイベント詳細:\n- イベント名: {}\n- 発生時刻: {}\n- リージョン: {}\n\nバケット情報:\n- ソースバケット名: {}\n- ソースバケットARN: {}\n\nオブジェクト情報:\n- オブジェクトキー: {}\n- サイズ: {} バイト\n- ETag: {}\n- バージョンID: {}\n\nレプリケーション情報:\n- レプリケーションルールID: {}\n- 宛先バケット: {}\n- S3操作: {}\n- リクエスト時間: {}\n- 失敗理由: {}', $.Record.eventName, $.Record.eventTime, $.Record.awsRegion, $.Record.s3.bucket.name, $.Record.s3.bucket.arn, $.Record.s3.object.key, $.Record.s3.object.size, $.Record.s3.object.eTag, $.Record.s3.object.versionId, $.Record.replicationEventData.replicationRuleId, $.Record.replicationEventData.destinationBucket, $.Record.replicationEventData.s3Operation, $.Record.replicationEventData.requestTime, $.Record.replicationEventData.failureReason)"
},
"End": true
},
"HandleUnknownEvent": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"TopicArn": "{SNSトピックのARN}",
"Subject": "未知のイベント通知",
"Message.$": "States.Format('{}', $.Record)"
},
"End": true
}
}
}
Amazon EventBridge Pipe作成
1. Step Functions コンソールにアクセス
2. パイプタブを開き、「パイプの作成」をクリック

3. 任意のパイプ名を記載する
4. ソースにSQSを選択し、作成したSQSキューを選択

5. ターゲットにStepFunctionsを選択し、作成したステートマシンを選択

6. ターゲット入力トランスフォーマーのトランスフォーマーに下記を記載しパイプを作成

{
"Record": <$.body.Records[0]>
}
動作確認
実装が完了しましたのでS3バケットにファイルをアップロードおよび作成してメール通知を確認してみます。
レプリケートのイベントについては失敗時に通知が行われるため、レプリケートが失敗するようにしてます。(単純にレプリケートするための権限を不足させました。)
それぞれ下記にしますが、いずれのイベントについてもメール通知が正常に行われ、イベントに応じた通知内容になっています。
【オブジェクトの作成】

【オブジェクトの削除】

【オブジェクトのレプリケート失敗】

さいごに
S3イベント通知をEventBridge PipeおよびStep Functionsを使って通知する方法について紹介させていただきましたがいかがでしたでしょうか。
費用面を考えるとやはり、StepFunctionsではなくLambdaを使ったほうが安く抑えることはできますがStepFunctionsを用いることにより、処理内容を視覚的に確認できるので実際に運用する上では管理やメンテナンスの手間を考えるとStep Functionsを選択する理由としては有りなのかなと思いました。
また、今回はStep FunctionsからただSNSでメール通知する部分のみの実装でしたが、要件によってはStep Functionsからさまざまな処理をさせてもいいのかなと思います。
ここまでご覧いただきありがとうございました。本記事が参考になりましたら幸いです。