ZabbixでAirflowを監視する方法

背景

私が担当するチームは Cloud Engineer・DevOps・Data Engineer の三つのパートに分かれています。ある日 Data Engineer のメンバーが「Airflow の DAG の成否をアラートで受け取りたい」というニーズを挙げました。Airflow の Web UI では DAG の実行状態を確認できますが、障害を即座にアラートとして受け取る方法はありませんでした。検索してみると Prometheus ベースのダッシュボード例は少し見つかったものの、失敗アラートに焦点を当てたリファレンスは見当たりませんでした。そこで最終的に、ディスカバリルール → アイテムプロトタイプ → トリガープロトタイプまで自分で作成し、実装しました。

この記事ではオンプレミスの Kubernetes 環境で Zabbix を用いて Airflow の DAG 状態を監視した方法を共有します。Airflow REST API と通信できれば、オンプレミスの Kubernetes でもクラウド環境でも同じ方法を応用できます。ここでは Zabbix agent やその他の設定については触れません。

なぜ DAG アラートだけでは不十分なのか?

DAG が失敗することも問題ですが、それ以上に根本的な問題は Airflow 自体が正常に動作していない状況です。Scheduler が停止すると DAG が実行されず、Metadata データベースへの接続が切れるとシステム全体が止まります。Trigger プロセスが死んでいるとイベント駆動の DAG はまったく実行されません。つまり DAG の失敗アラートは“症状”であり、Airflow のヘルスチェックは“原因”に近い警告です。両方を一緒に監視することで、障害を素早く認識し、対処できます。

コンセプト

  1. Airflow REST API から定期的に DAG ID 一覧を収集します。
  2. 各 DAG の実行結果を API で取得します。
  3. 直近の実行が failed であれば Zabbix のトリガーでアラートを発生させます。

以下の手順では Zabbix サーバ(Linux)側の作業と Zabbix の UI 上での設定に分けて解説します。オンプレミスの Kubernetes 環境での例ですが、Airflow REST API にアクセスできる環境であれば同じ構成を応用できます。

1. Zabbixサーバ(Ubuntu)
側の作業

1‑1. DAG ID 収集スクリプトの作成

Zabbix サーバから Airflow API を呼び出し、DAG ID 一覧を取得するシェルスクリプトを用意します。daginfo.sh では API URL やユーザー名・パスワードなどは各自の運用環境に合わせて修正してください。

#!/bin/bash

API_URL="http://192.102.200.97:8080/api/v1/dags"
USERNAME="testuser"
PASSWORD="testpasswd"
LIMIT=100  # 一度に取得する DAG の最大数
OFFSET=0

OUT_FILE="/tmp/airflow_dag_ids.txt"
RESP_FILE="/tmp/airflow_response.json"

# 結果ファイルを初期化
> "$OUT_FILE"

while true; do
    # 現在の OFFSET と LIMIT を使用して API を呼び出し、HTTP ステータスコードを取得
    HTTP_STATUS=$(curl -s -u "$USERNAME:$PASSWORD" -o "$RESP_FILE" -w '%{http_code}' \
        "${API_URL}?limit=${LIMIT}&offset=${OFFSET}")

    # HTTP ステータスコードを確認
    if [ "$HTTP_STATUS" -ne 200 ]; then
        echo "エラー: HTTP ステータス $HTTP_STATUS を受信しました" >&2
        echo "レスポンス内容:" >&2
        cat "$RESP_FILE" >&2
        break
    fi

    # レスポンスから dag_id のみ抽出してテキストファイルに追記
    # (パスを変更する場合は Zabbix のマスターアイテムも同様に変更する必要がある)
    jq -r '.dags[].dag_id' "$RESP_FILE" >> "$OUT_FILE"

    # 今回のレスポンスに含まれる DAG の数を取得
    DAG_COUNT=$(jq '.dags | length' "$RESP_FILE")

    # DAG がもう存在しない場合、処理を終了
    if [ "$DAG_COUNT" -eq 0 ]; then
        break
    fi

    # 次の OFFSET に進める
    OFFSET=$((OFFSET + LIMIT))
done

echo "すべての DAG ID を $OUT_FILE に保存しました"

