"壊れにくい"データ基盤を構築するためにMackerelチームで実践していること

こんにちは。MackerelチームにおいてCRE(Customer Reliability Engineer)をしているid:syou6162です。主にカスタマーサクセスを支えるデータ基盤の構築や、データ分析を担当しています。

今回は、壊れにくいデータ基盤を構築するため、Mackerelチームで実践していることを紹介します。

なぜ壊れにくいデータ基盤を構築するのか

Mackerelチームのデータ基盤はさまざまな形で利用されます。ざっと挙げるだけでも、こういった用途があります。

  • プロダクトオーナーや経営陣が、経営指標を把握する
  • 施策のオーナーが、施策に対する仮説を構築・検証する
  • セールス/マーケティングの担当者が、展示会やオンラインセミナー経由で獲得した商談の進行状況を把握する
  • 機能の利用状況を把握する
  • FAQサイトを継続的に改善する(著者自身のブログを参照)

このようにデータ基盤がチームのあらゆるところで利用されるようになると、壊れにくいことが求められます。

データ基盤が“壊れている”とはどういうことか

ここでは「壊れたデータ基盤」として、例えば以下のような状況を想定しています。

  • テーブルやビューが壊れていて、ダッシュボードが見られない
  • 出ているデータの最終更新日が古くて、データ分析者に誤った情報を与えてしまう
  • 入力データのミスにより、一意であるはずデータが一意でなく、ダブルカウントされている

壊れやすいデータ基盤は信用されず、使われなくなり、価値を生まないだけではなく、場合によってはユーザーやチームに不利益をもたらし、よりよい意思決定を阻害するものになってしまいます。

壊れてないだけでなく、壊れたら気付ける

データ基盤は壊れてないだけなく、チーム内のさまざまな要望に素早く応えることも求められます。要望に応えるため、データ基盤に修正を加えることもよくありますが、何かを修正すると別のどこかが壊れるのも世の常です。壊れたらすぐ気付ける、すぐに直せる必要があります。

つまり、データ基盤が“壊れにくい”ための仕組みには、少なくとも以下の2つの観点があります*1

  • 壊れたことに気付けるよう監視する
  • そもそも壊れてない状態を保つ

以降ではそれぞれについて、Mackerelチームで工夫している取り組みを紹介します。

前提とするシステム構成

これから紹介する事例の前提条件として、Mackerelチームのデータ基盤は以下の環境になっています。

  • サービスやRDB: AWS上にある
  • データ基盤: BigQuery(GCP)上に構築されている
  • AWSからGCPへのデータ転送: EmbulkやDigdagを利用
  • 可視化: Data Studioを利用

ただし、この条件に特化した部分は少ないので、ある程度は汎用的に使える仕組みになっていると思います。

壊れたことに気付けるよう監視する

何らかの修正で壊れたときのため、Mackerlチームではいくつかの監視を入れて、気付いたらすぐ直せるようにしています。

普段、この辺りはSRE(Site Reliability Engineer)と同じような気持ちで、SLI/SLOの制約下で開発スピードをなるべく上げてサービス成長に貢献できるよう、仕事をしています。

1. バッチジョブが失敗したことに気付く

基本的なことですが、データ転送などのバッチジョブには監視を入れます。具体的には以下の2種類を仕込んでいます。

  • バッチの失敗自体を検知する
  • バッチの処理時間が想定以上に長くないかを検知する

バッチ処理の監視には、手前味噌ですがMackerelを使っています。失敗検知にはmkr wrapを、バッチの実行時間にはhorensoを利用します。それぞれ以下のドキュメントを参照してください。

例えばもともとのバッチ処理がdaily_batch.shだとすると、以下のようにラップするだけで、簡単に監視できます。

mkr wrap --name "daily-batch" --detail \
  --host ${MONITOR_HOST_ID} --warning --auto-close -- \
  /path/to/horenso -t sync-daily -T \
  --reporter "/work/my_host_metric_reporter ${MONITOR_HOST_ID}" -- \
  daily_batch.sh

バッチ処理が失敗していたり、想定した処理時間より長くかかっていると、Slackに通知されるので、そこから原因を調べていくことができます。処理時間が長くなっている場合は、並列化や高速化を検討します。

データ転送のバッチ処理にはembulkとdigdagを利用し、AWS Fargate上で動かしています。このときタスクのサイドカーにmackerel-container-agentを仕込むことで、CPUやネットワークのどこがボトルネックになっているかを把握しやすくしています。次のドキュメントを参照してください。

2. 投入されたデータの性質を監視する

