さがらです。
先月、Amazon Data Firehoseを用いてSnowflakeに対してデータをストリーミングする機能が一般提供となりました。東京リージョンでも使えます!
これを受けて、関連するQuickstartである「Getting Started with Snowflake and Amazon Data Firehose (ADF)」をやってみたので、本記事でまとめてみます。
Quickstartの内容
このQuickstartですが、OpenSky Networkというリアルタイムの航空データを提供しているサービスのAPIを用いて、JSON形式で航空状況を表す時系列データを取得し、Data Firehose経由でSnowflakeにストリーミングする、ということを実際に体験できる内容となっています。
下図はQuickstartのページからの引用ですが、このようなアーキテクチャとなっています。より具体的には、EC2インスタンス上でOpenSkyのAPIを叩いてデータを取得してData FirehoseにストリーミングするPython処理を実行し、Snowflakeでストリーミングされたデータをクエリする、という構成です。
前提
本Quickstartを進める上で、以下の前提で進めます。
- ストリームデータを生成するEC2は「パブリックサブネット」に配置
- このため、Quickstart上で言及されているPrivateLink周りの処理は行いません
- VPC、Subnet、Security Group、EC2、S3など、Data Firehose以外のリソースについてはCloudFormationで定義(詳細は後述)
- AWSは「アジアパシフィック(東京)」リージョン
- Snowflakeのアカウントは、クラウドプラットフォームがAWS、リージョンは東京、Enterpriseエディション、で事前に作成済
AWS上の準備(2. Provision a Linux jumphost in AWS)
CloudFormationの実行
本Quickstart上でもEC2を立てるCloudFormationを実行するリンクがあるのですが、そのリンク先に記載されているjsonファイル(https://snowflake-corp-se-workshop.s3.us-west-1.amazonaws.com/VHOL_Snowflake_KDF/kdf-bastion.json)を変換したコードを使って各AWSリソースを構築します。
具体的には、以下の内容でClaude3 Sonnetに依頼して変換してもらいました。(個人的に、CloudFormationのコードの生成ではClaude3 Sonnetをよく使っています。)
※上述のJSONのコードを貼り付けた上で
このコードについて、以下の内容に沿って変換してください。
・YAMLにすること
・VPC、Public Subnet、SecurityGroupなど必要なリソースもこのコード上で定義すること。
・Security Groupでは、特定のIPからのアクセスに限定させること
・Firehoseのbackup settings用のS3バケットの作成も追加すること
・名前をつけることが出来るリソースについて、全て名前の指定を行うこと
この依頼内容で変換してもらったコードがこちらです。(parameterのところなど、手作業で少しだけ編集しているところもあります。)
AWSTemplateFormatVersion: '2010-09-09'
Description: Create a Jumphost to run Kinesis producer for ingesting data into firehose
Parameters:
InstanceType:
Description: JumpHost EC2 instance types
Type: String
Default: t3.micro
AllowedValues:
- t2.micro
- t2.small
- t2.medium
- t3.micro
- t3.small
- t3.medium
LatestAmiId:
Type: 'AWS::SSM::Parameter::Value<AWS::EC2::Image::Id>'
Description: JumpHost EC2 AMI Id
Default: "/aws/service/ami-amazon-linux-latest/amzn2-ami-hvm-x86_64-gp2"
AllowedValues:
- "/aws/service/ami-amazon-linux-latest/amzn2-ami-hvm-x86_64-gp2"
RemoteAccessCIDR:
Description: CIDR IP range that can access the Jumphost
Type: String
Default: 0.0.0.0/0
AllowedPattern: '(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})/(\d{1,2})'
ConstraintDescription: must be a valid CIDR range of the form x.x.x.x/x
Resources:
VPC:
Type: 'AWS::EC2::VPC'
Properties:
CidrBlock: 10.0.0.0/16
EnableDnsHostnames: true
EnableDnsSupport: true
InstanceTenancy: default
Tags:
- Key: Name
Value: !Sub '${AWS::StackName}-VPC'
PublicSubnet:
Type: 'AWS::EC2::Subnet'
Properties:
VpcId: !Ref VPC
AvailabilityZone: !Select [0, !GetAZs '']
CidrBlock: 10.0.0.0/24
MapPublicIpOnLaunch: true
Tags:
- Key: Name
Value: !Sub '${AWS::StackName}-Public-Subnet'
InternetGateway:
Type: 'AWS::EC2::InternetGateway'
Properties:
Tags:
- Key: Name
Value: !Sub '${AWS::StackName}-IGW'
VPCGatewayAttachment:
Type: 'AWS::EC2::VPCGatewayAttachment'
Properties:
VpcId: !Ref VPC
InternetGatewayId: !Ref InternetGateway
RouteTable:
Type: 'AWS::EC2::RouteTable'
Properties:
VpcId: !Ref VPC
Tags:
- Key: Name
Value: !Sub '${AWS::StackName}-RouteTable'
RouteTableInternetGatewayRoute:
Type: 'AWS::EC2::Route'
DependsOn: VPCGatewayAttachment
Properties:
RouteTableId: !Ref RouteTable
DestinationCidrBlock: 0.0.0.0/0
GatewayId: !Ref InternetGateway
RouteTableAssociation:
Type: 'AWS::EC2::SubnetRouteTableAssociation'
Properties:
SubnetId: !Ref PublicSubnet
RouteTableId: !Ref RouteTable
FirehoseBackupBucket:
Type: 'AWS::S3::Bucket'
Properties:
BucketName: !Sub '${AWS::StackName}-backup-bucket'
AccessControl: BucketOwnerFullControl
PublicAccessBlockConfiguration:
BlockPublicAcls: true
BlockPublicPolicy: true
IgnorePublicAcls: true
RestrictPublicBuckets: true
JumpHostSecurityGroup:
Type: 'AWS::EC2::SecurityGroup'
Properties:
GroupDescription: Allow SSH from specified CIDR range
VpcId: !Ref VPC
SecurityGroupIngress:
- IpProtocol: tcp
FromPort: 22
ToPort: 22
CidrIp: !Ref RemoteAccessCIDR
Tags:
- Key: Name
Value: !Sub '${AWS::StackName}-JumpHost-SG'
rJumpHostInstanceRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- ec2.amazonaws.com
Action:
- 'sts:AssumeRole'
ManagedPolicyArns:
- 'arn:aws:iam::aws:policy/service-role/AmazonEC2RoleforSSM'
RoleName: !Sub '${AWS::StackName}-JumpHostRole'
Policies:
- PolicyName: !Sub '${AWS::StackName}-${AWS::AccountId}-jumphostpolicy'
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- 's3:*'
- 'kinesis:*'
- 'firehose:*'
Resource: '*'
rJumpHostInstanceProfile:
Type: AWS::IAM::InstanceProfile
Properties:
Path: /
Roles:
- !Ref rJumpHostInstanceRole
InstanceProfileName: !Sub '${AWS::StackName}-JumpHostInstanceProfile'
rJumpHost:
Type: 'AWS::EC2::Instance'
Metadata:
'AWS::CloudFormation::Init': {}
Properties:
IamInstanceProfile: !Ref rJumpHostInstanceProfile
ImageId: !Ref LatestAmiId
InstanceType: !Ref InstanceType
UserData:
'Fn::Base64':
!Sub |
#!/bin/bash
yum -y install jq python3-pip
echo ${AWS::Region} > /tmp/region
echo ${AWS::StackName} > /tmp/stackName
curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "/tmp/awscliv2.zip"
unzip "/tmp/awscliv2.zip" -d /tmp
/tmp/aws/install
chmod 755 /usr/local/bin/aws
aws s3 cp s3://snowflake-corp-se-workshop/VHOL_Snowflake_KDF/adf-producer.py /tmp/adf-producer.py
pip3 install geohash2 pytz boto3 requests urllib3==1.26.6
# Helper function
function error_exit
{
/opt/aws/bin/cfn-signal -e 1 -r "$1" '${EC2WaitHandle}'
exit 1
}
# Install the basic system configuration
/opt/aws/bin/cfn-init -s ${AWS::StackId} -r rJumpHost --region ${AWS::Region} || error_exit 'Failed to run cfn-init'
# All done so signal success
/opt/aws/bin/cfn-signal -e 0 -r "setup complete" '${EC2WaitHandle}'
NetworkInterfaces:
- AssociatePublicIpAddress: true
DeviceIndex: 0
GroupSet:
- !Ref JumpHostSecurityGroup
SubnetId: !Ref PublicSubnet
Tags:
- Key: Name
Value: !Sub '${AWS::StackName}-jumphost'
EC2WaitHandle:
Type: 'AWS::CloudFormation::WaitConditionHandle'
EC2WaitCondition:
Type: 'AWS::CloudFormation::WaitCondition'
Properties:
Handle: !Ref EC2WaitHandle
Timeout: 600
Outputs:
JumpHostPublicIP:
Description: Public IP of the Jumphost
Value: !GetAtt 'rJumpHost.PublicIp'
InstanceSecurityGroupID:
Description: The ID of the security group created for the Jumphost
Value: !Ref JumpHostSecurityGroup
JumpHostId:
Description: Jump Host Instance ID
Value: !Ref rJumpHost
FirehoseBackupBucketName:
Description: Bucket name for Firehose backup
Value: !Ref FirehoseBackupBucket
このコードをファイル化して、CloudFormationを実行します。実行する際はRemoteAccessCIDR
だけご注意ください。今回の検証範囲ではEC2からData Firehoseを介してSnowflakeへインターネット上で通信ができればよいので、EC2へアクセスするIP制限を設ける意味合いで私は自宅のグローバルIPを記入しました。
EC2をAWS Systems Manager Session Managaerから操作
続いて、EC2をAWS Systems Manager Session Managaerから操作してFirehoseからSnowflakeの認証に使うキーペアを生成します。
まず、Quickstartの内容に沿って、AWS Systems Manager Session Managaerで設定
からLinux shell profile
を/bin/bash
に変更して保存
を押します。
次に、セッション
からセッションの開始
を押し、先程CloudFormationで立てたインスタンスへログインします。
Quickstartに沿って、キーペアの作成から出力を行います。生成の際に用いたPasswordと、cat
で確認した公開鍵と秘密鍵は後で使用しますので、すぐにコピペできるようにしておきましょう。
cd $HOME
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
grep -v KEY rsa_key.pub | tr -d '\n' | awk '{print $1}' > pub.Key
cat pub.Key
grep -v KEY rsa_key.p8 | tr -d '\n' | awk '{print $1}' > priv.Key
cat priv.Key
Snowflake上の準備(3. Prepare the Snowflake cluster for streaming)
続いて、Snowflake上で準備を進めます。
まず、使用するユーザー、データベース、ウェアハウス、ロールを準備します。一番最後のWITHから始まるSELECT文で、Data Firehoseの設定で使用するAccount Identifierが確認できるので、こちらの値もすぐにコピペできるようにしておきましょう。
-- Set default value for multiple variables
-- For purpose of this workshop, it is recommended to use these defaults during the exercise to avoid errors
-- You should change them after the workshop
SET PWD = 'Test1234567';
SET USER = 'STREAMING_USER';
SET DB = 'ADF_STREAMING_DB';
SET WH = 'ADF_STREAMING_WH';
SET ROLE = 'ADF_STREAMING_RL';
USE ROLE ACCOUNTADMIN;
-- CREATE USERS
CREATE USER IF NOT EXISTS IDENTIFIER($USER) PASSWORD=$PWD COMMENT='STREAMING USER';
-- CREATE ROLES
CREATE OR REPLACE ROLE IDENTIFIER($ROLE);
-- CREATE DATABASE AND WAREHOUSE
CREATE DATABASE IF NOT EXISTS IDENTIFIER($DB);
USE IDENTIFIER($DB);
CREATE OR REPLACE WAREHOUSE IDENTIFIER($WH) WITH WAREHOUSE_SIZE = 'SMALL';
-- GRANTS
GRANT CREATE WAREHOUSE ON ACCOUNT TO ROLE IDENTIFIER($ROLE);
GRANT ROLE IDENTIFIER($ROLE) TO USER IDENTIFIER($USER);
GRANT OWNERSHIP ON DATABASE IDENTIFIER($DB) TO ROLE IDENTIFIER($ROLE);
GRANT USAGE ON WAREHOUSE IDENTIFIER($WH) TO ROLE IDENTIFIER($ROLE);
-- SET DEFAULTS
ALTER USER IDENTIFIER($USER) SET DEFAULT_ROLE=$ROLE;
ALTER USER IDENTIFIER($USER) SET DEFAULT_WAREHOUSE=$WH;
-- RUN FOLLOWING COMMANDS TO FIND YOUR ACCOUNT IDENTIFIER, COPY IT DOWN FOR USE LATER
-- IT WILL BE SOMETHING LIKE <organization_name>-<account_name>
-- e.g. ykmxgak-wyb52636
WITH HOSTLIST AS
(SELECT * FROM TABLE(FLATTEN(INPUT => PARSE_JSON(SYSTEM$allowlist()))))
SELECT REPLACE(VALUE:host,'.snowflakecomputing.com','') AS ACCOUNT_IDENTIFIER
FROM HOSTLIST
WHERE VALUE:type = 'SNOWFLAKE_DEPLOYMENT_REGIONLESS';
次に、作成したユーザーに先ほどEC2上で確認した公開鍵を割り当てます。
use role accountadmin;
alter user streaming_user set rsa_public_key='< pubKey >';
作成したユーザーでログインしてスキーマとテーブルの作成を行う
次に、先程作成したユーザーでSnowflakeアカウントにログインしなおし、スキーマとテーブルの作成を行います。(Quickstart上ではテーブル作成はもう少し後で行っていますが、このタイミングでまとめて行ったほうが画面の行き来が少なくて済むのでこのタイミングで行っています。)
SET DB = 'ADF_STREAMING_DB';
SET SCHEMA = 'ADF_STREAMING_SCHEMA';
USE IDENTIFIER($DB);
CREATE OR REPLACE SCHEMA IDENTIFIER($SCHEMA);
use ADF_STREAMING_DB;
use schema ADF_STREAMING_SCHEMA;
create or replace TABLE ADF_STREAMING_TBL (
ORIG VARCHAR(20),
UTC NUMBER(38,0),
ALT VARCHAR(20),
ICAO VARCHAR(20),
LON VARCHAR(20),
ID VARCHAR(20),
DEST VARCHAR(20),
LAT VARCHAR(20)
);
Amazon Data Firehoseの設定(4. Create an ADF delivery stream)
次に、Data Firehoseの設定に移ります。
Amazon Data Firehoseのページから、Firehoseストリームを作成
を押します。
ここから、各設定値を入力していきます。
ソース
はDirect PUT
、送信先
はSnowflake
を選択します。
Firehose ストリーム名
は任意の名称を入れましょう。ここで決めた名称は、後でEC2からPython処理を実行する際に引数として使うことになります。
次は送信先の設定となりますが、以下の内容に沿って入力してください。
SnowflakeアカウントURL
は、形式
に記載されたURLのうち、[account identifier]
を先程Snowflakeの画面で確認したものに変換して入力ユーザー
は、STREAMING_USER
と入力プライベートキー
は、EC2上で作成したキーペアの秘密鍵の値を入力パスフレーズ
は、EC2上でキーペア作成時に用いたパスワードを入力ロール
は、カスタムのSnowflakeロールを使用する
を選択し、ADF_STREAMING_RL
と入力Snowflakeデータベース
は、ADF_STREAMING_DB
と入力Snowflakeスキーマ
は、ADF_STREAMING_SCHEMA
と入力Snowflakeテーブル
は、ADF_STREAMING_TBL
と入力
バックアップの設定では、S3バックアップバケット
にCloudFormationで構成されたS3バケットを選択します。
あとは右下のFirehoseストリームを作成
を押せばOKです。
データの取得とクエリ実行(5. Ingest and Query data in Snowflake)
最後に、実際にEC2上でデータをストリーミングするPython処理を実行し、Data Firehose経由でSnowflakeにロードされているかを確認してみます。
EC2にログインし、ストリームデータを生成するPythonを実行
まず、EC2にログインして、ストリームデータを生成するPythonを実行します。
実際に実行するPythonコードですが、https://snowflake-corp-se-workshop.s3.us-west-1.amazonaws.com/VHOL_Snowflake_KDF/adf-producer.pyのリンクからダウンロード出来ます。
コードの内容としては下記のようになっています。
import pytz,boto3,time,requests,sys,re,json
from datetime import datetime
from botocore.config import Config
global tbl,maxNumRecords
maxNumRecords=250
def dumpToKDF(rec):
for j in range(0,len(rec),maxNumRecords):
records=rec[j:j+maxNumRecords]
numRec=str(len(records))
try:
for item in records:
print(json.dumps(item))
result = kdfClient.put_record(DeliveryStreamName=deliveryStreamName,Record={'Data': json.dumps(item)})
except Exception as err:
print("Error:", err)
#Main
if len(sys.argv) != 2:
print("Usage: python3 ",sys.argv[0], " <delivery stream name>")
sys.exit()
deliveryStreamName=sys.argv[1]
headers = {'Content-type': 'application/json'}
url = "http://169.254.169.254/latest/meta-data/placement/availability-zone"
region = requests.get(url).text[:-1]
kdf = boto3.Session()
kdfClient = kdf.client('firehose', region_name=region, config=Config(read_timeout=20, max_pool_connections=5000, retries={'max_attempts': 10}))
url='http://ecs-alb-1504531980.us-west-2.elb.amazonaws.com:8502/opensky'
kdfflight=[]
while True:
try:
results = requests.get(url, headers=headers)
r=json.loads(results.text)
print('Headers:')
except:
print(results.headers)
pass
ts=int(time.time())
print(datetime.now(pytz.timezone('America/Los_Angeles')).strftime("%D %H:%M:%S"))
for st in r:
(icao,id,utc,lon,lat,alt,dest,orig)=(st['icao'],st['id'],st['utc'],st['lon'],st['lat'],st['alt'],st['dest'],st['orig'])
if dest=='KSFO' or dest=='KSJC' or dest=='KOAK' or dest=='KPAO' or dest=='KSQL' or dest=='KRHV' or dest=='KNUQ' or dest=='KHWD' or dest=='KLVK' or dest=='KHAF':
kdfRecord={}
kdfRecord['utc']=str(utc)
kdfRecord['icao']=icao
kdfRecord['id']=id
kdfRecord['lat']=str(lat)
kdfRecord['lon']=str(lon)
kdfRecord['alt']=str(int(alt))
kdfRecord['dest']=dest
kdfRecord['orig']=orig
kdfflight.append(kdfRecord)
dumpToKDF(kdfflight)
kdfflight=[]
print('-----------')
print('Sleeping for 20 seconds')
print('-----------')
time.sleep(20)
では、キーペアを生成したときと同じくAWS Systems Manager Session Managaerから対象のEC2にログインし、以下のコマンドを実行します。
python3 /tmp/adf-producer.py <Data Firehoseで設定したストリーム名>
これで、下図のようにJSON形式でデータがどんどん生成されていけばOKです。
Snowflakeで実際にテーブルをクエリ
それではSnowflakeにログインし、ちゃんとテーブルにストリーミングされているかを確認してみます。
まずはシンプルなSELECT文を実行してみると、ちゃんとデータがストリーミングされていることがわかります!
select * from adf_streaming_tbl;
次に、このテーブルからより実際に使いやすくするための変換を行うビューを定義します。タイムスタンプをさまざまなタイムゾーンに変換、SnowflakeのGeohash関数を使用してGrafanaなどの時系列視覚化ツールで使用できる Geohashを生成、st_distance関数を使って飛行機とサンフランシスコ空港の間の距離を計算、といったことを行っています。
create or replace view flights_vw
as select
utc::timestamp_ntz ts_utc,
CONVERT_TIMEZONE('UTC','America/Los_Angeles',ts_utc::timestamp_ntz) as ts_pt,
alt::integer alt,
dest::string dest,
orig::string orig,
id::string id,
icao::string icao,
lat::float lat,
lon::float lon,
st_geohash(to_geography(st_makepoint(lon, lat)),12) geohash,
st_distance(st_makepoint(-122.366340, 37.616245), st_makepoint(lon, lat))/1609::float dist_to_sfo,
year(ts_pt) yr,
month(ts_pt) mo,
day(ts_pt) dd,
hour(ts_pt) hr
FROM adf_streaming_tbl;
実際にビューに対してSELECT文を実行すると下図のようなデータが得られます。このようなデータがニアリアルタイムで得られるため、今現在の空港と飛行機の位置関係を分析したいときにも使えますね!
select * from flights_vw;
お片付け
Quickstartの内容としては以上となるので、これ以上課金がされないようにAWSのリソースを削除しておきましょう。
まずCloudFormationでのスタックの削除を行います。(もしS3バケットが削除できない場合には、バケットの中身を空にしてからスタックを削除してください。)
次に、手動で定義したData Firehoseのストリームの削除を行います。これでお片付けは完了です!
最後に
SnowflakeのQuickstart「Getting Started with Snowflake and Amazon Data Firehose (ADF)」をやってみたのでその内容をまとめてみました。
実はData Firehoseを触るのは初めてだったのですが、Data FirehoseとSnowflakeを使って簡単にデータストリーミングパイプラインが構築できることがわかりましたね!
今回はパブリックサブネット経由での実行だったこともあるので、本番運用で想定されるプライベートサブネット経由での実行の場合はもう少しネットワーク周りの設定が必要になると思いますが、本記事が参考になると嬉しいです。