前回の記事では、Amazon Athena を AWS CLI から操作してみましたが、今回は AWS SDK for Python 、つまり boto3 を使って Python のコードから Amazon Athena を操作してみます。
操作する内容は、前回の記事のAWS CLI で行った操作と同じことをやってみようと思います。
なお、この記事の内容は 2023年 1月時点で検証した結果に基づきます。
boto3 で Amazon Athena を操作するにあたって、参考にしたドキュメントは、もちろん boto3 の API リファレンスです。
boto3 の client オブジェクトを使用するので、前回の AWS CLI で使用したコマンドと一致するメソッドを見つけてコードを書けばよさそうです。
まずは、完成形を掲載します。
import boto3 import pprint workgroup_name = "my-workgroup-with-boto3" result_location = "s3://tnobe-datalake-athena-result/my-workgroup-with-boto3/" catalog_name = "AwsDataCatalog" database_name = "mydatabaseboto3" sql_create_database = "CREATE SCHEMA " + database_name sql_create_table = '''\ CREATE EXTERNAL TABLE IF NOT EXISTS cloudfront_logs ( \ LogDate DATE, \ Time STRING, \ Location STRING, \ Bytes INT, \ RequestIP STRING, \ Method STRING, \ Host STRING, \ Uri STRING, \ Status INT, \ Referrer STRING, \ ClientInfo STRING \ ) \ ROW FORMAT DELIMITED \ FIELDS TERMINATED BY '\t' \ LINES TERMINATED BY '\n' \ LOCATION 's3://athena-examples-ap-northeast-1/cloudfront/plaintext/' \ ''' sql_query = '''\ SELECT logdate, location, uri, status \ FROM cloudfront_logs \ WHERE method = 'GET' and status = 200 and location like 'SFO%' limit 10 \ ''' sql_drop_database = "DROP SCHEMA " + database_name client = boto3.client('athena') try: # ワークグループの作成 response = client.create_work_group( Name=workgroup_name, Configuration={ 'ResultConfiguration': { 'OutputLocation': result_location } }, Description='demo workgroup with boto3' ) # ワークグループの一覧表示 response = client.list_work_groups() print("--- Workgroup ---") pprint.pprint(response) # データベースの作成 response = client.start_query_execution( QueryString=sql_create_database, WorkGroup=workgroup_name ) # データベースの一覧表示 response = client.list_databases( CatalogName=catalog_name ) print("--- Database ---") pprint.pprint(response) # テーブルの作成 response = client.start_query_execution( QueryString=sql_create_table, QueryExecutionContext={ 'Database': database_name, 'Catalog': catalog_name }, WorkGroup=workgroup_name ) # クエリーの発行 response = client.start_query_execution( QueryString=sql_query, QueryExecutionContext={ 'Database': database_name, 'Catalog': catalog_name }, WorkGroup=workgroup_name ) # QueryExecutionId の取得 query_exec_id = response["QueryExecutionId"] # クエリーが終了するまで待機 status = "" while status != "SUCCEEDED": response = client.get_query_execution( QueryExecutionId=query_exec_id ) status = response["QueryExecution"]["Status"]["State"] print(status) # クエリーの結果取得 response = client.get_query_results( QueryExecutionId=query_exec_id ) print("--- Query Result ---") pprint.pprint(response) except Exception as e: print(e) finally: print("--- Drop Database ---") response = client.start_query_execution( QueryString=sql_drop_database, WorkGroup=workgroup_name ) print("--- Delete Workgroup ---") response = client.delete_work_group( WorkGroup=workgroup_name, RecursiveDeleteOption=True ) print("--- END ---")
このコードのポイント部分をみていきます。
最初は、6 行目と 7行目(下記)です。
database_name = "mydatabaseboto3" sql_create_database = "CREATE SCHEMA " + database_name
上記のように、AWS CLI 使用時と同じくデータベース作成時に CREATE DATABASE
ではなく、CREATE SCHEMA
を使用しなければエラーになります。、
これは、データベース削除時も同じで DROP DATABASE
ではなく DROP SCHEMA
にする必要があります。
また、データベース名に ハイフン (-) を含めることができません。
次に、86 行目から 93 行目(下記)に注目して下さい。
# クエリーが終了するまで待機 status = "" while status != "SUCCEEDED": response = client.get_query_execution( QueryExecutionId=query_exec_id ) status = response["QueryExecution"]["Status"]["State"] print(status)
AWS CLI を使用する時は 1 つ 1 つのコマンドの実行の間に時間があったので問題なかったのですが、Python のコードでクエリー発行から結果取得まで一気に行うと、まだクエリーが完了していない状態で結果を取得しようとしてしまうこともあり、下記のエラーが発生します。
An error occurred (InvalidRequestException) when calling the GetQueryResults operation: Query has not yet finished. Current state: QUEUED
よって、get_query_execution
メソッドを使用してクエリーのステータスが SUCCEEDED になるまで待機してから結果を取得するようなコードにしています。
ループで待機せず、Waiter のような自動的に待機してくれる方法があればいいのですが、見当たらなかったため、致し方なく while ループを使いました。
ちなみに、ステータスとしては、クエリー発行後は QUEUED 、クエリー実行中は RUNNING、クエリーが成功したら SUCCEEDED となります。
この Python のコードを実行すると、Amazon Athena のワークグループの一覧、データベースの一覧、クエリ結果が表示されます。
下記は、クエリ結果部分を抜粋した内容です。
--- Query Result --- {'ResponseMetadata': {'HTTPHeaders': {'connection': 'keep-alive', 'content-length': '2183', 'content-type': 'application/x-amz-json-1.1', 'date': 'Sun, 22 Jan 2023 08:18:03 GMT', 'x-amzn-requestid': '216f5ef6-8e7c-479a-976a-1c5dcb43bedd'}, 'HTTPStatusCode': 200, 'RequestId': '216f5ef6-8e7c-479a-976a-1c5dcb43bedd', 'RetryAttempts': 0}, 'ResultSet': {'ResultSetMetadata': {'ColumnInfo': [{'CaseSensitive': False, 'CatalogName': 'hive', 'Label': 'logdate', 'Name': 'logdate', 'Nullable': 'UNKNOWN', 'Precision': 0, 'Scale': 0, 'SchemaName': '', 'TableName': '', 'Type': 'date'}, {'CaseSensitive': True, 'CatalogName': 'hive', 'Label': 'location', 'Name': 'location', 'Nullable': 'UNKNOWN', 'Precision': 2147483647, 'Scale': 0, 'SchemaName': '', 'TableName': '', 'Type': 'varchar'}, {'CaseSensitive': True, 'CatalogName': 'hive', 'Label': 'uri', 'Name': 'uri', 'Nullable': 'UNKNOWN', 'Precision': 2147483647, 'Scale': 0, 'SchemaName': '', 'TableName': '', 'Type': 'varchar'}, {'CaseSensitive': False, 'CatalogName': 'hive', 'Label': 'status', 'Name': 'status', 'Nullable': 'UNKNOWN', 'Precision': 10, 'Scale': 0, 'SchemaName': '', 'TableName': '', 'Type': 'integer'}]}, 'Rows': [{'Data': [{'VarCharValue': 'logdate'}, {'VarCharValue': 'location'}, {'VarCharValue': 'uri'}, {'VarCharValue': 'status'}]}, {'Data': [{'VarCharValue': '2014-08-05'}, {'VarCharValue': 'SFO4'}, {'VarCharValue': '/test-image-3.jpeg'}, {'VarCharValue': '200'}]}, (・・・中略・・・) {'Data': [{'VarCharValue': '2014-08-05'}, {'VarCharValue': 'SFO4'}, {'VarCharValue': '/test-image-2.jpeg'}, {'VarCharValue': '200'}]}]}, 'UpdateCount': 0}
上記結果の 50 行目以降で、クエリー結果のデータを表示することができていますね。
なお、このサンプルのコードでは、finally 句を使用して最後に Amazon Athena で作成したデータベースやワークグループを削除していますが、これは単に環境をクリアしておきたいという意図があるだけです。
最後に: