こんにちは。MackerelチームにおいてCRE(Customer Reliability Engineer)をしているid:syou6162です。主にカスタマーサクセスを支えるデータ基盤の構築や、データ分析を担当しています。
今回は、壊れにくいデータ基盤を構築するため、Mackerelチームで実践していることを紹介します。
なぜ壊れにくいデータ基盤を構築するのか
Mackerelチームのデータ基盤はさまざまな形で利用されます。ざっと挙げるだけでも、こういった用途があります。
- プロダクトオーナーや経営陣が、経営指標を把握する
- 施策のオーナーが、施策に対する仮説を構築・検証する
- セールス/マーケティングの担当者が、展示会やオンラインセミナー経由で獲得した商談の進行状況を把握する
- 機能の利用状況を把握する
- FAQサイトを継続的に改善する(著者自身のブログを参照)
このようにデータ基盤がチームのあらゆるところで利用されるようになると、壊れにくいことが求められます。
データ基盤が“壊れている”とはどういうことか
ここでは「壊れたデータ基盤」として、例えば以下のような状況を想定しています。
- テーブルやビューが壊れていて、ダッシュボードが見られない
- 出ているデータの最終更新日が古くて、データ分析者に誤った情報を与えてしまう
- 入力データのミスにより、一意であるはずデータが一意でなく、ダブルカウントされている
壊れやすいデータ基盤は信用されず、使われなくなり、価値を生まないだけではなく、場合によってはユーザーやチームに不利益をもたらし、よりよい意思決定を阻害するものになってしまいます。
壊れてないだけでなく、壊れたら気付ける
データ基盤は壊れてないだけなく、チーム内のさまざまな要望に素早く応えることも求められます。要望に応えるため、データ基盤に修正を加えることもよくありますが、何かを修正すると別のどこかが壊れるのも世の常です。壊れたらすぐ気付ける、すぐに直せる必要があります。
つまり、データ基盤が“壊れにくい”ための仕組みには、少なくとも以下の2つの観点があります*1。
- 壊れたことに気付けるよう監視する
- そもそも壊れてない状態を保つ
以降ではそれぞれについて、Mackerelチームで工夫している取り組みを紹介します。
前提とするシステム構成
これから紹介する事例の前提条件として、Mackerelチームのデータ基盤は以下の環境になっています。
- サービスやRDB: AWS上にある
- データ基盤: BigQuery(GCP)上に構築されている
- AWSからGCPへのデータ転送: EmbulkやDigdagを利用
- 可視化: Data Studioを利用
ただし、この条件に特化した部分は少ないので、ある程度は汎用的に使える仕組みになっていると思います。
壊れたことに気付けるよう監視する
何らかの修正で壊れたときのため、Mackerlチームではいくつかの監視を入れて、気付いたらすぐ直せるようにしています。
普段、この辺りはSRE(Site Reliability Engineer)と同じような気持ちで、SLI/SLOの制約下で開発スピードをなるべく上げてサービス成長に貢献できるよう、仕事をしています。
1. バッチジョブが失敗したことに気付く
基本的なことですが、データ転送などのバッチジョブには監視を入れます。具体的には以下の2種類を仕込んでいます。
- バッチの失敗自体を検知する
- バッチの処理時間が想定以上に長くないかを検知する
バッチ処理の監視には、手前味噌ですがMackerelを使っています。失敗検知にはmkr wrap
を、バッチの実行時間にはhorenso
を利用します。それぞれ以下のドキュメントを参照してください。
- mkr wrapでcronなどのバッチジョブを監視する - Mackerel ヘルプ
- 最近の砂場活動その10: songmu/horensoを使ってバッチの処理時間をMackerelのサービスメトリックに記録する - yasuhisa's blog
例えばもともとのバッチ処理がdaily_batch.sh
だとすると、以下のようにラップするだけで、簡単に監視できます。
mkr wrap --name "daily-batch" --detail \ --host ${MONITOR_HOST_ID} --warning --auto-close -- \ /path/to/horenso -t sync-daily -T \ --reporter "/work/my_host_metric_reporter ${MONITOR_HOST_ID}" -- \ daily_batch.sh
バッチ処理が失敗していたり、想定した処理時間より長くかかっていると、Slackに通知されるので、そこから原因を調べていくことができます。処理時間が長くなっている場合は、並列化や高速化を検討します。
データ転送のバッチ処理にはembulkとdigdagを利用し、AWS Fargate上で動かしています。このときタスクのサイドカーにmackerel-container-agentを仕込むことで、CPUやネットワークのどこがボトルネックになっているかを把握しやすくしています。次のドキュメントを参照してください。
2. 投入されたデータの性質を監視する
データ転送のバッチが成功し、データレイクにデータが正常に転送できたとしても、まだ安心することはできません。なぜなら、転送元のデータにそもそも誤りが存在する可能性があるからです。例えば以下のようなケースです。
- 重複を許さないはずのnameカラムに、重複したデータが入っている
- 値が0から100の間に収まるはずのカラムに、マイナスの値が入っている
こういった誤ったデータがRDBに入力されないよう、フロントエンド、サーバーサイド、RDBのチェック制約など、さまざまな箇所でバリデーションしていることでしょう。しかしながら、世の中にはいろいろなデータソースがあります。外部SaaSサービスや、スプレッドシートなどがその典型例です。
データ基盤を構築する仕事をしていると、そういったデータを取り扱うことは日常茶飯事です。こういったデータにはRDBのチェック制約などは存在せず、制約を破らないようにデータを入力することが期待されているわけですが、人手による入力ミスで制約が破られてしまうことがあります。
そこで、データを定期的にバリデーションする仕組みを、データ基盤側に入れています。やることは単純で、満たして欲しい制約と、期待する結果を、以下のようにYAMLで書くだけです。
- description: nameカラムが一意であるか sql: |- SELECT name, COUNT(*) AS names_count FROM my-project.my_dataset.my_table GROUP BY name HAVING names_count > 1 expect: []
こういったYAMLのファイルを必要なだけ用意し、クエリと期待した結果が一致するかを以下のスクリプトで検証します。
#!/bin/bash set -eu -o pipefail CWD=$(cd "$(dirname "$0")"; pwd) for file in "${CWD}"/**/*.yml; do cat "${file}" | yq -c '.[]' | while read -r json; do if [[ $(diff <(printf "%s" "$json" | jq -r .expect) <(printf "%s" "$json" | jq -r .sql | bq query --format json --use_legacy_sql=false | jq -r .) | wc -l) -gt 0 ]]; then echo "Unexpected query result for ${file} ($(printf "%s" "$json" | jq -r .description))" exit 1 fi done done
これを1時間に1回実行しておくと、仮に誤ったデータが入力されたとしても、遅くとも翌営業日くらいには「こういうデータが入っていたんですが、これって意図したものですか?」と担当者に確認できます。こうした監視をベースに、誤ったデータを取り除いたり、正常な状態に修正して、データ基盤を健全な状態に保ちます。
つい最近、BigQueryではASSERT
文によるデータのテストもサポートされたので、これを使ってもいいかもしれませんね。次のドキュメントを参照してください。
3. ビューが壊れてないかを監視する
バッチジョブの監視によって、テーブルが壊れてないかは大まかに分かります。しかし、ビューが壊れている(例えば、クエリ中のカラムが削除されて実行できない)かどうかは、それだけでは分かりません。クエリが実行されないと、失敗するかどうか分からないためです。
ものによっては1ヶ月に1回しか閲覧されないダッシュボードもありますが、そうなると壊れてることが分かるのは最悪で1ヶ月後、ということもあり得ます。ビューを壊した変更から時間が経つほど原因の特定は難しくなるので、壊れたらなるべく早く気付けるようにしたいですね。
BigQueryのbqコマンドラインツールのquery
サブコマンドには、--dry_run
というオプションがあり、無効(invalid)なクエリを投げると失敗してくれます。これを利用して、ビューが壊れたら素早く気付けるようにしています。次のドキュメントを参照してください。
4. 利用状況を監視する
データ基盤の利用が増えてくると、コストも増加してきます。クラウド破産しないよう、利用状況やコストも監視しておきましょう。
次のドキュメントを参照してください。
そもそも壊れてない状態を保つ
監視により、データ基盤が壊れたことに素早く気付けるようになりました。しかしそれだけではなく、そもそもデータ基盤が壊れてない状態を保つことが大事です。
1. データリネージを元に修正できるようにする
ビューやテーブルを生成するSQLを修正する際に、それがどこで利用されているかが事前に分かれば、変更に問題がないかどうか調査しやすくなります。
「どのデータを使って生成されているか」や「どこで利用されているか」といった情報は、データリネージと呼ばれます。データリネージが存在しない場合、以下のような問題が発生します。
- チームの要望に答えるには、既存のSQLを修正する必要がある
- 修正対象のビューやテーブルに対する影響範囲が分からない
- 修正内容はカラム名の変更や、ロジックの変更など
- データ基盤上には多数のテーブルやビューがあり、目視で確認するのは困難
- 「最近使われていなかったと思うし、修正しても影響ないだろう...」
という勘を頼りに修正する - 案の定、利用している箇所があり、データ基盤が壊れる
以下のキャプチャ画像は、データリネージの実例です。どこで参照されているかが書かれているため、修正時に考えないといけないスコープをぐっと狭くできます。
こういった情報を、ビューやテーブルの作成時に埋め込めるようにします。まず準備として、参照する側とされる側の組を以下のスクリプトで列挙します。
#!/bin/bash set -eu -o pipefail # データリネージをサポートするためのスクリプト # 参照する側とされる側の情報をmy__source__lineage.lineageテーブルに書き込む DATASETS=("my-project:my__warehouse" "my-project:my__mart") mkdir -p .lineage for dataset in "${DATASETS[@]}"; do for view in $(bq ls --max_results=1000 --format=prettyjson "${dataset}" | jq -r '.[] | select(.type == "VIEW") | .id'); do bq show --format=prettyjson --view "${view}" | jq -r '.view.query' > ".lineage/${view}.sql" done done TARGET_DATASETS=( warehouse mart ) for dataset in "${TARGET_DATASETS[@]}"; do for source_table_or_view_name in $(find "sql/${dataset}" -name '*.sql.yml' | xargs -I% basename % .sql.yml); do for target_table_or_view in $(grep -l "${dataset}.${source_table_or_view_name}" .lineage/*.sql | xargs -I% basename % .sql); do echo -e "my-project:my__${dataset}.${source_table_or_view_name}\t${target_table_or_view}" done done done > lineage.tsv bq load --source_format=CSV --encoding=UTF-8 --field_delimiter='\t' --replace \ my__source__lineage.lineage lineage.tsv source_table_or_view:string,target_table_or_view:string
以下のような結果が出力され、「table2はtable1を参照している」であるとか「table3とtable4はtable2を参照している」といったことが分かります。
次に、以下のスクリプトを経由してビューやテーブルを作ることにより、作成時に参照先の情報をDescriptionとして埋め込むことができます。
... DATA_LINEAGE_DESCRIPTION="" DATA_LINEAGE_FILENAME=".lineage.txt" bq query --max_rows 10000 --format json \ --use_legacy_sql=false "SELECT target_table_or_view FROM my-project.my__source__lineage.lineage WHERE source_table_or_view = \"my-project:${VIEW}\"" \ | jq -c '.[]' > ${DATA_LINEAGE_FILENAME} if [[ $(cat ${DATA_LINEAGE_FILENAME} | wc -l) -gt 0 ]]; then DATA_LINEAGE_DESCRIPTION="\n\nこのviewは以下で参照されています。" for line in $(cat ${DATA_LINEAGE_FILENAME}); do DATA_LINEAGE_DESCRIPTION="${DATA_LINEAGE_DESCRIPTION}\n- $(echo "$line" | jq -r .target_table_or_view)" done fi BQ_SUB_COMMAND="mk" if bq show "${VIEW}" > /dev/null; then BQ_SUB_COMMAND="update" fi bq ${BQ_SUB_COMMAND} \ --use_legacy_sql=false \ --description "${DESCRIPTION}$(echo -e "${DASHBOARDS_DESCRIPTION}")$(echo -e "${VIEW_DEFINITION_DESCRIPTION}")$(echo -e "${DATA_LINEAGE_DESCRIPTION}")" \ --view "${SQL}" \ "${VIEW}"
修正対象のテーブルやビューがどこで使われているかが一目で分かるようになり、安心して修正に挑めます。
2. 使われていないテーブルやビューは定期的に掃除
チーム内でデータ分析がどんどん行われるようになると、データウェアハウスやデータマート上にテーブルやビューが増えてきます。
一時的な調査で作ったものなどもよくあるため、気が付くと使われていないテーブルやビューがどんどん増えていき、データウェアハウスやデータマートがゴミ屋敷になることもあります。使われていないテーブルなどをそのままにしておくと、目的に合わない分析に利用されてしまう、といった問題も起きます。
そういった事態を避けるため、Mackerelチームでは四半期に一度を目安に、利用頻度の低いテーブルやビューを定期的に掃除しています。BigQueryの利用状況はaudit logに出力されるようにしているため、これを利用して以下の情報が分かるダッシュボードを作っています。
- このテーブルに対してクエリが最後に発行されたのはいつか
- このテーブルに対してクエリが最後に発行したのは誰か
- このテーブルに対してクエリが何回発行されたか
これをもとに、この四半期に利用回数が少なかったテーブルやビューを削除対象としています。
WITH raw_resource_usage AS ( SELECT protopayload_auditlog.authenticationInfo.principalEmail, timestamp AS last_queried_at, info.resource, ROW_NUMBER() OVER (PARTITION BY info.resource ORDER BY timestamp DESC) AS rank, FROM `my-project.source__cloudaudit__bigquery.cloudaudit_googleapis_com_data_access` CROSS JOIN UNNEST(protopayload_auditlog.authorizationInfo) AS info WHERE NOT REGEXP_CONTAINS(info.resource, r"LOAD_TEMP_") -- embulkのデータ転送で走るjobは除去したい AND NOT REGEXP_CONTAINS(info.resource, r"jobs") ), latest_access_by_resource AS ( SELECT * EXCEPT(rank) FROM raw_resource_usage WHERE rank = 1 ), resource_called_counts AS ( SELECT resource, COUNT(*) AS count FROM raw_resource_usage GROUP BY resource ORDER BY count DESC ), existing_tables AS ( SELECT project_id, dataset_id, table_id, FORMAT("projects/%s/datasets/%s/tables/%s", project_id, dataset_id, table_id) AS table_full_name FROM ( SELECT * FROM `my-project`.my_dataset_1.__TABLES__ UNION ALL SELECT * FROM `my-project`.my_dataset_2.__TABLES__ ) ) SELECT dataset_id, table_id, principalEmail AS last_accessed_person, last_queried_at, resource_called_counts.count AS queries_counts FROM existing_tables INNER JOIN latest_access_by_resource ON latest_access_by_resource.resource = existing_tables.table_full_name INNER JOIN resource_called_counts ON resource_called_counts.resource = existing_tables.table_full_name ORDER BY last_queried_at
長い間使われていなかったとはいえ、いきなり削除すると問題になるケースもあると思われます。チーム内では、以下のフローで削除するようにしています。
- 削除対象のテーブルに
expiration
を付与する- 例えば、何もしなければ2週間後には削除されるようにする
- 作成者や、最後にクエリを叩いた人に、次のような連絡をする
「このテーブルは長い間使われていなかったので、削除しようと思っています。もし削除されて困る場合は、expirationを外して削除されないようにしてください」 - 消えると困るものは
expiration
が外されるので、削除されない - 消して問題ないものは、何もしなければ自動的に削除される
- 一応、手元にSQLのバックアップは取っておく
以下のスクリプトで、削除対象の一覧にexpiration
を付与します。
#!/bin/bash set -u CWD=$(cd "$(dirname "$0")"; pwd) PROJECT=my-project # 2週間 = 60 * 60 * 24 * 14 EXPIRE_TIME=1209600 TARGET_TABLES_OR_VIEWS=( my_dataset.old_table1 my_dataset.old_table2 ) for t in "${TARGET_TABLES_OR_VIEWS[@]}"; do # table or viewがすでに存在しなくても全部舐めれるようにする if ! bq show "${PROJECT}:${t}" > /dev/null; then echo "${t} does not exist. Skipped..." continue fi RESULT=$(bq show --format json "${PROJECT}:${t}") if [ "$(echo "$RESULT" | jq -r .type)" = "VIEW" ]; then # viewであれば元のSQLをバックアップで手元に残しておく echo "$RESULT" | jq -r .view.query > "${CWD}/${t}.sql" fi bq update --expiration ${EXPIRE_TIME} "${PROJECT}:${t}" done
おわりに
今回は、壊れにくいデータ基盤を構築する仕組みについて紹介しました。今後、壊れにくいだけでなく使われやすくする仕組みや、このデータ基盤をベースにしたデータ分析でサービスをどう成長させるかについても紹介していければと思っています。
はてなでは、新卒・中途、東京・京都を問わず、エンジニアを募集しています。データ基盤、データ分析に興味のある人はぜひご応募ください!
参考文献
- 作者:DAMA International
- 発売日: 2018/11/30
- メディア: 単行本
図解即戦力 ビッグデータ分析のシステムと開発がこれ1冊でしっかりわかる教科書
- 作者:渡部 徹太郎
- 発売日: 2019/11/07
- メディア: 単行本(ソフトカバー)
*1:他の観点としては「壊れても簡単に復旧できる手段を用意する」「データ基盤が壊れた場合、自動的に壊れる前の状態に戻るようにする」などが挙げられますが、これらの観点はMackerelチームではまだあまり取り組めていないため、今回のエントリでは省略しています