このスクリプトでは jq を使用するため、事前に以下のようにインストールしておきます。

#apt install -y jq
#chmod +x daginfo.sh
#./daginfo.sh

初回実行後、cron に登録して定期実行します(ここでは 3 時間ごと)。

0 */3 * * * /home/example/daginfo.sh

2. Zabbix (7.4) UI での作業

事前に Airflow DAG 用のホストグループとホストを作成しておきます。

2‑1. マスターアイテムの作成

Airflow API から取得した DAG ID 一覧を読み込むマスターアイテムを作成します。

項目設定
名前dagid マスターアイテム
キーvfs.file.contents[/tmp/airflow_dag_ids.txt]
タイプZabbix agent
データ型文字列
インターフェース127.0.0.1:10050
更新間隔2h

2‑2. LLD(ディスカバリ)ルールの作成

マスターアイテムで取得した DAG ID 一覧を基にローレベルディスカバリを行います。ホストのディスカバリタブから「ディスカバリルールの作成」をクリックし、以下を設定します。

項目設定
名前Airflow DAGID Discovery
タイプ依存アイテム
キーairflow.discovery.dagsid(任意で指定可能)
マスターアイテム先ほど作成した dagid マスターアイテム

前処理(JavaScript)

try {
    // 入力値を出力(デバッグ用)
    console.log("Input value received:", value);

    if (!value) {
        console.log("No value received or value is empty.");
        return JSON.stringify({ "data": [] });
    }

    // ファイル内容を行単位で分割(Windows/Unix の改行に対応)
    var lines = value.split(/\r?\n/);

    // DAG ID のリストを格納する配列
    var data = [];

    // 各行を走査し JSON オブジェクトに変換して配列へ追加
    for (var i = 0; i < lines.length; i++) {
        var dag_id = lines[i].trim(); // 先頭と末尾の空白を除去
        if (dag_id) { // 空行でなければ追加
            data.push({ "{#DAG_ID}": dag_id });
            console.log("DAG ID added:", dag_id);
        } else {
            console.log("Empty line at index:", i);
        }
    }

    console.log("Total DAG IDs found:", data.length);

    // JSON 形式で返して LLD に利用する
    return JSON.stringify({ "data": data });

} catch (error) {
    console.log("Error in JavaScript preprocessing:", error);
    return JSON.stringify({ "data": [] });
}

