のべラボ.blog

謙虚に、臆せず、さぼらずにブログを書く

AWS Step Functions の Map ステートに動的に生成した配列を渡してみる

前回の記事で AWS Step Functions の Map ステートの基本的な使い方を試してみました。

nobelabo.hatenablog.com

前回は、ステートマシン実行時のパラメータに配列を手入力して、その配列を Map ステートにそのまま渡していましたが、今回はステートマシンの中で動的に配列を生成し、それを Map ステートに渡すようにしたいと思います。

ステートマシンの Map ステートから呼び出す Lambda 関数は、前回の記事と同じものとします。

それに加え、今回は動的に配列を生成する AWS Lambda 関数を作成します。今回は、Python 3.9 で作成しました。

便宜上、このLambda 関数を ArrayGenerator と呼ぶことにします。

この ArrayGenerator 関数は、イベントオブジェクトから length というキーで指定された数値に基づき、動的に配列を生成してリターンします。

import json

def lambda_handler(event, context):
    print(event)
    length = event['length']
    items = []
    for id in range(length):
      items.append({'item': id})
    return items

次に、ステートマシンを作成します。下記の JSON を使用して、前回の記事と同様の手順で作成します。

{
  "Comment": "test map state",
  "StartAt": "Generate Array",
  "States": {
    "Generate Array": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "Payload.$": "$",
        "FunctionName": "arn:aws:lambda:ap-northeast-1:123412341234:function:ArrayGenerator"
      },
      "Retry": [
        {
          "ErrorEquals": [
            "Lambda.ServiceException",
            "Lambda.AWSLambdaException",
            "Lambda.SdkClientException"
          ],
          "IntervalSeconds": 2,
          "MaxAttempts": 6,
          "BackoffRate": 2
        }
      ],
      "Next": "Map",
      "ResultPath": "$.generatedArray"
    },
    "Map": {
      "Type": "Map",
      "End": true,
      "Iterator": {
        "StartAt": "Lambda Invoke by Map",
        "States": {
          "Lambda Invoke by Map": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "OutputPath": "$.Payload",
            "Parameters": {
              "Payload.$": "$",
              "FunctionName": "arn:aws:lambda:ap-northeast-1:123412341234:function:maptest"
            },
            "Retry": [
              {
                "ErrorEquals": [
                  "Lambda.ServiceException",
                  "Lambda.AWSLambdaException",
                  "Lambda.SdkClientException"
                ],
                "IntervalSeconds": 2,
                "MaxAttempts": 6,
                "BackoffRate": 2
              }
            ],
            "End": true
          }
        }
      },
      "MaxConcurrency": 32,
      "ItemsPath": "$.generatedArray.Payload"
    }
  }
}

作成したステートマシンは、下図のようになります。

https://cdn-ak.f.st-hatena.com/images/fotolife/n/neob/20221113/20221113170044.png

ステートマシンの内容を少し見ていきます。

このステートマシンは前回の記事と異なり、まず Generate Array ステートで、Array Generator 関数を呼び出しています。

ArrayGenerator 関数に 渡すパラメータは、ステートマシン実行時に指定するパラメータをそのまま渡す、という前提です。

そのために、Generate Array ステートでは、ArrayGenerator 関数に 渡すパラメータに、"Payload.$": "$" を指定しています。

また、"ResultPath": "$.generatedArray" という指定があります。

これは、ArrayGenerator 関数がリターンする配列をステートマシンのデータに追加し、$.generatedArray というキーで参照できるようにするための指定です。

後続の Map ステートでは、この配列を受け取るため、"ItemsPath": "$.generatedArray.Payload" という指定をしています。

では、AWS マネジメントコンソールから実行してみます。今回は、配列の要素数を 3 を指定してみます。

作成したステートマシンのページで [実行の開始] ボタンを選択して [入力] に下記のパラメータを指定します。[実行の開始] をクリックします。

{"length": 3}

結果、下図のようにステートマシンは成功します。

https://cdn-ak.f.st-hatena.com/images/fotolife/n/neob/20221113/20221113170050.png

前回の記事と同じように、ページを少し下にスクロールして、[イベント] セクションが表示させ、その中から Lambda Invoke ステップの [Logs] のリンクを選択すると、CloudWatch Logs のページが表示させます。

そこで、3 つのログストリームが作成されていることを確認します。

続けて実行する時は、まずログストリームをすべて削除してから実行すると、手っ取り早く並列性を確認できます。

今回作成したステートマシンは、パラメータにより生成する配列の要素数を指定できるので、値を変更して Map による並列での呼び出し数が変更されることも確認してみましょう!

/* -----codeの行番号----- */