のべラボ.blog

Tech Blog | AWS や サーバーレスやコンテナ などなど

Amazon Athena を AWS SDK for Python ( boto3 ) から使用してみる

前回の記事では、Amazon Athena を AWS CLI から操作してみましたが、今回は AWS SDK for Python 、つまり boto3 を使って Python のコードから Amazon Athena を操作してみます。

操作する内容は、前回の記事のAWS CLI で行った操作と同じことをやってみようと思います。

nobelabo.hatenablog.com

なお、この記事の内容は 2023年 1月時点で検証した結果に基づきます。


boto3 で Amazon Athena を操作するにあたって、参考にしたドキュメントは、もちろん boto3 の API リファレンスです。

boto3.amazonaws.com

boto3 の client オブジェクトを使用するので、前回の AWS CLI で使用したコマンドと一致するメソッドを見つけてコードを書けばよさそうです。

まずは、完成形を掲載します。

import boto3
import pprint

workgroup_name = "my-workgroup-with-boto3"
result_location = "s3://tnobe-datalake-athena-result/my-workgroup-with-boto3/"
catalog_name = "AwsDataCatalog"
database_name = "mydatabaseboto3"
sql_create_database = "CREATE SCHEMA " + database_name
sql_create_table =  '''\
 CREATE EXTERNAL TABLE IF NOT EXISTS cloudfront_logs (     \
  LogDate DATE,                                           \
  Time STRING,                                            \
  Location STRING,                                        \
  Bytes INT,                                              \
  RequestIP STRING,                                       \
  Method STRING,                                          \
  Host STRING,                                            \
  Uri STRING,                                             \
  Status INT,                                             \
  Referrer STRING,                                        \
  ClientInfo STRING                                       \
  )                                                       \
  ROW FORMAT DELIMITED                                    \
  FIELDS TERMINATED BY '\t'                               \
  LINES TERMINATED BY '\n'                                \
  LOCATION 's3://athena-examples-ap-northeast-1/cloudfront/plaintext/' \
  '''
sql_query = '''\
  SELECT logdate, location, uri, status \
  FROM cloudfront_logs  \
  WHERE method = 'GET' and status = 200 and location like 'SFO%' limit 10 \
  '''
sql_drop_database = "DROP SCHEMA " + database_name

client = boto3.client('athena')

try:
    # ワークグループの作成
    response = client.create_work_group(
        Name=workgroup_name,
        Configuration={
            'ResultConfiguration': {
            'OutputLocation': result_location
            }
        },
        Description='demo workgroup with boto3'
    )
   
    # ワークグループの一覧表示
    response = client.list_work_groups()
    print("--- Workgroup ---")
    pprint.pprint(response)
    
    # データベースの作成
    response = client.start_query_execution(
      QueryString=sql_create_database,
      WorkGroup=workgroup_name
    )
    
    # データベースの一覧表示
    response = client.list_databases(
      CatalogName=catalog_name
    )
    print("--- Database ---")
    pprint.pprint(response)
    
    # テーブルの作成
    response = client.start_query_execution(
      QueryString=sql_create_table,
      QueryExecutionContext={
        'Database': database_name,
        'Catalog': catalog_name
      },
      WorkGroup=workgroup_name
    )
    
    # クエリーの発行
    response = client.start_query_execution(
      QueryString=sql_query,
      QueryExecutionContext={
        'Database': database_name,
        'Catalog': catalog_name
      },
      WorkGroup=workgroup_name
    )
    
    # QueryExecutionId の取得
    query_exec_id = response["QueryExecutionId"]

    # クエリーが終了するまで待機
    status = ""
    while status != "SUCCEEDED":
      response = client.get_query_execution(
        QueryExecutionId=query_exec_id
      )
      status = response["QueryExecution"]["Status"]["State"]
    print(status)
    
    # クエリーの結果取得
    response = client.get_query_results(
      QueryExecutionId=query_exec_id
    )
    print("--- Query Result ---")
    pprint.pprint(response)
    
except Exception as e:
    print(e)
finally:    
    print("--- Drop Database ---")
    response = client.start_query_execution(
      QueryString=sql_drop_database,
      WorkGroup=workgroup_name
    )
    print("--- Delete Workgroup ---")
    response = client.delete_work_group(
      WorkGroup=workgroup_name,
      RecursiveDeleteOption=True
    )
    print("--- END ---")

このコードのポイント部分をみていきます。

最初は、6 行目と 7行目(下記)です。

database_name = "mydatabaseboto3"
sql_create_database = "CREATE SCHEMA " + database_name

上記のように、AWS CLI 使用時と同じくデータベース作成時に CREATE DATABASE ではなく、CREATE SCHEMA を使用しなければエラーになります。、

これは、データベース削除時も同じで DROP DATABASE ではなく DROP SCHEMA にする必要があります。

また、データベース名に ハイフン (-) を含めることができません。

