こんにちは、サーバーサイドエンジニアのjoooee0000です。 delyはデータ基盤としてAWS Athenaを使っており、ユーザーの行動ログからアプリケーションのアクセスログまで、様々なログがAWS Athena上に存在しています。AWS AthenaはS3上にあるデータソースをprestoのクエリ記法で引けるようになっていてとても便利です。
クラシルの分析基盤の歴史に興味がある方は、この記事にまとまっているので参照してみてください。クラシルの分析基盤はデータサイエンスチームが主体となって今も改善を続けています!
今回は、AWS SDK for Rubyを使ってAthenaのクエリを実行する記事が1本もなかったので書きました。また、本記事では並列処理ではなく、シンプルな1クエリを引くsync処理の場合について紹介します。
AWS Athena SDKの特徴
AthenaのSDKは少し変わっていて、実行待ちのポーリング処理を自前で実装する必要があります。処理の流れとしては、
- クエリを実行するAPIを叩く
- クエリ実行のステータス問い合わせAPIを結果が
SUCCEED
になるまで叩く - クエリの結果を返すAPIを叩く
このように、2の処理の間ポーリング処理を実装する必要があります。
では、工程ごとに実装例を紹介していきます。
1. クエリを実行するAPIを叩く
まずは、クエリ実行をリクエストする #start_query_execution
APIを叩きます。
実装サンプルはこちらです。
client = Aws::Athena::Client.new({}) query_string = %Q{SELECT * FROM "databasename"."tablename" limit 10;} client.start_query_execution( { query_string: query_string, output_location: 's3://' + S3BUCKET_NAME } ) => #<struct Aws::Athena::Types::StartQueryExecutionOutput query_execution_id="c0d4460b-xxxx-xxxx-9924-ede5c2d2b56b">
Clientのinitializeの引数は、AWSのaccess_key_id/secret_access_keyやregionを主に指定します。なにも指定しないと、他のAWS SDKの仕様と同様に
- Aws.config[:credentials]
- The :access_key_id, :secret_access_key, and :session_token options.
- ENV['AWS_ACCESS_KEY_ID'], ENV['AWS_SECRET_ACCESS_KEY']
- ~/.aws/credentials
- ...
の値が順番に参照されていきます。regionも同様です。環境に合わせて設定してください。
また、#start_query_execution
APIを叩く際は、クエリとresult_configurationを指定する必要があります。
result_configurationでは、クエリの実行結果の保存先としてs3のpathを指定します。APIでのクエリ実行も、AWSコンソールでのクエリ実行時と同じようにs3に実行結果を保持する仕組みになっています。指定したs3のpathに実行結果が蓄積されていきます。
また、APIのレスポンスとして、query_execution_idを取得できます。こちらのidは、実行結果の問い合わせや実行ステータスの問い合わせ、実行停止処理などに必要になります。
このAPIを叩いた時点では、処理を開始するリクエストを送っただけです。返り値としてクエリの実行結果が返ってくるわけではなくquery_execution_idという実行固有のidのみが返却されます。
2. クエリ実行のステータス問い合わせAPIを結果がSUCCEEDになるまで叩く
1で実行を開始したあと、クエリが走り終わるまで、結果を取得することはできません。しかし、1の工程ではクエリの実行完了を待たずにレスポンスがかえってきます。
そこで、現在の実行のステータスを知るための #get_query_execution
というAPIが存在しています。そのAPIの返り値がSUCCEED
になるまでステータスを問い合わせ続けなければなりません。つまり、ポーリング処理が必要となります。
今回はwhileを使ってポーリング処理をする代わりに、こちらのgemのwith_retriesを使用してポーリング処理を実装しました。 GitHub - ooyala/retries: A tiny Rubygem for retrying code with randomized, exponential backoff. こちらのgemは内部でexponential backoffを採用しています。exponential backoffとは、指数関数的に処理のリトライ間隔を後退させるアルゴリズムのことで、処理に時間がかかるほど再処理をする間隔が広くなっていきます。つまり、早く終わる処理には無駄な待ちがなく、時間がかかる処理には無駄なAPIのコールやCPUの負荷をかけずに済むようなアルゴリズムになっています。
実装サンプルはこちらです。
begin status = '' with_retries({ max_tries: 100, base_sleep_seconds: 0.01, max_sleep_seconds: 30, rescue: [Executing] }) do |retry_count| state_result = client.get_query_execution({ query_execution_id: query_execution_id }) status = state_result.query_execution.status.state puts "[Athena Poling] fetching_count: #{retry_count}" raise Executing if ['QUEUED', 'RUNNING'].include?(status) case status when 'FAILED', 'CANCELED' # 処理が失敗した理由の取得 reason = state_result.query_execution.status.state_change_reason raise ExecutionError, reason end end status ensure # 成功時以外はクエリの停止リクエストを送信 unless status.present? && status == 'SUCCEEDED' puts "クエリ停止処理" client.stop_query_execution(query_execution_id) end end class Executing < Exception end class ExecutionError < Exception end
with_retriesの引数である、max_triesやmax_sleep_secondsなどは、用途に合わせて調節してください。
ポーリング処理の精度検証
試しに実行時間が100秒弱のクエリのポーリング処理を下記の条件で実行した場合のCPU使用時間やAPIコール数を比較してみます。
- シンプルなwhileでのポーリング処理(sleepなし)
- sleep(1)をはさんだwhileでのポーリング処理
- with_retriesでのポーリング処理
CPU使用時間の計測には、Ruby標準ライブラリのbenchmarkを使用しました。
シンプルなwhileでのポーリング処理
APIコール数: 3470回
CPU使用時間:
# benchmark結果 user system total real 12.160000 1.320000 13.480000 ( 96.931696)
sleep(1)をはさんだwhileでのポーリング処理
APIコール数: 94回
CPU使用時間:
# benchmark結果 user system total real 0.350000 0.050000 0.400000 ( 97.002586)
with_retriesでのポーリング処理
APIコール数: 15回
CPU使用時間:
# benchmark結果 user system total real 0.090000 0.010000 0.100000 (110.306136)
with_retriesの処理の方はwhile処理と比べぴったりに処理が終わらないため、少しreal timeが長くなっています。しかし、長い処理においてもCPUをほとんどつかっておらず、APIコール数も少ないのがわかります。 ほとんどCPUを使わないので平行で重い処理などが走っても心配ありません。(これくらいの差であればsleep(1)でも十分だと思いますが。)
停止処理について
また、2番の処理の実装においてもう一つ大事なことは、途中で強制的に処理を終了した際などにしっかりクエリの実行を中断することです。想定外の長い処理が走ってしまったとき、コードを中断しても #stop_query_execution
を叩かない限り裏側ではクエリの実行が走り続けてしまいます。なので、コードを強制終了させた場合などにも後処理として実行されるensure節でクエリ停止処理を書くことをおすすめします。
3. クエリの結果を返すAPIを叩く
クエリの実行が完了したら、後は結果を引く工程のみです。 結果の量が多く、ページングが必要な処理に関してはnext_tokenを次のリクエストでなげる形で実装します。特に変わったことはしないのですが、一つあげるとしたら1ページ目の1行目にカラム行が返ってくるので、結果を返すときにそれを除外しています。 (カラム行をskipしてくれるoptionを探したのですが見つかりませんでした。。探せばあるかもしれません。)
本実装では、1ページにページに返ってくる上限を100件、すべての結果の上限を10000件に絞っています。
MAX_PAGE_RESULTS = 100 MAX_RESULTS = 10000 # next_tokenを受け取って次のページをリクエストする再帰処理 def get_all_results(query_execution_id, next_token = nil, results = []) rows, next_token = get_results(query_execution_id, next_token) results += rows results = results.flatten if results.count > MAX_RESULTS raise ExecutionError, '結果の上限数を超えています。' end if next_token.present? results = get_all_results(query_execution_id, next_token, results) end results end # 1ページ分の結果を取得する処理 def get_results(query_execution_id, next_token = nil) results = client.get_query_results({ query_execution_id: query_execution_id, next_token: next_token, max_results: MAX_PAGE_RESULTS }) next_token = results.next_token # クエリの実行結果の取得 rows = results.result_set.rows # カラム一覧を取得 column = results.result_set.result_set_metadata.column_info.map(&:label) result_rows = rows.map do |result| row = result.data.map(&:var_char_value) # 初回はカラム行が返ってくるので除外 next if row == column column.zip(row).to_h end.compact [result_rows, next_token] end get_all_results(query_execution_id)
まとめ
今回は、AWS AthenaをAWS SDK for Rubyで引く実装について紹介しました。 ポーリング処理を自前で書くような仕様が珍しいですよね。ポーリングを書く処理は、with_retriesを使ってみてはいかがでしょうか。また、停止処理はしっかりと行いましょう!