dely Tech Blog

クラシル・TRILLを運営するdely株式会社の開発ブログです

クラシルリワードのデータ基盤

はじめに

こんにちは!クラシルリワードで開発責任者をしているfunzinです。
この記事ではクラシルリワードのデータ基盤の構成について紹介していきます。

クラシルリワードについて

はじめにクラシルリワードのサービス概要について紹介させてください。 クラシルリワードは「日常のお買い物体験をお得に変える」アプリです。 買い物のためにお店に行く(移動する)、チラシを見る、商品を買う、レシートを受け取る......。これら日常の行動がポイントに変わり、そのポイントを使って様々な特典と交換することができます。 気になる方はサービスサイトをぜひご確認ください。

それではさっそく本題のデータ基盤の構成について説明していきます。

データ基盤の全体構成

データ基盤の全体構成

クラシルリワードのデータ基盤の技術スタックは下記になります。

  • DWH: BigQuery(メイン), Snowflake(サブ)
  • ETL: Embulk, Digdag
  • BI Tool: Redash, Looker

全体像のみではそれぞれの役割が把握しにくいため、次の章より詳細を説明していきます。

BigQueryの選定理由

クラシルリワードではメインのデータ基盤としてBigQueryを利用しています。 BigQueryを選定した理由としては下記となります。

  1. データエンジニアが不在のため、フルマネージドなサービスを利用したい

サービスの立ち上げ当初、新規事業ということもあり最小人数で開発をしていたためデータエンジニアが不在でした。 しかし、施策結果を振り返るためにイベントログ実装は必須であり、データ基盤を構築する必要がありました。
また自社データ基盤の場合、イベントログのスキーマ設計、エラー時の対応、データパイプラインの修正対応などに時間がかかることが懸念としてありました。
そのためフルマネージドなBigQueryを採用することで、データ基盤の構築にかかる時間を短縮しサービス開発にリソースを割くことができると判断しました。

2. Google Analytics for Firebaseとの相性が良い

クラシルリワードは 現在iOS / Androidアプリでサービスを提供しています。
モバイルアプリではFirebaseを利用することが一般的であり、クラシルリワードでもFirebaseを積極的に利用しています。
Google Analytics for Firebaseを利用することで下記のメリットがあります。

  • BigQuery Exportを利用することで、GAに送信されたイベントログが1日一回定期実行でイベントログのテーブルとしてBigQueryに自動でエクスポートされるため自前でイベントログのデータパイプラインを作成する必要がない
  • ログのスキーマが事前に決まっているため、開発者はevent_name, event_paramsなど最低限のものだけを意識すれば良い
  • アプリ側ではFirebase Analyticsのライブラリが提供されているため、下記のように送信処理を書くのみで完結する

iOSでのイベント送信例

// event_name: test_event
// event_params: ["index": 0]
Analytics.logEvent("test_event", parameters: ["index": 0])

インフラはAWSで構築しているためAWS Athena、クラシルで利用しているSnowflakeなども検討しましたが、Google Analytics for Firebaseを利用するメリットが大きいためBigQueryを採用しました。

データ基盤における重要な役割

この章ではクラシルリワードのデータ基盤において、重要な役割を抜粋して説明していきます。

アプリのイベントログのscan量削減

BigQueryの選定理由でも説明しましたが、 アプリからイベントログを送信するためにGoogle Analytics for Firebaseを利用しています。
日付別テーブルがBigQueryにエクスポートされるため、これだけでもイベントログ分析は可能になります。
しかし、そのまま日付別テーブルを利用する場合、DAUが増えるにつれて、BigQueryのscan量が増加しコストに影響してきます。

例えば起動イベントのみを抽出する場合、日付別テーブルを利用するとwhere句で条件を指定してもその範囲の日付全てのイベントが対象となり、パーティション分割テーブルと比較してscan量が多くなってしまい、結果的にコストに影響します。

日々分析をする上でのscan量の課題を解決するために、下記のような工夫をしています。

  1. パーティション分割テーブルを利用

BigQueryにエクスポートされた日付別テーブルを直接利用するのではなく、パーティション分割テーブルとして変換して分析ではこちらを利用するようにしています。 下記がパーティション分割テーブルを作成するサンプルクエリです。

CREATE OR REPLACE TABLE
  `product_firebase_analytics.events`
PARTITION BY
  DATE(event_timestamp_micros)
CLUSTER BY
  event_name,
  new_event_name,
  platform AS
SELECT
  event_date,
  TIMESTAMP_MICROS(event_timestamp) AS event_timestamp_micros,
  ARRAY_TO_STRING(
    [event_name,
     (
       SELECT value.string_value
       FROM UNNEST(event_params)
       WHERE KEY = 'id'
     )],
    "_"
  ) AS new_event_name,
  platform,
  ... -- カラムが続く