このスクリプトで得られた {#DAG_ID} マクロは、後述するアイテムプロトタイプとトリガープロトタイプで利用します。

2‑3. アイテムプロトタイプの作成

各 DAG の最新実行状態を Airflow API で取得するアイテムプロトタイプを作成します。ディスカバリルールの「アイテムプロトタイプ」タブで次のように設定します。

項目設定
名前DAG {#DAG_ID} Runs 状態
タイプHTTP エージェント
キーdagruns.status[{#DAG_ID}]
データ型数値(符号なし)
URLhttp://192.102.200.97:8080/api/v1/dags/{#DAG_ID}/dagRuns?order_by=-execution_date(実際の API IP に置き換え)
HTTP 認証Basic Auth(Airflow アカウント)
更新間隔5m
ホストインターフェース127.0.0.1:10050

前処理(JavaScript)

// JSON データを解析
var parsedData;
try {
    parsedData = JSON.parse(value);
} catch (e) {
    throw "JSON パースエラー";
}

// 最新の dagRun だけを取得
if (parsedData.dag_runs && parsedData.dag_runs.length > 0) {
    var latestRun = parsedData.dag_runs[0];
    var state = latestRun.state;

    // 状態に応じて返り値を設定
    if (state === "failed") {
        return 1;
    } else if (state === "success" || state === "running") {
        return 0;
    } else {
        return 0;
    }
} else {
    // 実行履歴がない場合
    return 0;
}

このアイテムプロトタイプは failed の場合に 1 を返し、success または running の場合は 0 を返します。

2‑4. トリガープロトタイプの作成

DAG の状態を判定するトリガープロトタイプを作成します。アイテムプロトタイプの値を使用して条件式と復旧条件式を設定します。

項目設定
名前DAG {#DAG_ID} 状態が正常ではありません
条件式last(/Airflow - example Product/dagruns.status[{#DAG_ID}])=1
復旧条件式last(/Airflow - example Product/dagruns.status[{#DAG_ID}])=0
深刻度軽度の障害

ここでの深刻度は Zabbix の深刻度定義に基づき 軽度の障害 を指定します。Zabbix では深刻度ごとに色や通知方法を変えられます

2‑5. Airflow のヘルスチェック用アイテム

DAG の失敗アラートは症状に過ぎないため、Airflow の主要プロセスのヘルス状態も監視します。ここでは Scheduler・Metadata DB・Trigger プロセスのヘルスをそれぞれ HTTP エージェントアイテムで取得します。

(1) Scheduler ヘルス

項目設定
名前Airflow スケジューラのヘルス状態チェック
タイプHTTP エージェント
キーairflow.health
データ型数値(符号なし)
URLhttp://192.102.200.97:8080/health
要求ステータスコード200
認証Basic Auth(Airflow アカウント)
更新間隔適宜

前処理(JavaScript)

// JSON データを解析
var parsedData;
try {
    if (!value) throw "データが空です";
    parsedData = JSON.parse(value);
} catch (e) {
    return 1; // JSON パースエラー時は異常とみなし 1 を返す
}

// scheduler の状態を確認
if (parsedData.scheduler && parsedData.scheduler.status === "healthy") {
    return 0; // healthy なら 0
} else {
    return 1; // それ以外は 1
}

(2) Metadata DB ヘルス

項目設定
名前Airflow metadata DB ヘルス
タイプHTTP エージェント
キーairflow.metadata.health
データ型数値(符号なし)
URLhttp://192.102.200.97:8080/health
要求ステータスコード200
認証Basic Auth(Airflow アカウント)

前処理(JavaScript)

// JSON データを解析
var parsedData;
try {
    if (!value) throw "データが空です";
    parsedData = JSON.parse(value);
} catch (e) {
    return 1; // JSON パースエラー時は異常とみなす
}

// metadatabase の状態を確認
if (parsedData.metadatabase && parsedData.metadatabase.status === "healthy") {
    return 0; // healthy なら 0
} else {
    return 1; // それ以外は 1
}

(3) Trigger プロセスヘルス

項目設定
名前Airflow trigger のヘルス状態チェック
タイプHTTP エージェント
キーairflow.trigger.health
データ型数値(符号なし)
URLhttp://192.102.200.97:8080/health
要求ステータスコード200
認証Basic Auth(Airflow アカウント)

前処理(JavaScript)

// JSON データを解析
var parsedData;
try {
    if (!value) throw "データが空です";
    parsedData = JSON.parse(value);
} catch (e) {
    return 1; // JSON パースエラー時は異常とみなす
}

// triggerer の状態を確認
if (parsedData.triggerer && parsedData.triggerer.status === "healthy") {
    return 0; // healthy なら 0
} else {
    return 1; // それ以外は 1
}

まとめ

この構成を適用すると、Zabbix が Airflow REST API を通じて DAG ID を自動的に収集し、各 DAG の最新実行状態を定期的にチェックして失敗時にアラートを発生させます。さらに Scheduler・Metadata データベース・Trigger プロセスのヘルス状態も監視することで、DAG の失敗という“症状”と Airflow のヘルスという“原因”を同時に把握できます。この方法はオンプレミスの Kubernetes 環境を例にしていますが、Airflow REST API にアクセスできる環境であれば VM やクラウド、マネージドサービスでも同じように利用できます。

Zabbix の深刻度設定では 軽度の障害重度の障害致命的な障害 などのレベルがあり トリガーの問題式と回復式を使って複雑な条件を定義できます
Airflow の運用では大きなダッシュボードよりも状態変化を即座に通知してくれるアラートが重要であり、この方法はその要求を正確に満たします。

🛠 마지막 수정일: 2025.11.26

💡 お困りですか?
Zabbix、Kubernetes、各種オープンソースインフラの構築・運用・最適化・障害解析が必要であれば、いつでもご連絡ください。

📧 メール: jikimy75@gmail.com
💼 サービス: 導入支援 | 性能チューニング | 障害解析コンサルティング