データ転送のバッチが成功し、データレイクにデータが正常に転送できたとしても、まだ安心することはできません。なぜなら、転送元のデータにそもそも誤りが存在する可能性があるからです。例えば以下のようなケースです。

  • 重複を許さないはずのnameカラムに、重複したデータが入っている
  • 値が0から100の間に収まるはずのカラムに、マイナスの値が入っている

こういった誤ったデータがRDBに入力されないよう、フロントエンド、サーバーサイド、RDBのチェック制約など、さまざまな箇所でバリデーションしていることでしょう。しかしながら、世の中にはいろいろなデータソースがあります。外部SaaSサービスや、スプレッドシートなどがその典型例です。

データ基盤を構築する仕事をしていると、そういったデータを取り扱うことは日常茶飯事です。こういったデータにはRDBのチェック制約などは存在せず、制約を破らないようにデータを入力することが期待されているわけですが、人手による入力ミスで制約が破られてしまうことがあります。

そこで、データを定期的にバリデーションする仕組みを、データ基盤側に入れています。やることは単純で、満たして欲しい制約と、期待する結果を、以下のようにYAMLで書くだけです。

- description: nameカラムが一意であるか
  sql: |-
    SELECT
      name,
      COUNT(*) AS names_count
    FROM
      my-project.my_dataset.my_table
    GROUP BY
      name
    HAVING
      names_count > 1    
  expect: []

こういったYAMLのファイルを必要なだけ用意し、クエリと期待した結果が一致するかを以下のスクリプトで検証します。

#!/bin/bash
set -eu -o pipefail

CWD=$(cd "$(dirname "$0")"; pwd)

for file in "${CWD}"/**/*.yml; do
  cat "${file}" | yq -c '.[]' | while read -r json; do
    if [[ $(diff <(printf "%s" "$json" | jq -r .expect) <(printf "%s" "$json" | jq -r .sql | bq query --format json --use_legacy_sql=false | jq -r .) | wc -l) -gt 0 ]]; then
      echo "Unexpected query result for ${file} ($(printf "%s" "$json" | jq -r .description))"
      exit 1
    fi
  done
done

これを1時間に1回実行しておくと、仮に誤ったデータが入力されたとしても、遅くとも翌営業日くらいには「こういうデータが入っていたんですが、これって意図したものですか?」と担当者に確認できます。こうした監視をベースに、誤ったデータを取り除いたり、正常な状態に修正して、データ基盤を健全な状態に保ちます。

つい最近、BigQueryではASSERT文によるデータのテストもサポートされたので、これを使ってもいいかもしれませんね。次のドキュメントを参照してください。

3. ビューが壊れてないかを監視する

バッチジョブの監視によって、テーブルが壊れてないかは大まかに分かります。しかし、ビューが壊れている(例えば、クエリ中のカラムが削除されて実行できない)かどうかは、それだけでは分かりません。クエリが実行されないと、失敗するかどうか分からないためです。

ものによっては1ヶ月に1回しか閲覧されないダッシュボードもありますが、そうなると壊れてることが分かるのは最悪で1ヶ月後、ということもあり得ます。ビューを壊した変更から時間が経つほど原因の特定は難しくなるので、壊れたらなるべく早く気付けるようにしたいですね。

BigQueryのbqコマンドラインツールのqueryサブコマンドには、--dry_runというオプションがあり、無効(invalid)なクエリを投げると失敗してくれます。これを利用して、ビューが壊れたら素早く気付けるようにしています。次のドキュメントを参照してください。

4. 利用状況を監視する

データ基盤の利用が増えてくると、コストも増加してきます。クラウド破産しないよう、利用状況やコストも監視しておきましょう。

次のドキュメントを参照してください。

そもそも壊れてない状態を保つ

監視により、データ基盤が壊れたことに素早く気付けるようになりました。しかしそれだけではなく、そもそもデータ基盤が壊れてない状態を保つことが大事です。

1. データリネージを元に修正できるようにする

ビューやテーブルを生成するSQLを修正する際に、それがどこで利用されているかが事前に分かれば、変更に問題がないかどうか調査しやすくなります。

「どのデータを使って生成されているか」や「どこで利用されているか」といった情報は、データリネージと呼ばれます。データリネージが存在しない場合、以下のような問題が発生します。

  1. チームの要望に答えるには、既存のSQLを修正する必要がある
  2. 修正対象のビューやテーブルに対する影響範囲が分からない
    • 修正内容はカラム名の変更や、ロジックの変更など
  3. データ基盤上には多数のテーブルやビューがあり、目視で確認するのは困難
  4. 「最近使われていなかったと思うし、修正しても影響ないだろう...」
    という勘を頼りに修正する
  5. 案の定、利用している箇所があり、データ基盤が壊れる