FROM
  `project.events_*`

パーティション: Date
クラスタ: event_name, new_event_name(後述), platform

パーティション分割テーブル作成後はクエリスケジューリングを利用し、毎日定期実行で指定範囲の日数を上書きするような形でイベントログをアップデートしています。

-- 直近3日以内のイベントログを上書きする
DECLARE start_date_interval INT64 DEFAULT 3;
DECLARE end_date_interval INT64 DEFAULT 0;

DELETE FROM
  `product_firebase_analytics.events`
WHERE
  DATE(event_time, 'Asia/Tokyo') BETWEEN DATE_SUB(
    CURRENT_DATE('Asia/Tokyo'),
    INTERVAL start_date_interval DAY
  )
  AND DATE_SUB(
    CURRENT_DATE('Asia/Tokyo'),
    INTERVAL end_date_interval DAY
  );

INSERT INTO
  `product_firebase_analytics.events`
SELECT
  event_date,
  TIMESTAMP_MICROS(event_timestamp) AS event_timestamp_micros,
  ARRAY_TO_STRING(
    [event_name,
     (
       SELECT value.string_value
       FROM UNNEST(event_params)
       WHERE KEY = 'id'
     )],
    "_"
  ) AS new_event_name,
  platform,
...-- カラムが続く
FROM
  `project.events_*`
WHERE
  REGEXP_EXTRACT(_TABLE_SUFFIX, r '[0-9]+') BETWEEN FORMAT_DATE(
    "%Y%m%d",
    DATE_SUB(
      CURRENT_DATE('Asia/Tokyo'),
      INTERVAL start_date_interval DAY
    )
  )
  AND FORMAT_DATE(
    "%Y%m%d",
    DATE_SUB(
      CURRENT_DATE('Asia/Tokyo'),
      INTERVAL end_date_interval DAY
    )
  )

2. new_event_nameの導入

Google Analytics for Firebaseの仕様上、イベント名は最大500までしか定義できません。 カジュアルに新規イベントを定義しすぎると上限の500に到達してしまい、新規イベントの計測ができなくなります。
イベント数の上限が500を超えないために、イベント名を一意にするのではなく、event_paramsにイベントを識別できるid(STRING)を含めて、パーティション分割テーブル作成時にnew_event_nameとして定義しています。

例えば下記のようなイベント名とIDの場合、new_event_nameはtest_hogeになります

  • event_name: test
  • event_params.id: hoge
  • new_event_name: test_hoge

new_event_nameはパーティション分割テーブル作成時にevent_nameidを結合することで、新しいイベント名として定義することで実現しています。

  ARRAY_TO_STRING(
    [event_name,
     (
       SELECT value.string_value
       FROM UNNEST(event_params)
       WHERE KEY = 'id'
     )],
    "_"
  ) AS new_event_name

new_event_nameによって、下記のようなメリットが得られます。

  • event_nameはtest, imp, tapなど汎用的なイベントのみを定義して使い回すため、イベント名の定義数で500を超えることがなくなる
  • クラスタにnew_event_nameを指定しているため、分析する側はnew_event_nameを条件に指定することでscan量を削減することが可能

DynamoDBやRDSをBigQueryにSync

クラシルリワードではインフラ環境をAWSで構築しているため、ユーザーのデータはRDSやDynamoDBに保存されています。
これらのデータをアプリから送信したイベントログと結合してBigQueryで分析をしたいニーズがでてきます。
現在はDigdagEmbulkを利用して、1日1回定期実行でBigQueryにSyncをしています。 これらはAWS ECSのタスクスケジューリングで実現しています。

多くのテーブルは全体更新をしていますが、一部テーブルはレコード数が多くなっているため差分更新をしています。 差分更新の対応については説明が長くなってしまうため、また別のテックブログで話せればと思います。

運用面に関しては、現状大きな課題はないもののバッチ処理が失敗した時の調査やリトライなどの運用工数もかかり始めてはきているので、自前のETLフローではなくtroccoFivetranなどのSaaS ETLツールも検討しています。

Snowflakeの活用

メインのデータ基盤はBigQueryですが、一部でSnowflakeを利用しています。 Snowflakeを利用することになった背景は下記2つです。

  1. クラシルのデータをクラシルリワード開発部で分析可能にするため

現状dely社では各事業部ごとに技術選定をしています。 クラシルではAWS Athena, Snowflakeを利用してデータ分析をしています。クラシルリワードでは今まで説明してきましたがBigQueryをメインで利用しています。
しかし、事業部が分かれているため、クラシル側のデータを分析するときに何か問題が発生するとクラシルのデータエンジニアに都度依頼する運用コストが発生していました。
この課題を解消するために、クラシルリワード側でSnowflakeをアカウントを作成し、Snowflakeのデータシェアリングを利用することで、クラシルリワード開発部でクラシルのデータを自由に分析することができるようにしています。