次に、86 行目から 93 行目(下記)に注目して下さい。

# クエリーが終了するまで待機
    status = ""
    while status != "SUCCEEDED":
      response = client.get_query_execution(
        QueryExecutionId=query_exec_id
      )
      status = response["QueryExecution"]["Status"]["State"]
    print(status)

AWS CLI を使用する時は 1 つ 1 つのコマンドの実行の間に時間があったので問題なかったのですが、Python のコードでクエリー発行から結果取得まで一気に行うと、まだクエリーが完了していない状態で結果を取得しようとしてしまうこともあり、下記のエラーが発生します。

An error occurred (InvalidRequestException) when calling the GetQueryResults operation: Query has not yet finished. Current state: QUEUED

よって、get_query_execution メソッドを使用してクエリーのステータスが SUCCEEDED になるまで待機してから結果を取得するようなコードにしています。

ループで待機せず、Waiter のような自動的に待機してくれる方法があればいいのですが、見当たらなかったため、致し方なく while ループを使いました。

ちなみに、ステータスとしては、クエリー発行後は QUEUED 、クエリー実行中は RUNNING、クエリーが成功したら SUCCEEDED となります。

この Python のコードを実行すると、Amazon Athena のワークグループの一覧、データベースの一覧、クエリ結果が表示されます。

下記は、クエリ結果部分を抜粋した内容です。

--- Query Result ---
{'ResponseMetadata': {'HTTPHeaders': {'connection': 'keep-alive',
                                      'content-length': '2183',
                                      'content-type': 'application/x-amz-json-1.1',
                                      'date': 'Sun, 22 Jan 2023 08:18:03 GMT',
                                      'x-amzn-requestid': '216f5ef6-8e7c-479a-976a-1c5dcb43bedd'},
                      'HTTPStatusCode': 200,
                      'RequestId': '216f5ef6-8e7c-479a-976a-1c5dcb43bedd',
                      'RetryAttempts': 0},
 'ResultSet': {'ResultSetMetadata': {'ColumnInfo': [{'CaseSensitive': False,
                                                     'CatalogName': 'hive',
                                                     'Label': 'logdate',
                                                     'Name': 'logdate',
                                                     'Nullable': 'UNKNOWN',
                                                     'Precision': 0,
                                                     'Scale': 0,
                                                     'SchemaName': '',
                                                     'TableName': '',
                                                     'Type': 'date'},
                                                    {'CaseSensitive': True,
                                                     'CatalogName': 'hive',
                                                     'Label': 'location',
                                                     'Name': 'location',
                                                     'Nullable': 'UNKNOWN',
                                                     'Precision': 2147483647,
                                                     'Scale': 0,
                                                     'SchemaName': '',
                                                     'TableName': '',
                                                     'Type': 'varchar'},
                                                    {'CaseSensitive': True,
                                                     'CatalogName': 'hive',
                                                     'Label': 'uri',
                                                     'Name': 'uri',
                                                     'Nullable': 'UNKNOWN',
                                                     'Precision': 2147483647,
                                                     'Scale': 0,
                                                     'SchemaName': '',
                                                     'TableName': '',
                                                     'Type': 'varchar'},
                                                    {'CaseSensitive': False,
                                                     'CatalogName': 'hive',
                                                     'Label': 'status',
                                                     'Name': 'status',
                                                     'Nullable': 'UNKNOWN',
                                                     'Precision': 10,
                                                     'Scale': 0,
                                                     'SchemaName': '',
                                                     'TableName': '',
                                                     'Type': 'integer'}]},
               'Rows': [{'Data': [{'VarCharValue': 'logdate'},
                                  {'VarCharValue': 'location'},
                                  {'VarCharValue': 'uri'},
                                  {'VarCharValue': 'status'}]},
                        {'Data': [{'VarCharValue': '2014-08-05'},
                                  {'VarCharValue': 'SFO4'},
                                  {'VarCharValue': '/test-image-3.jpeg'},
                                  {'VarCharValue': '200'}]},
(・・・中略・・・)
                        {'Data': [{'VarCharValue': '2014-08-05'},
                                  {'VarCharValue': 'SFO4'},
                                  {'VarCharValue': '/test-image-2.jpeg'},
                                  {'VarCharValue': '200'}]}]},
 'UpdateCount': 0}

上記結果の 50 行目以降で、クエリー結果のデータを表示することができていますね。

なお、このサンプルのコードでは、finally 句を使用して最後に Amazon Athena で作成したデータベースやワークグループを削除していますが、これは単に環境をクリアしておきたいという意図があるだけです。


最後に:

Amazon Athena をコードから使用する例としては公式ドキュメントで Java のサンプルがあります。

docs.aws.amazon.com

ただ、Javaでなくても、boto3 を使用して Python でも十分操作できそうだということがわかりました。

/* -----codeの行番号----- */