初めに
少し遡り2023/10頃のアップデートですがAmazon AthenaとAmazon S3 Object Lambdaの統合のアップデートがありました。
Amazon S3 Object LambdaはS3バケットからデータを取得し返却するタイミングでLambdaの処理によりデータを加工することができるアクセスポイントで、こちらを指定し経由することで個人情報のマスキングなどの処理ができるため派生データを作成せず1マスタ複数パターンを実現することが可能です。
上記のアップデートによりはAmazon Athenaがこちらのアクセスポイントを指定しクエリを実行に応じデータを加工し参照できるようになったため試してみます。
やること
以前DMARCレポートをAmazon SESで受信しS3バケットに保管する仕組みを作成しました。
DMARCレポートはあくまでメールの添付や本文に記載され、フォーマットもXMLかつ送信元次第でやZIPやgzipに圧縮される為直接Athenaで分析することはできません。
上記記事ではレポート受信(正確にはS3への設置)をトリガーに取り出し・JSONへの変換を行い変換後のデータを分析していましたが、今回はObject Lambdaアクセスポイントを中継することでバケット上の変換後データを処理するのではなく変換元データを実行のたびに都度変換し分析してみます。
対応については既存の基盤に変更を行うのではなく一旦外付け別テンプレートとして既存のS3バケットを参照するようなオプションとして実装します。
テンプレート・ソースコード
以下に格納しています。前述の通り外付けとなっている関係で単体では大元のS3バケットを作成しない点ご注意ください。
構成イメージは以下の通りです。
Object Lambdaを利用する場合S3へのアクセスは署名付きURLで行われるのですがどの権限で署名されているかが微妙なところでもしかするとAthenaを実行しているIAMユーザの権限で動いてる可能性があります(Athena + Object Lambdaに関するドキュメントが見当たらず...)。
今回ユーザ側の権限でよしなに吸収してリソース側のポリシーが不足している場合、クロスアカウントアクセス操作で同等のことをやるとコケる可能性があるのでもしやられる方は確認していただければと思います。
Athenaテーブルの作成
ここは手動で作成します。
LOCATIONにObject Lambdaのアクセスポイントエイリアスを指定する以外は以前の差異はありません。
CREATE EXTERNAL TABLE IF NOT EXISTS dmarc_report_ol (
feedback struct<
report_metadata: struct<
org_name: string,
email: string,
extra_contact_info: string,
report_id: string,
date_range: struct<
begin: string,
`end`: string
>
>,
policy_published: struct<
domain: string,
adkim: string,
aspf: string,
p: string,
sp: string,
pct: string,
np: string
>,
record: array<struct<
`row`: struct<
source_ip: string,
`count`: string,
policy_evaluated: struct<
disposition: string,
dkim: string,
spf: string
>
>,
identifiers: struct<
header_from: string
>,
auth_results: struct<
spf: struct<
domain: string,
result: string
>,
dkim: array<
struct<
domain: string,
result: string,
selector: string
>
>
>>
>
>
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
WITH SERDEPROPERTIES ('paths'='feedback')
LOCATION 's3://dmarc-convert-functi-xxxxx-ol-s3/source/catcher/';
エイリアスはAmazon S3のObject Lambdaアクセスポイントから確認可能です。
実行
後述の事情でnullデータをSELECT実行時に都度除去していますが以下のように得られることが確認できました。
以前アクセスポイント経由でアクセスした際の記事では実行が大体500msほどでしたが今回は2.8秒ほどとなり、どうしても処理時間は長くなってしまいます。。
Lambda関数の呼び出しを見てみると稼働自体は並行で行われますが、起動自体はファイル単位で順次行われ数十ミリ秒程度の差はあるためファイル数自体が多くなるとこの辺の影響も大きくなっていきそうです。
ハマったところ
範囲GetObjectを有効にする
マネジメントコンソールでの以下の箇所で範囲GetObject、SAMやCloudFormationの場合はAllowedFeatures
にGetObject-Range
を指定しましょう。
最初こちらの設定が抜けていてListObjectは発生するが、Lambdaの呼び出しも発生しないしGetObjectも発生せず躓いていました。
データ無の場合も空のデータを書き込む必要がある
今回の処理ではレポートが存在しない、読み込めない場合とくにレスポンスを書き込まず終了していたのですが、この場合Lambda関数自体が正常に終了していても呼び出しを繰り返すことがわかりました(実際の動作を見る限りの推定。AtenaとObject Lambdaどちらの仕様か未確認)。
なおここでいうレスポンスはLambda関数自体の返却値ではなくwrite_get_object_response()
の呼び出しを指します。
上限は不明ですが数十秒放置したらデータが10個ほどしかないのに400回近く呼び出されていました(エラー自体は別件でプログラム側のミス)。
異常に実行が長い場合はちょっと気にして確認したほうが良いかもしれません。
ひとまず空のJSONを返すようにすることで解決はしましたが空のデータが返るのでis not null
で弾く必要はあります。もっと良い方法があるのでしょうか?
終わりに
Object Lambdaを利用してAthenaのクエリ実行時にデータ変換を行ってみました。
アップデートを見る限り個人情報の除去など軽微なものをユースケースとして挙げていましたが今回のようにがっつりデータ加工をすることも可能です。
データを多重に持たなくてよくなる一方で都度加工を行う関係でレスポンスの遅延及び加工処理に対するコンピューティングの課金も発生するため月1に固定で分析をかけるようなユースケース向けでしょうか。
高頻度で分析をかける場合は作業速度的にも課金額的にも一時的に中間データを生成し作業が終わったらデータを消すほうが良い場合もありますので各々の環境に応じてどちらを利用するか選択していきましょう。