以下のキャプチャ画像は、データリネージの実例です。どこで参照されているかが書かれているため、修正時に考えないといけないスコープをぐっと狭くできます。

参照先が分かるようにする

こういった情報を、ビューやテーブルの作成時に埋め込めるようにします。まず準備として、参照する側とされる側の組を以下のスクリプトで列挙します。

#!/bin/bash
set -eu -o pipefail

# データリネージをサポートするためのスクリプト
# 参照する側とされる側の情報をmy__source__lineage.lineageテーブルに書き込む

DATASETS=("my-project:my__warehouse" "my-project:my__mart")

mkdir -p .lineage

for dataset in "${DATASETS[@]}"; do
  for view in $(bq ls --max_results=1000 --format=prettyjson "${dataset}" | jq -r '.[] | select(.type == "VIEW") | .id'); do
    bq show --format=prettyjson --view "${view}" | jq -r '.view.query' > ".lineage/${view}.sql"
  done
done

TARGET_DATASETS=(
  warehouse
  mart
)

for dataset in "${TARGET_DATASETS[@]}"; do
  for source_table_or_view_name in $(find "sql/${dataset}" -name '*.sql.yml' | xargs -I% basename % .sql.yml); do
    for target_table_or_view in $(grep -l "${dataset}.${source_table_or_view_name}" .lineage/*.sql | xargs -I% basename % .sql); do
      echo -e "my-project:my__${dataset}.${source_table_or_view_name}\t${target_table_or_view}"
    done
  done
done > lineage.tsv

bq load --source_format=CSV --encoding=UTF-8 --field_delimiter='\t' --replace \
  my__source__lineage.lineage lineage.tsv source_table_or_view:string,target_table_or_view:string

以下のような結果が出力され、「table2はtable1を参照している」であるとか「table3とtable4はtable2を参照している」といったことが分かります。

次に、以下のスクリプトを経由してビューやテーブルを作ることにより、作成時に参照先の情報をDescriptionとして埋め込むことができます。

...
DATA_LINEAGE_DESCRIPTION=""
DATA_LINEAGE_FILENAME=".lineage.txt"

bq query --max_rows 10000 --format json \
  --use_legacy_sql=false "SELECT target_table_or_view FROM my-project.my__source__lineage.lineage WHERE source_table_or_view = \"my-project:${VIEW}\"" \
  | jq -c '.[]' > ${DATA_LINEAGE_FILENAME}

if [[ $(cat ${DATA_LINEAGE_FILENAME} | wc -l) -gt 0 ]]; then
  DATA_LINEAGE_DESCRIPTION="\n\nこのviewは以下で参照されています。"
  for line in $(cat ${DATA_LINEAGE_FILENAME}); do
    DATA_LINEAGE_DESCRIPTION="${DATA_LINEAGE_DESCRIPTION}\n- $(echo "$line" | jq -r .target_table_or_view)"
  done
fi

BQ_SUB_COMMAND="mk"
if bq show "${VIEW}" > /dev/null; then
  BQ_SUB_COMMAND="update"
fi

bq ${BQ_SUB_COMMAND} \
  --use_legacy_sql=false \
  --description "${DESCRIPTION}$(echo -e "${DASHBOARDS_DESCRIPTION}")$(echo -e "${VIEW_DEFINITION_DESCRIPTION}")$(echo -e "${DATA_LINEAGE_DESCRIPTION}")" \
  --view "${SQL}" \
  "${VIEW}"

修正対象のテーブルやビューがどこで使われているかが一目で分かるようになり、安心して修正に挑めます。

2. 使われていないテーブルやビューは定期的に掃除

チーム内でデータ分析がどんどん行われるようになると、データウェアハウスやデータマート上にテーブルやビューが増えてきます。

一時的な調査で作ったものなどもよくあるため、気が付くと使われていないテーブルやビューがどんどん増えていき、データウェアハウスやデータマートがゴミ屋敷になることもあります。使われていないテーブルなどをそのままにしておくと、目的に合わない分析に利用されてしまう、といった問題も起きます。

そういった事態を避けるため、Mackerelチームでは四半期に一度を目安に、利用頻度の低いテーブルやビューを定期的に掃除しています。BigQueryの利用状況はaudit logに出力されるようにしているため、これを利用して以下の情報が分かるダッシュボードを作っています。

  • このテーブルに対してクエリが最後に発行されたのはいつか
  • このテーブルに対してクエリが最後に発行したのは誰か
  • このテーブルに対してクエリが何回発行されたか

これをもとに、この四半期に利用回数が少なかったテーブルやビューを削除対象としています。

WITH 
  raw_resource_usage AS (
    SELECT
      protopayload_auditlog.authenticationInfo.principalEmail,
      timestamp AS last_queried_at,
      info.resource,
      ROW_NUMBER() OVER (PARTITION BY info.resource ORDER BY timestamp DESC) AS rank,
    FROM
      `my-project.source__cloudaudit__bigquery.cloudaudit_googleapis_com_data_access`
    CROSS JOIN
      UNNEST(protopayload_auditlog.authorizationInfo) AS info 
    WHERE
      NOT REGEXP_CONTAINS(info.resource, r"LOAD_TEMP_") -- embulkのデータ転送で走るjobは除去したい
      AND NOT REGEXP_CONTAINS(info.resource, r"jobs")
  ),

  latest_access_by_resource AS (
    SELECT
      * EXCEPT(rank)
    FROM
      raw_resource_usage
    WHERE
      rank = 1
  ),

  resource_called_counts AS (
    SELECT
      resource,
      COUNT(*) AS count
    FROM 
      raw_resource_usage
    GROUP BY
      resource
    ORDER BY
      count DESC
  ),

  existing_tables AS (
    SELECT
      project_id,
      dataset_id,
      table_id,
      FORMAT("projects/%s/datasets/%s/tables/%s", project_id, dataset_id, table_id) AS table_full_name
    FROM (
      SELECT
        *
      FROM
        `my-project`.my_dataset_1.__TABLES__
      UNION ALL
      SELECT
        *
      FROM
        `my-project`.my_dataset_2.__TABLES__
    )
  )

SELECT
  dataset_id,
  table_id,
  principalEmail AS last_accessed_person,
  last_queried_at,
  resource_called_counts.count AS queries_counts
FROM
  existing_tables
INNER JOIN
  latest_access_by_resource
ON
  latest_access_by_resource.resource = existing_tables.table_full_name
INNER JOIN
  resource_called_counts
ON
  resource_called_counts.resource = existing_tables.table_full_name
ORDER BY
  last_queried_at

長い間使われていなかったとはいえ、いきなり削除すると問題になるケースもあると思われます。チーム内では、以下のフローで削除するようにしています。

  1. 削除対象のテーブルにexpirationを付与する
    • 例えば、何もしなければ2週間後には削除されるようにする
  2. 作成者や、最後にクエリを叩いた人に、次のような連絡をする
    「このテーブルは長い間使われていなかったので、削除しようと思っています。もし削除されて困る場合は、expirationを外して削除されないようにしてください」
  3. 消えると困るものはexpirationが外されるので、削除されない
  4. 消して問題ないものは、何もしなければ自動的に削除される
    • 一応、手元にSQLのバックアップは取っておく

以下のスクリプトで、削除対象の一覧にexpirationを付与します。

#!/bin/bash
set -u

CWD=$(cd "$(dirname "$0")"; pwd)
PROJECT=my-project
# 2週間 = 60 * 60 * 24 * 14
EXPIRE_TIME=1209600

TARGET_TABLES_OR_VIEWS=(
  my_dataset.old_table1
  my_dataset.old_table2
)

for t in "${TARGET_TABLES_OR_VIEWS[@]}"; do
  # table or viewがすでに存在しなくても全部舐めれるようにする
  if ! bq show "${PROJECT}:${t}" > /dev/null; then
    echo "${t} does not exist. Skipped..."
    continue
  fi
  
  RESULT=$(bq show --format json "${PROJECT}:${t}")
  if [ "$(echo "$RESULT" | jq -r .type)" = "VIEW" ]; then
    # viewであれば元のSQLをバックアップで手元に残しておく
    echo "$RESULT" | jq -r .view.query > "${CWD}/${t}.sql"
  fi

  bq update --expiration ${EXPIRE_TIME} "${PROJECT}:${t}"
done

おわりに

今回は、壊れにくいデータ基盤を構築する仕組みについて紹介しました。今後、壊れにくいだけでなく使われやすくする仕組みや、このデータ基盤をベースにしたデータ分析でサービスをどう成長させるかについても紹介していければと思っています。

はてなでは、新卒・中途、東京・京都を問わず、エンジニアを募集しています。データ基盤、データ分析に興味のある人はぜひご応募ください!

エンジニア採用 - 採用情報 - 株式会社はてな

参考文献

データマネジメント知識体系ガイド 第二版

データマネジメント知識体系ガイド 第二版

データマネジメントが30分でわかる本

データマネジメントが30分でわかる本

*1:他の観点としては「壊れても簡単に復旧できる手段を用意する」「データ基盤が壊れた場合、自動的に壊れる前の状態に戻るようにする」などが挙げられますが、これらの観点はMackerelチームではまだあまり取り組めていないため、今回のエントリでは省略しています