2. 一部のクラシルのテーブルをBigQueryにエクスポートしたいため

クラシルリワードではクラシルチラシにまつわる情報もユーザーに提供していますが、チラシ情報のテーブルがクラシル側にあるため、BigQueryで分析を完結することができません。

例えば、クラシルリワード側でどの店舗のチラシを見られたかを分析したい場合、どうしてもBigQueryに店舗のマスターデータが必要になってきます。

そのためクラシルチラシのデータ(e.g. 店舗情報)をクラシルリワード側のBigQueryに格納するために下記のようなフローを構築することで実現しています。
Snowflake(クラシル) -> Snowflake(クラシルリワード) -> Google Storage(クラシルリワード) -> BQ(クラシルリワード)

クラシルでのデータをBigQueryにエクスポートすることで、クラシルリワード側ではBigQueryのみで完結してチラシ情報を分析することが可能となっています。

BIツール(Redash, Looker)

最後にBIツールについて説明します。

Redash

現状はセルフホスティングでRedash用のEC2インスタンスを立てて、SQLクエリを書いて分析をすることが多いです。
しかしSQLクエリは人によって書き方が異なる、数値出しの判断が間違える可能性があるため、SQLクエリをGitHubのRepositoryで管理してレビューをする体制をとっています。 こちらはバイセルさんのRedashのクエリ管理方法を参考にしています。

Redash repositoryのフォルダ・ファイル構成

フォルダ構造

├── README.md
├── csv // クエリ名などをcsv管理
├── queries // クエリ管理
├── requirements.txt
└── scripts // Redashに反映用のスクリプト

csv

name,data_source_id,query_id
query_test_name,1,29

CSVではクエリ名、データソースID, クエリIDのみを最低限管理しています。

git管理下のSQLクエリをRedashに反映

PRがマージされたタイミングでGitHub Actionsを実行し差分があるクエリのみをRedashに反映させるようにしています。

GitHub Actions ステップ実行順

  1. 変更があったクエリの差分抽出(write_diff_query_only.sh)
  2. Redashに反映(main.py)

write_diff_query_only.sh

#!/bin/bash
# write_diff_query_only.sh
# PR差分のみをcsv_query_list.csvに反映するスクリプト

header=`head -n 1 csv/query_list.csv`
current_commit=$GITHUB_SHA
previous_commit=$(git rev-parse "${current_commit}"\^1)
query_ids=`git diff --name-only "$current_commit" "$previous_commit" | grep queries |  grep -o '[0-9]\+' | tr '\n' '|'`
query_ids=${query_ids%|}

echo "$header" > csv/diff_query_list.csv
if [ ! -z "$query_ids" ]; then
  grep -E "($query_ids)$" csv/query_list.csv >> csv/diff_query_list.csv
fi

# main.pyで差分のみを上書きして実行するため
cat csv/diff_query_list.csv > csv/query_list.csv

main.py

# main.py

import os.path
from redash_client.client import RedashClient
import csv
import os

class Query:
  def __init__(self, base_path, name, data_source_id, query_id, schedule=None, description='generated by retail-redash-query'):
    self.query_id = query_id
    self.name = name
    self.data_source_id = data_source_id
    self.schedule = schedule
    self.description = description
    path = 'queries/{0}.sql'.format(query_id)
    self.load_query(base_path, path)

  def load_query(self, base_path, path):
    with open('{0}/{1}'.format(base_path, path), 'r') as f:
      self.query = f.read()

# Read query list
def get_queries():
    reader = csv.reader(open('csv/query_list.csv', 'r'))
    queries = []
    # Skip header
    header = next(reader)

    for row in reader:
        queries.append(Query("./", row[0], row[1], row[2]))
  
    return queries


RedashClient.BASE_URL = os.environ['REDASH_HOST']
RedashClient.API_BASE_URL = os.environ['REDASH_HOST'] + "/api/"
api_key = os.environ['REDASH_API_KEY']
redash_client = RedashClient(api_key)

# Update queries
for query in get_queries():
    print('QueryID: ' + query.query_id)
    try:
        redash_client.update_query(
            query.query_id, query.name, query.query, query.data_source_id, query.description
        )
    except RedashClient.RedashClientException:
        print('Error: ' + query.query_id)

現状は差分があるクエリのみをRedashに反映するように影響範囲をとどめています。

Lookerについて

RedashではSQLクエリを書くというハードルが残り続けるため、SQLクエリをかけない非エンジニアでも分析が可能な体制にするため、現在は並行してLookerにも移行中です。

さいごに

クラシルリワードのデータ基盤について紹介しました。 データエンジニアが不在の中でも、分析基盤を構築しサービス改善につなげることができました。 引き続きデータを分析することでクラシルリワードを改善していきたいと思いますので、よろしくお願いします。