のべラボ.blog

Tech Blog | AWS や サーバーレスやコンテナ などなど

AWS Step Functions の Distributed Map ステートを試してみる

これまで、AWS Step Functions の Map ステートを試してみる記事を 2 つ(下記)書いてきました。

nobelabo.hatenablog.com

nobelabo.hatenablog.com

今回は、その続きとして 2022 年 12 月にアナウンスされた Distributed Map を試してみます。

aws.amazon.com

この記事の内容は、2022 年 12月時点で検証した内容に基づきます。リージョンは東京リージョンを使用しています。


これまでの Map ステートでは、並列度は 40 までしかサポートされていませんでした。

そのため、40 以上の並列度で処理をしたい場合は、Map ステートから別のステートマシンを呼出し、そのステートマシンでさらに Map ステートを使用するという Mapの入れ子構造を作成する必要がありました。

例えば、1024 の並列処理を行いたい場合は、親のステートマシンで 32 の並列度を指定した Map ステートを用意し、そこから呼び出した子となるステートマシンで 32 の並列度を指定した Map を構成することで、結果的に 32 × 32 = 1024 の並列度を実現していました。

しかし、今回 リリースされた Distributed Map では、並列度が 10,000 までサポートされましたので、「Map の 入れ子構造」を作成する必要はなくなりました。

では、実際に Distributed Map を使ったステートマシンを作成してみます。


今回も、Distributed Map に渡す配列は、下記の Lambda関数 ArrayGenerator で作成することにします。

この ArrayGenerator は Python 3.9 で作成されており、イベントオブジェクトから length というキーで指定された数値に基づき、動的に配列を生成してリターンします。

import json

def lambda_handler(event, context):
    print(event)
    length = event['length']
    items = []
    for id in range(length):
      items.append({'item': id})
    return items

次に、ステートマシンを作成します。下記の JSON を使用して、前回の記事と同様の手順で作成します。

なお、前回は Map から Lambda関数を invoke していましたが、今回はコスト面を考慮し、特に何もしないステートである Passステートを Distributed Map から実行する形にします。

ステートマシンの JSON は下記です。

(なお、Workflow Studio で GUI から作成することもできますが、その場合は Map ステートを選択して、「処理モード」として「分散」を指定します。)

{
  "Comment": "A description of my state machine",
  "StartAt": "Lambda Invoke",
  "States": {
    "Lambda Invoke": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "Payload.$": "$",
        "FunctionName": "arn:aws:lambda:ap-northeast-1:123412341234:function:ArrayGenerator"
      },
      "Retry": [
        {
          "ErrorEquals": [
            "Lambda.ServiceException",
            "Lambda.AWSLambdaException",
            "Lambda.SdkClientException"
          ],
          "IntervalSeconds": 2,
          "MaxAttempts": 6,
          "BackoffRate": 2
        }
      ],
      "ResultPath": "$.generatedArray",
      "Next": "Map"
    },
    "Map": {
      "Type": "Map",
      "ItemProcessor": {
        "ProcessorConfig": {
          "Mode": "DISTRIBUTED",
          "ExecutionType": "STANDARD"
        },
        "StartAt": "Pass",
        "States": {
          "Pass": {
            "Type": "Pass",
            "End": true
          }
        }
      },
      "End": true,
      "Label": "Map",
      "MaxConcurrency": 1000,
      "InputPath": "$.generatedArray.Payload"
    }
  }
}

作成したステートマシンは、下図のようになります。

内容は、Passステートを使っている以外は、こちらの記事の内容とほぼ同じですね。

では、AWS マネジメントコンソールから実行してみます。ひとまず、配列の要素数を 3 で指定してみます。

作成したステートマシンのページで [実行の開始] ボタンを選択して [入力] に下記のパラメータを指定します。[実行の開始] をクリックします。

{"length": 3}

結果、下図のようにステートマシンは成功します。

ページの下側の「イベント」セクションをみてみましょう。

Resource」列に「マップ実行」というリンクがありますね。これをクリックすると、次のように並列処理の結果を確認できます。

実行」セクションの中の「名前」のリンクをどれか1つクリックすると、Pass が実行されたことも確認できます。

問題なく動作できていたら、並列度を 40よりも大きい値でも試してみましょう。このステートマシンを実行する時のパラメータの値を変えるだけで試せます。

{"length": 50}

また、Distributed Map では全体の処理のうち、許容される失敗のしきい値を割合(%)、または失敗した数で指定することもできたり、処理対象を配列だけでなく Amazon S3バケットのオブジェクトにすることもできます。

非常に興味深いですね。このような Distributed Map のその他の機能も、試していきたいです!

/* -----codeの行番号----- */