【AWS Step Functions】 並列処理と動的並列処理
こんにちは。プラットフォーム技術部の渋谷です。
前回の記事ではLambdaの呼び出し方法とChoiceを使った条件分岐について紹介しました。
今回は並列処理と動的並列処理について説明していきたいと思います。
これらの機能は、複数のタスクが全て完了するまで次のタスクを実行したくない場合や、一連のタスクを配列の要素ごとに反復して行いたい場合に利用できます。
並列処理 (Parallel)
Step Fucntionsの並列処理は、複数のタスクが全て完了するまで次のタスクを実行したくない場合に使用します。
Parallelステート(“Type": “Parallel")を使用して、ステートマシンで定義された一定数のブランチを並列に実行できます。
Parallelには次の追加のフィールドがあります。
- Branches (必須)
ステートマシンで並列して実行するステートを指定。 - ResultPath (オプション)
入力データ内のどこにブランチステートの出力を配置するかを指定。さらにOutputPath フィールドが指定されている場合は、それに従ってフィルタリングされステートの出力となる。 - ResultSelector (オプション)
キーと値のペアを渡す。例えばLambdaを呼び出す際などに戻るメタデータを、キーと値のペアを指定して渡すことができる。 - Retry (オプション)
ステートでランタイムエラーが発生した場合の再試行ポリシーを定義。 - Catch (オプション)
ステートでランタイムエラーが発生し、再試行ポリシー実施後もしくは定義されていない場合に実行される例外処理ステートを定義。
並列処理サンプル
並列処理を含んだステートマシンの定義を作成します。
この定義では10秒後に処理が完了する"Wait10s"と20秒後に処理が完了する"Wait20s"という2つのタスクをParallelで並列実行しています。
{
"Comment": "Parallel Example.",
"StartAt": "myFirstParallel",
"States": {
"myFirstParallel": {
"Type": "Parallel",
"End": true,
"Branches": [
{
"StartAt": "Wait20s",
"States": {
"Wait20s": {
"Type": "Wait",
"Seconds": 20,
"End": true
}
}
},
{
"StartAt": "Wait10s",
"States": {
"Wait10s": {
"Type": "Wait",
"Seconds": 10,
"End": true
}
}
}
]
}
}
}
処理を実行してから10秒後では"Wait10s"が成功し"Wait20s"は進行中のステータスとなり、20秒後には"Wait20s"も成功ステータスとなっていることをグラフインスペクターから確認できます。
Parallel 状態の入出力処理
Parallel状態では各ブランチに入力データのコピーを与えます。各ブランチからの出力を配列に格納し出力します。
今から作成するサンプルでは顧客情報(名前、電話番号、住所、生年月日)を入力とし、名前と生年月日を返します。
入力する顧客情報は次の通りとします。
{
"Name": "Suzuki Taro",
"Phone": "090-XXXX-XXXX",
"Address": "Japan",
"Birthday": "1990-01-01"
}
名前と生年月日を取得するLambdaを作成します。ランタイムはPython3.8です。
# GetName
import json
def lambda_handler(event, context):
return event["Name"]
# GetBirthday
import json
def lambda_handler(event, context):
return event["Birthday"]
Parallelを使用した定義は次のように記載します。
{
"Comment": "Parallel Example.",
"StartAt": "LookupCustomerInfo",
"States": {
"LookupCustomerInfo": {
"Type": "Parallel",
"End": true,
"Branches": [
{
"StartAt": "GetName",
"States": {
"GetName": {
"Type": "Task",
"Resource": "<GetNameのARN>",
"End": true
}
}
},
{
"StartAt": "GetBirthday",
"States": {
"GetBirthday": {
"Type": "Task",
"Resource": "<GetBirthdayのARN>",
"End": true
}
}
}
]
}
}
}
作成したステートマシンを実行すると[名前, 誕生日]の配列が出力されます。
処理が失敗する場合はステートマシンの権限にLambdaの実行権限がついているか確認してください。既に作成しているステートマシンを再利用しようとするとLambdaの実行権限がついておらず失敗してしまいます。
[
"Suzuki Taro",
"1990-01-01"
]
並列処理の失敗
並列処理をしたブランチが一つでも失敗すると、並列処理全体が失敗とみなされすべての処理が停止します。
次の定義では"Wait10s"から"FailState"に遷移します。"FailState"は"Type:Fail"のため必ず失敗します。"Wait20s"は処理が進行中ですが、"FailState"が失敗したため進行中の処理がキャンセルされます。
{
"Comment": "Parallel Example.",
"StartAt": "FailParallel",
"States": {
"FailParallel": {
"Type": "Parallel",
"End": true,
"Branches": [
{
"StartAt": "Wait20s",
"States": {
"Wait20s": {
"Type": "Wait",
"Seconds": 20,
"End": true
}
}
},
{
"StartAt": "Wait10s",
"States": {
"Wait10s": {
"Type": "Wait",
"Seconds": 10,
"Next": "FailState"
},
"FailState": {
"Type": "Fail"
}
}
},
{
"StartAt": "Wait5s",
"States": {
"Wait5s": {
"Type": "Wait",
"Seconds": 5,
"End": true
}
}
}
]
}
}
}
動的並列処理 (Map)
Map状態(“Type": “Map")を利用することにより、入力配列の要素ごとに一連のステップを実行できます。
Parallel状態では同じ入力を使用して複数のブランチを実行しましたが、Map状態では複数の配列に対して同じステップを実行します。
Mapには次の追加のフィールドがあります。
- Iterator (必須)
配列の各要素を処理するステートマシンを定義するオブジェクト。 - ItemsPath (オプション)
有効な入力内で配列フィールドを見つける場所を識別する参照パス。 - MaxConcurrency (オプション)
反復子の呼び出しを並列実行できる上限数を指定する整数。 - ResultPath (オプション)
ブランチの出力を配置する場所を指定。 - ResultSelector (オプション)
キーと値のペアを渡す。 - Retry (オプション)
Retrier と呼ばれるオブジェクトの配列。状態でランタイムエラーが発生した場合の再試行ポリシーを定義。 - Catch (オプション)
Catcher と呼ばれるオブジェクトの配列。状態でランタイムエラーが発生し、再試行ポリシーがすでに試された後または定義されていない場合に実行されるフォールバック状態を定義。
動的並列処理のサンプル
動的並列処理を含んだステートマシンの定義を作成します。
並列処理で作成したものと同様に、顧客情報(名前、電話番号、住所、生年月日)を入力とし、名前と生年月日を返す定義を作成します。
{
"Comment": "Map Example.",
"StartAt": "My1stMap",
"States": {
"My1stMap": {
"Type": "Map",
"End": true,
"Iterator": {
"StartAt": "LookupCustomerInfo",
"States": {
"LookupCustomerInfo": {
"Type": "Parallel",
"End": true,
"Branches": [
{
"StartAt": "GetName",
"States": {
"GetName": {
"Type": "Task",
"Resource": "<GetNameのARN>",
"End": true
}
}
},
{
"StartAt": "GetBirthday",
"States": {
"GetBirthday": {
"Type": "Task",
"Resource": "<GetBirthdayのARN>",
"End": true
}
}
}
]
}
}
}
}
}
}
次のような入力データを与えます。
[
{
"Name": "Suzuki Taro",
"Phone": "090-XXXX-XXXX",
"Address": "Tokyo",
"Birthday": "1990-01-01"
},
{
"Name": "Yamada Ziro",
"Phone": "090-XXXX-XXXX",
"Address": "Nagoya",
"Birthday": "2004-2-10"
},
{
"Name": "Yamada Hanako",
"Phone": "090-XXXX-XXXX",
"Address": "Osaka",
"Birthday": "1981-05-10"
}
]
実行が成功するといつも通りグラフインスペクターが表示されます。
グラフインスペクターの内側の点線内部をクリックするとインデックスが表示され、インデックスに応じた入力、出力を表示できます。
動的並列処理の失敗
処理が一つでも失敗した場合、動的並列処理全体は失敗となります。
試しに"birthday"がないデータを先ほどのJSONに追加してみます。
{
"Name": "Tanaka Satoshi",
"Phone": "090-XXXX-XXXX",
"Address": "Okinawa"
}
インデックス0,1,2は成功しますが、インデックス3にて"birthday"が取得できず処理がキャンセル状態となります。"GetName"と"GetBirthday"は並列処理のため一つでも失敗した場合全体が失敗となるため、インデックス3が失敗となります。
結果、インデックス3が失敗となったため動的並列処理全体が失敗というステータスになります。
最後に
今回は並列処理と動的並列処理について記載しました。
どちらも便利な機能ではありますが、一つでも失敗となった場合に全体が失敗となるリスクがあります。
長時間に及ぶ処理で失敗すると泣けてしまいます。使いどころを間違えないように気を付けましょう。
2回にわたってAWS Step Functionsの基本的な使い方について紹介しました。
他にも豊富な機能が用意されていますので、ぜひワークフローで日々の運用作業を自動化するなど活用してみてください。