データアナリティクス事業本部の根本です。Workflowsでエクスポネンシャルバックオフを実装して検証してみました。WorkflowsでAPIを叩く予定のある方はよかったら読んでみてください。
この記事の対象者
- WorkflowsでAPIを叩く際にエクスポネンシャルバックオフを実装したいひと
はじめに:エクスポネンシャルバックオフとは
- エクスポネンシャル(exponential):指数関数的
- バックオフ(backoff):後退.
簡潔に表すと、指数関数的にリトライ間隔を後退(遅らせる)させるアルゴリズムになります。
以下の例は、リトライごとに2の累乗分の時間待機するものです。
呼び出し回数 | 待機時間 | |
---|---|---|
1 | 0秒 | (初回呼び出し) |
2 | 1秒 | 2の0乗 |
3 | 2秒 | 2の1乗 |
4 | 4秒 | 2の2乗 |
5 | 8秒 | 2の3乗 |
6 | 16秒 | 2の4乗 |
7 | 32秒 | 2の5乗 |
8 | 64秒 | 2の6乗 |
9 | 128秒 | 2の7乗 |
エクスポネンシャルバックオフは、APIを呼び出した時に、即時リトライをするのではなく待機時間を指数関数的に増加させて呼び出しタイミングを待機させます。
そうすることでサーバ負荷の軽減やリクエスト数の低減を図ることができますし、サーバ側の一時的な負荷でのAPI呼び出し失敗であれば再試行で成功する可能性も上がります。
Workflowsにはエクスポネンシャルバックオフを実装するための機能再試行ステップ
(retry step)があります。
それでは実際にWorkflowsで再試行ステップ(retry step)
を用いてエクスポネンシャルバックオフを実装してみます。
やってみる
事前準備
事前準備として、2つのCloud Functionsを実装してデプロイしました。以前記事にしたものです。
デプロイしているCloud Functionsは以下です。
1. ファイル一覧取得関数(list_files STEP)
2. 並列処理検証関数(processFile STEP)←エクスポネンシャルバックオフでの再試行対象関数
ファイル一覧取得関数はCloud Storageからファイルを取得するので、バケットを作成して以下のファイルを保存しています。
test1.txt
test2.txt
ファイル一覧取得関数
httpトリガー起動の関数として作成しています。指定バケットに保存されているファイル一覧を取得して返却します。
list_filest
import functions_framework
from google.cloud import storage
from flask import jsonify
def list_blobs(bucket_name):
storage_client = storage.Client()
blobs = storage_client.list_blobs(bucket_name)
blob_names = [blob.name for blob in blobs]
return blob_names
@functions_framework.http
def get_file_names(request):
BUCKET = "作成したバケット名"
list_blobs_result = list_blobs(BUCKET)
return jsonify({"file_names": list_blobs_result})
並列処理検証関数
httpトリガーで実装をします。
Workflowsから呼び出す際に、リクエストボディにファイル名を格納しています。エクスポネンシャルバックオフを検証するため、ファイル名がtest1.txt
だった場合は意図的にリトライ処理を発生させるためHTTPステータスコード503
を返却します。
(test1.txtは呼び出しに失敗するため複数回再試行される,test2.txtは複数回呼び出されることはないという仮説です)
並列処理検証関数
import functions_framework
from flask import jsonify
@functions_framework.http
def parallel_process(request):
data = request.get_json()
file_name = data.get('file_name')
print(f"[{file_name}]:関数が起動されました。対象ファイルは[{file_name}]です。")
if(file_name == 'test1.txt'):
response = jsonify({'message': 'Error test'})
response.status_code = 503
return response
print(f"[{file_name}]:処理終了")
return file_name + ':処理完了'
Workflowsの実装
以下のYAMLでは1.ファイル一覧取得関数
で取得したファイルのリストを、parallel
ステップで並列処理して2.並列処理検証関数
で並列処理する流れとなっています。
workflows.yaml
main:
steps:
- list_files:
call: http.get
args:
url: "ファイル一覧取得関数のURL"
result: getFilesResult
- processFiles:
parallel:
for:
value: name
in: ${getFilesResult.body.file_names}
steps:
- processFile:
try:
steps:
- parallelFunction:
call: http.post
args:
url: "並列処理検証関数のURL"
body:
file_name: ${name}
result: processResult
- logRecode:
call: sys.log
args:
text: ${processResult.body}
severity: "INFO"
retry:
predicate: ${retryCheck}
max_retries: 6
backoff:
initial_delay: 1
max_delay: 32
multiplier: 2
checkRetry:
params: [error_array]
steps:
- checkError:
switch:
- condition: ${("code" in error_array) and (error_array.code == 503)}
return: True
- nothingToDo:
return: False
それぞれのステップを見ていきます。
- list_files:
call: http.get
args:
url: "ファイル一覧取得関数のURL"
result: getFilesResult
ファイル一覧を取得してgetFilesResult
に格納をします。
parallel:
for:
value: name
in: ${getFilesResult.body.file_names}
steps:
- processFile:
try:
steps:
- parallelFunction:
call: http.post
args:
url: "並列処理検証関数のURL"
body:
file_name: ${name}
result: processResult
- logRecode:
call: sys.log
args:
text: ${processResult.body}
severity: "INFO"
parallel
ステップは並列処理のステップとなります。for文でgetFilesResult
に格納されたファイル名を
取り出し、リクエストボディ格納してにそれぞれ並列処理検証関数を呼び出します。
try
ステップ内で起きたエラーはretry
ステップにてキャッチされます。
retry:
predicate: ${checkRetry}
max_retries: 6
backoff:
initial_delay: 1
max_delay: 32
multiplier: 2
Workflowsにおけるエクスポネンシャルバックオフの肝の部分です。
max_retries
:最大再試行回数(初回実行はカウント対象外)
initial_delay
:初回遅延秒数
max_delay
:最大遅延秒数
multiplier
:乗数(直前の遅延秒数に掛け算される値)
initial_delay
に1が設定されていて、multiplier
が2、max_retries
が6の場合だと、待機時間は以下となります。
1回目の呼び出し:待機時間1秒
2回目の呼び出し:待機時間2秒(multiplier * 1回目の呼び出し待機時間 = 2 * 1
)
3回目の呼び出し:待機時間4秒(multiplier * 2回目の呼び出し待機時間 = 2 * 2
)
4回目の呼び出し:待機時間8秒(multiplier * 3回目の呼び出し待機時間 = 2 * 4
)
5回目の呼び出し:待機時間16秒(multiplier * 4回目の呼び出し待機時間 = 2 * 8
)
6回目の呼び出し:待機時間32秒(multiplier * 5回目の呼び出し待機時間 = 2 * 16
)
checkRetry:
params: [error_array]
steps:
- checkError:
switch:
- condition: ${("code" in error_array) and (error_array.code == 503)}
return: True
- nothingToDo:
return: False
上記はリトライ判定のステップです。並列処理検証関数の実行結果のエラーコードが503
の場合True
を返却してリトライ処理が行われます。
実行して検証してみる
Workflowsを実行したところ、1分ほど経過してエラーとなりました。
呼び出している並列処理検証関数は[test1.txt]の場合、503しか返さないためです。
並列処理検証関数のログを見てみます。
test2.txtを引数にした場合は、503
を返却せず正常終了しているため1回のみ呼び出されているのが確認できます(青枠です)。
test1.txtを引数にした呼び出しに関しては503
を発生させているので全てエラーになっており、複数回呼び出されているのが確認できました。
なかなかみづらいのですが、伝わりましたでしょうか?
呼び出し時間 | 待機時間 | 呼び出し回数 |
---|---|---|
2024-04-23 17:46:33.467 JST | 0 | 1回目 |
2024-04-23 17:46:34.895 JST | 1 | 2回目 |
2024-04-23 17:46:37.209 JST | 2 | 3回目 |
2024-04-23 17:46:41.529 JST | 4 | 4回目 |
2024-04-23 17:46:49.838 JST | 8 | 5回目 |
2024-04-23 17:47:06.170 JST | 16 | 6回目 |
2024-04-23 17:47:38.504 JST | 32 | 7回目 |
max_retries
に6を指定しているので、合計7回関数の呼び出しが行われています。
max_retries
に設定した数値は初回実行はカウント対象外なので、6を指定すると1(初回実行) + 6(max_retries) = 7
となるからです。
WorkflowsのYAMLに再試行ステップを用いることで、簡潔にエクスポネンシャルバックオフを実装することができました。
おわりに
再試行ステップを用いることで、判定回数の分岐や判定処理を実装することなく確実にエクスポネンシャルバックオフを実装することができとても嬉しくなりました。
再試行回数や秒数の設定は個々のワークロードによると思いますが、適切な値を設定しておけばAPIの一時的なエラーにも対応できて良いと考えます。個人的には今後Workflowsを本番環境で運用する際には実装したい機能だなと思いました。
この記事がどなたかのお役に立てば嬉しいです。それでは。