はじめに
データアナリティクス事業本部ビッグデータチームのyosh-kです。
今回はStep Functions内でGlue Python Shellを実行する実装をCDKで行いたいと思います。
前提
今回実現したい構成は以下になります。
- EventBridgeでdata-source Bucketへの
Object Created
イベントを検知し、Step Functionsを起動します。 - Step Functions内でGlue Jobを実行します。
- Glue Jobでデータ加工した出力先としてdata-store Bucketにcsvファイルを生成します。
加工処理前のcsvファイルの中身になります。
animal_info.csv
"animal_id","animal_name","species","weight_kg","age_years","habitat","arrival_year"
"001","Luna","Tiger","120","5","Savanna","2018"
"002","Bella","Elephant","2500","8","Jungle","2015"
"003","Max","Lion","190","6","Savanna","2017"
"004","Daisy","Giraffe","800","4","Savanna","2019"
"005","Charlie","Rhino","2300","7","Jungle","2016"
実装
それでは実装になります。実装コードはリンクに格納しています。
35_step_functions_glue_python_shell_with_cdk % tree
.
├── README.md
├── cdk
│ ├── bin
│ │ └── app.ts
│ ├── cdk.json
│ ├── jest.config.js
│ ├── lib
│ │ ├── constructs
│ │ │ ├── eventbridge.ts
│ │ │ ├── glue.ts
│ │ │ ├── s3.ts
│ │ │ └── step-functions.ts
│ │ └── stack
│ │ └── etl-stack.ts
│ ├── package-lock.json
│ ├── package.json
│ ├── parameter.ts
│ ├── test
│ │ └── app.test.ts
│ └── tsconfig.json
└── resources
└── glue
└── etl_script.py
9 directories, 15 files
35_step_functions_glue_python_shell_with_cdk %
bin/app.ts
#!/usr/bin/env node
import * as cdk from "aws-cdk-lib";
import { ETLStack } from "../lib/stack/etl-stack";
import { devParameter } from "../parameter";
const app = new cdk.App();
new ETLStack(app, "CMKasamaETL", {
description: "ETL (tag:kasama-test-tag)",
env: {
account: devParameter.env?.account || process.env.CDK_DEFAULT_ACCOUNT,
region: devParameter.env?.region || process.env.CDK_DEFAULT_REGION,
},
tags: {
Repository: "kasama-test-tag",
Environment: devParameter.envName,
},
projectName: devParameter.projectName,
envName: devParameter.envName,
});
description
: CloudFormation StackのDescriptionとなります。env
: deploy先のaccoun, regionを設定します。devParameterで定義していなければdeployコマンド実行環境のデフォルト値を設定します。tag
: 作成されるリソースに対してのタグを設定します。projectName, envName
: 処理の中でリソース名の一部として使用します。
lib/constructs/eventbridge.ts
import { Construct } from "constructs";
import * as events from "aws-cdk-lib/aws-events";
import * as eventsTargets from "aws-cdk-lib/aws-events-targets";
import * as sfn from "aws-cdk-lib/aws-stepfunctions";
import * as iam from "aws-cdk-lib/aws-iam";
export interface EventBridgeConstructProps {
envName: string;
projectName: string;
dataSourceBucketName: string;
stateMachineArn: string;
}
export class EventBridgeConstruct extends Construct {
constructor(scope: Construct, id: string, props: EventBridgeConstructProps) {
super(scope, id);
const s3PutRule = new events.Rule(this, "S3PutRule", {
eventPattern: {
source: ["aws.s3"],
detailType: ["Object Created"],
detail: {
bucket: {
name: [props.dataSourceBucketName],
},
object: {
key: [{ prefix: "input/" }],
size: [{ numeric: [">", 0] }],
},
},
},
ruleName: `${props.projectName}-${props.envName}-etl-s3-put-rule`,
});
// EventBridge が Step Functions を起動するための IAM ロールを作成
const eventBridgeExecutionRole = new iam.Role(
this,
`EventBridgeExecutionRole`,
{
assumedBy: new iam.ServicePrincipal("events.amazonaws.com"), // 信頼ポリシー設定
description:
"An IAM role for EventBridge to Start Step Functions Execution",
roleName: `EventBridgeExecutionRoleForStepFunctions-${props.envName}`,
}
);
eventBridgeExecutionRole.addToPolicy(
new iam.PolicyStatement({
actions: ["states:StartExecution"], // 許可するアクション
resources: [props.stateMachineArn], // ステートマシンのARN
})
);
// ステップ関数をS3 Put Eventのターゲットとして設定
const stateMachine = sfn.StateMachine.fromStateMachineArn(
this,
"ImportedStateMachine",
props.stateMachineArn
);
s3PutRule.addTarget(
new eventsTargets.SfnStateMachine(stateMachine, {
role: eventBridgeExecutionRole,
})
);
}
}
EventBridge, EventBridge用のIAM Role, IAM RoleのPolicyの定義をしています。フォルダが作成された際には起動してほしくないため、ファイルサイズが0以上という条件をつけています。
lib/constructs/glue.ts
import { Construct } from "constructs";
import * as glue from "aws-cdk-lib/aws-glue";
import * as iam from "aws-cdk-lib/aws-iam";
export interface GlueConstructProps {
envName: string;
projectName: string;
dataSourceBucketName: string;
dataStoreBucketName: string;
sysBucketName: string;
}
export class GlueConstruct extends Construct {
public readonly glueJobName: string;
constructor(scope: Construct, id: string, props: GlueConstructProps) {
super(scope, id);
// Glue Job用のIAMロール
const glueJobRole = new iam.Role(this, "GlueJobRole", {
assumedBy: new iam.ServicePrincipal("glue.amazonaws.com"),
description: "Role for Glue Job execution",
roleName: `${props.projectName}-${props.envName}-etl-glue-execution-role`,
});
glueJobRole.addToPolicy(
new iam.PolicyStatement({
resources: ["arn:aws:logs:*:*:*:/aws-glue/*"], // ログリソースへのアクセス。必要に応じてより具体的に指定
actions: [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents",
],
})
);
glueJobRole.addToPolicy(
new iam.PolicyStatement({
actions: [
"s3:ListBucket",
"s3:GetObject",
"s3:PutObject",
"s3:CopyObject",
],
resources: [
`arn:aws:s3:::${props.dataSourceBucketName}`,
`arn:aws:s3:::${props.dataSourceBucketName}/*`,
`arn:aws:s3:::${props.dataStoreBucketName}`,
`arn:aws:s3:::${props.dataStoreBucketName}/*`,
`arn:aws:s3:::${props.sysBucketName}`,
`arn:aws:s3:::${props.sysBucketName}/*`,
],
})
);
this.glueJobName = `${props.projectName}-${props.envName}-glue-job`;
// Glue Jobの定義
new glue.CfnJob(this, "GlueJob", {
name: this.glueJobName,
role: glueJobRole.roleArn,
command: {
name: "pythonshell",
pythonVersion: "3.9",
scriptLocation: `s3://${props.sysBucketName}/glue-jobs/glue/etl_script.py`,
},
executionProperty: {
maxConcurrentRuns: 5,
},
defaultArguments: {
"--TempDir": `s3://${props.sysBucketName}/tmp`,
"--job-language": "python",
"--S3_OUTPUT_BUCKET": props.dataStoreBucketName,
"--S3_OUTPUT_KEY": `output/`,
},
});
}
}
Glue Job、Glue Job用IAM Roleを定義しています。command.name
でpythonshell
と指定することでPython Shellを定義しています。
現状、公式のL2 Constructsが存在しないため、L1を使用しています。aws-glue-alpha
はあるようですが、今後どこまで実装が変更されるかわからず、L1を選択しました。
lib/constructs/s3.ts
import * as cdk from "aws-cdk-lib";
import {
Bucket,
BlockPublicAccess,
BucketEncryption,
} from "aws-cdk-lib/aws-s3";
import { Construct } from "constructs";
export interface S3ConstructProps {
envName: string;
projectName: string;
}
export class S3Construct extends Construct {
public readonly dataSourceBucket: Bucket;
public readonly dataStoreBucket: Bucket;
public readonly sysBucket: Bucket;
constructor(scope: Construct, id: string, props: S3ConstructProps) {
super(scope, id);
this.dataSourceBucket = new Bucket(this, "DataSourceBucket", {
bucketName: `${props.projectName}-${props.envName}-data-source`,
removalPolicy: cdk.RemovalPolicy.DESTROY,
blockPublicAccess: BlockPublicAccess.BLOCK_ALL,
encryption: BucketEncryption.KMS_MANAGED,
versioned: true,
eventBridgeEnabled: true,
});
this.dataStoreBucket = new Bucket(this, "DataStoreBucket", {
bucketName: `${props.projectName}-${props.envName}-data-store`,
removalPolicy: cdk.RemovalPolicy.DESTROY,
blockPublicAccess: BlockPublicAccess.BLOCK_ALL,
encryption: BucketEncryption.KMS_MANAGED,
versioned: true,
});
this.sysBucket = new Bucket(this, "SysBucket", {
bucketName: `${props.projectName}-${props.envName}-sys`,
removalPolicy: cdk.RemovalPolicy.DESTROY,
blockPublicAccess: BlockPublicAccess.BLOCK_ALL,
encryption: BucketEncryption.KMS_MANAGED,
versioned: true,
eventBridgeEnabled: true,
});
// Glue スクリプトを S3 バケットにデプロイ
new cdk.aws_s3_deployment.BucketDeployment(this, "DeployGlueScript", {
sources: [cdk.aws_s3_deployment.Source.asset("../resources")],
destinationBucket: this.sysBucket,
destinationKeyPrefix: "glue-jobs",
});
}
}
インプットファイルを格納するBucket、アウトプットファイルを格納するBucket、Glue Scriptを格納するBucketを定義しています。Glue Scriptはresourcesフォルダ配下のものをデプロイするように定義しています。Bucketを他のConstructで参照するためpublic readonlyで参照できる定義としています。
lib/constructs/step-functions.ts
import { Construct } from "constructs";
import * as sfn from "aws-cdk-lib/aws-stepfunctions";
import * as sfn_tasks from "aws-cdk-lib/aws-stepfunctions-tasks";
import * as iam from "aws-cdk-lib/aws-iam";
export interface StepFunctionsConstructProps {
envName: string;
projectName: string;
dataSourceBucketName: string;
dataStoreBucketName: string;
glueJobName: string;
}
export class StepFunctionsConstruct extends Construct {
public readonly stateMachine: sfn.StateMachine;
constructor(
scope: Construct,
id: string,
props: StepFunctionsConstructProps
) {
super(scope, id);
const stepFunctionsRole = new iam.Role(this, `StepFunctionsRole`, {
assumedBy: new iam.ServicePrincipal("states.amazonaws.com"),
description: "An IAM role for Step Functions to access AWS services",
roleName: `${props.projectName}-${props.envName}-etl-stepfunctions-role`,
});
stepFunctionsRole.addToPolicy(
new iam.PolicyStatement({
actions: ["s3:ListBucket", "s3:GetObject", "s3:PutObject"],
resources: [
`arn:aws:s3:::${props.dataSourceBucketName}`,
`arn:aws:s3:::${props.dataSourceBucketName}/*`,
`arn:aws:s3:::${props.dataStoreBucketName}`,
`arn:aws:s3:::${props.dataStoreBucketName}/*`,
],
})
);
stepFunctionsRole.addToPolicy(
new iam.PolicyStatement({
resources: ["*"], // 必要に応じてリソースをより具体的に指定
actions: ["glue:StartJobRun"], // 必要なアクションに絞り込むことを推奨
})
);
// Glue Job の実行
const startGlueJob = new sfn_tasks.GlueStartJobRun(this, "StartGlueJob", {
glueJobName: props.glueJobName,
arguments: sfn.TaskInput.fromObject({
// S3イベントから取得した入力パス
"--S3_INPUT_BUCKET.$": "$.detail.bucket.name",
"--S3_INPUT_KEY.$": "$.detail.object.key",
}),
integrationPattern: sfn.IntegrationPattern.RUN_JOB, // タスクが終了するまで待機
});
/// ジョブ失敗時の振る舞いを定義
const jobFailed = new sfn.Fail(this, "JobFailed", {
errorPath: "$.Error",
causePath: "$.Cause",
});
startGlueJob.addCatch(jobFailed, {
errors: ["States.ALL"],
});
const definitionBody = sfn.DefinitionBody.fromChainable(startGlueJob);
this.stateMachine = new sfn.StateMachine(this, "StateMachine", {
stateMachineName: `${props.projectName}-${props.envName}-etl-statemachine`,
definitionBody: definitionBody,
role: stepFunctionsRole,
});
}
}
sfn_tasks.GlueStartJobRun
: AWS Glueのジョブを実行するためのStep Functionsのタスクを定義します。このタスクでは、ジョブ名、実行するためのパラメータ、ジョブが完了するまで待つ設定(integrationPattern: sfn.IntegrationPattern.RUN_JOB)が含まれています。jobFailed
: ジョブが失敗した場合の振る舞いを定義します。errorPathとcausePathは、エラーが発生した際のレスポンス値を参照します。addCatch(jobFailed, {errors: ["States.ALL"]})
: StartGlueJobタスクが失敗した場合に、任意のエラーをキャッチしてJobFailedステートへ遷移するように設定しています。今回はStates.ALLであらゆる例外をキャッチしていますが、実際の運用ではそれぞれの例外に沿った処理を検討してみる方が良いかもしれません。
lib/stack/etl-stack.ts
import { Construct } from "constructs";
import * as cdk from "aws-cdk-lib";
import { S3Construct } from "../constructs/s3";
import { GlueConstruct, GlueConstructProps } from "../constructs/glue";
import {
StepFunctionsConstruct,
StepFunctionsConstructProps,
} from "../constructs/step-functions";
import {
EventBridgeConstruct,
EventBridgeConstructProps,
} from "../constructs/eventbridge";
export interface ETLStackProps extends cdk.StackProps {
envName: string;
projectName: string;
}
export class ETLStack extends cdk.Stack {
constructor(scope: Construct, id: string, props: ETLStackProps) {
super(scope, id, props);
const s3Construct = new S3Construct(this, "S3", {
envName: props.envName,
projectName: props.projectName,
});
const glueConstruct = new GlueConstruct(this, "Glue", {
envName: props.envName,
projectName: props.projectName,
dataSourceBucketName: s3Construct.dataSourceBucket.bucketName,
dataStoreBucketName: s3Construct.dataStoreBucket.bucketName,
sysBucketName: s3Construct.sysBucket.bucketName,
} as GlueConstructProps);
const stepFunctionsConstruct = new StepFunctionsConstruct(
this,
"StepFunctions",
{
envName: props.envName,
projectName: props.projectName,
dataSourceBucketName: s3Construct.dataSourceBucket.bucketName,
dataStoreBucketName: s3Construct.dataStoreBucket.bucketName,
glueJobName: glueConstruct.glueJobName,
} as StepFunctionsConstructProps
);
new EventBridgeConstruct(this, "EventBridge", {
envName: props.envName,
projectName: props.projectName,
dataSourceBucketName: s3Construct.dataSourceBucket.bucketName,
stateMachineArn: stepFunctionsConstruct.stateMachine.stateMachineArn,
} as EventBridgeConstructProps);
}
}
上記ファイルではStackを定義しその中でConstructとしてリソースを定義しています。S3 Bucket、Glue、Step Functions, EventBridgeで依存関係があるため、上記の順となっています。bin/app.ts
から取得したenvName, projectNameは全てのConstructで活用するため、引数として指定しています。
parameter.ts
import { Environment } from "aws-cdk-lib";
// Parameters for Application
export interface AppParameter {
env: Environment;
envName: string;
projectName: string;
}
// Example
export const devParameter: AppParameter = {
envName: "dev",
projectName: "cm-kasama",
env: {},
// env: { account: "xxxxxx", region: "ap-northeast-1" },
};
環境変数を定義するためのファイルとなります。accountには実際のAWS_ACCOUNT_IDを記載します。そのほかには、evnNameやprojectNameなどは一括で修正できるように変数として定義しています。私の場合はprofileで指定したIAM Roleに紐づくAWS Account、Regionへデプロイするためenvを空で設定しています。
resources/glue/etl_script.py
import sys
import traceback
import boto3
import pandas as pd
from io import StringIO
from datetime import datetime
from zoneinfo import ZoneInfo
from awsglue.utils import getResolvedOptions
def main():
try:
args = getResolvedOptions(
sys.argv,
[
"S3_INPUT_BUCKET",
"S3_INPUT_KEY",
"S3_OUTPUT_BUCKET",
"S3_OUTPUT_KEY_PREFIX",
],
)
# ジョブから渡されたパラメータを使用
s3_input_bucket = args["S3_INPUT_BUCKET"]
s3_input_key = args["S3_INPUT_KEY"]
s3_output_bucket = args["S3_OUTPUT_BUCKET"]
s3_output_key = args["S3_OUTPUT_KEY_PREFIX"]
# Boto3クライアントの初期化
s3_client = boto3.client("s3")
# S3からCSVファイルを読み込む
response = s3_client.get_object(Bucket=s3_input_bucket, Key=s3_input_key)
input_data = pd.read_csv(response["Body"])
# 動物園に来てからの年数を計算して新しい列に追加
current_year = datetime.now(ZoneInfo("Asia/Tokyo")).year
input_data["years_in_zoo"] = current_year - input_data["arrival_year"]
# Pandas DataFrameをCSV文字列に変換
output_csv = input_data.to_csv(index=False)
output_file = StringIO(output_csv)
current_time = datetime.now(ZoneInfo("Asia/Tokyo")).strftime("%Y-%m-%d-%H-%M-%S")
# 現在の日時を取得し、ファイル名を生成
output_filename = f"{s3_output_key}output_{current_time}.csv"
# 加工後のCSVをS3に保存
s3_client.put_object(
Bucket=s3_output_bucket,
Key=output_filename,
Body=output_file.getvalue(),
)
except Exception as e:
tb = traceback.format_exc()
print(f"Unexpected error: {str(e)}\nTraceback: {tb}")
raise e
if __name__ == "__main__":
main()
inputとなるcsvファイルのarrival_yearカラムを元に動物園に来てからの年数を計算して、years_in_zooカラムを生成し、timestamp型のsuffixを付与した新規csvファイルをoutputとなる格納先へputする処理となっています。
デプロイ
package.jsonがあるディレクトリで依存関係をインストールします。
npm install
次にcdk.jsonがあるディレクトリで、CDKで定義されたリソースのコードをAWS CloudFormationテンプレートに合成(変換)するプロセスを実行します。
npx cdk synth --profile <YOUR_AWS_PROFILE>
同じくcdk.jsonがあるディレクトリでデプロイコマンドを実行します。--all
はCDKアプリケーションに含まれる全てのスタックをデプロイするためのオプション、--require-approval never
はセキュリティ的に敏感な変更やIAMリソースの変更を含むデプロイメント時の承認を求めるダイアログ表示を完全にスキップします。never
は、どんな変更でも事前確認なしにデプロイすることを意味します。今回は検証用なので指定していますが、慎重にデプロイする場合は必要のないオプションになるかもしれません。
npx cdk deploy --all --require-approval never --profile <YOUR_AWS_PROFILE>
実行結果
正常系
それではS3にファイルをPutし、Step Functionsの起動を確認したいと思います。
EventBridgeが実行されていることを確認。
Step Functions正常終了。
Glue Job正常終了。
data store Bucketにファイルが生成されていること、データ加工が想定通りであることを確認。
異常系
データ加工対象のカラムを保持していないcsvファイルをPut。
JobFailedとなり、Step Functionsが異常終了。
Glue Jobも同様。
最後に
フォルダやファイル構成、例外処理などまだまだ悩みは尽きないので引き続き学習していきます!