いつもお世話になっております。
株式会社KIYONOでエンジニアをしている寺島と申します。
今回はGoogle Cloudが提供するワークフロー管理サービスである
Cloud Composerについて解説していきます。
Cloud Composerとは??
Cloud ComposerとはGoogle Cloudが提供するフルマネージドのワークフロー管理サービスです。
Apache Airflowが活用されていて、Pythonを使ってタスクやDAGなどのワークフローを定義することができます。
フルマネージドサービスなのでユーザー側でサーバーリソースなどを管理する必要がなくワークフローの作成管理にリソースを集中投下することができます。
Apache Airflowのローカルインスタンスを自ら立てて使うのではなく
Cloud Composer経由でApache AirFlowを使うことによって
ユーザーはインストールや管理にリソースを割かれることなく
ワークフローの作成という本来の業務に集中することができます。
詳細はこの後説明いたしますが
Cloud Composer上で管理されているGCSにタスク内容を記したPythonコードをアップロードするだけで
Cloud Composer側でファイルを読み取りAir Flow上で実行することが可能になってます。
Composerにおける用語について
Cloud Composerを扱うにあたっていくつか用語が出てくるので
まずはそちらから押さえておきましょう。
ワークフロー
Cloud Composerにおけるワークフローとは、
データの取り込み、変換や分析などの一連のタスクを表します。
Cloud ComposerではAirflowを使ってこのワークフローを管理する仕組みを提供します。
DAG
DAGはスケジューリングして実行するタスクの集合であります。
それらの関係を反映して編成することができます。
DAGはPythonスクリプトで作成してコードを用いてDAGの構造を定義します。
タスク
DAGを構築する際に一つ以上定義される処理のことです。
こちらのタスクに実行する順番などを定義するのでそれが結果的に
ワークフローになることになります。
Operator
Airflow内でタスクを実装する際に必要になるのがOperatorという機能です。
例えば「BiqQueryにクエリを実行してください」と言った内容を自動化する場合はBigQueryOperatorという関数を使います。
他にも
bashでの操作をオペレーションすることができるBashOperator
CloudStorageからBigQueryへの移行をサポートしているGCSToBigQueryOperatorなどがありますので
要件に合うOperatorを探してみるといいでしょう。
実際にCloud Composerを使ってみよう(ハンズオン)
環境の作成
まずは以下コマンドでCloudComposerの環境を作成しましょう。
1行目のcreate xxx の部分で任意のCloudComposerの環境名を入力します。
2行目でローケーションの指定
3行目でバージョンの指定です。
gcloud composer environments create example-environment \
--location asia-northeast1 \
--image-version composer-1.18.5-airflow-1.10.15
上記のコマンドによりCloudComposer環境が作成されました。
環境の作成によってワークフローを実行するためのGoogle Cloudコンポーネントがプロビジョニングされており
これらを総称してCloud Composer環境と呼びます。
こちらの環境にはGoogle Kubernetes Engineが採用されており
各コンポーネントを利用してワークフローを実行します。
DAGファイルの作成
DAGはスケジュールやタスクなどを定義するファイルです。
以下で[quickstart.py]をローカル環境で作成してみましょう。
import datetime
import airflow
from airflow.operators import bash_operator
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': YESTERDAY,
}
with airflow.DAG(
'composer_sample_dag',
catchup=False,
default_args=default_args,
schedule_interval=datetime.timedelta(days=1)) as dag:
# Print the dag_run id from the Airflow logs
print_dag_run_conf = bash_operator.BashOperator(
task_id='print_dag_run_conf', bash_command='echo {{ dag_run.id }}')
DAGファイルをGCSにアップロードする
作成したDAGファイルを読み取ってCloud Composer環境に読み取ってもらえるように
管理下にあるGSCにDAGファイルをアップロードします。
以下アップロード手順です。
①
Cloud Composerコンソールに行くと
先ほど作成した名前で環境が出来上がっているはずです。該当するDAGのフォルダをクリックしてみましょう。
Cloud Composerが管理しているGSCにつながります。
②
先ほどのGCSに作成したDAGファイルをアップロードする。
③
アップロードが完了したらAirflow側に同期されているはずなので確認してみましょう。
Cloud Composerの環境ページから該当する環境のAirflow webサーバーをクリックしてみましょう。
Airflow側のコンソールに移動しますので先ほどアップロードしたDAGファイルが存在していれば成功です。
動作確認
動作確認をしてみましょう。
作成したDAGの名前が表示されているはずですのでクリックして移動します。
このような画面が表示されるはずです。
上タブから[Graph view]を選択すると
DAG内で定義したタスクが表示されるの選択しましょう。
[view log]を選択すると実行ログが表示されます。
DAGで定義していたタスクが実行されているログが残っていれば成功です。
ロギングからでも確認することができるのでそちらから確認してみてもいいかもしれません。
まとめ
いかがでしたか??
他にもデータを整形してBigQueryへの同期やインフラの自動化、Dataprocを動かすことができたりと
Cloud Composerを通して自動化や管理ができることは多岐にわたります。
インフラや自動化ツールの肥大化に伴って管理が煩雑になってきた場合は
かなり効果を発揮するのではないでしょうか。
最後までご覧いただきましてありがとうございました。
コメント