Zabbix로 Airflow 모니터링하는 법

배경

내가 책임지고 있는 팀은
Cloud Engineer / DevOps / Data Engineer 세 파트로 나뉘어 있다.
어느 날 Data Engineer 팀원이 “Airflow DAG 성공 여부를 알람으로 받고 싶다” 는 니즈를 제기했다.

Airflow Web UI에서 DAG 실행 상태는 확인할 수 있지만, 장애 알람으로 바로 받아볼 방법은 없었다.
검색해보니 Prometheus 기반 대시보드 예제는 조금 있었지만, 정작 실패 알람에 초점을 맞춘 레퍼런스는 없었다.

그래서 결국 Discovery → Item Prototype → Trigger Prototype까지 직접 만들어서 구현했다.
이 글에서는 온프레미스 Kubernetes 환경에서 Zabbix로 Airflow DAG 상태를 모니터링했던 방법을 공유한다.: airflow rest api와 통신만 된다면 온프레미스 kubernetes 든
클라우드 환경이든 상관없이 이 방법을 응용만 하면 되는 부분이니 참고들 바란다.
zabbix agent 및 기타 설정에 관한 설명은 따로 하지 않도록 하겠다.


왜 DAG 알람만으론 부족할까?

  • DAG이 실패하는 경우도 문제지만, 더 근본적인 문제는 Airflow 자체가 정상적으로 동작하지 않는 상황이다.
  • Scheduler가 멈추면 DAG이 아예 실행되지 않고, Metadata DB 연결이 끊기면 시스템 전체가 정지된다. Trigger 프로세스가 죽으면 특정 이벤트 기반 DAG 실행이 전혀 일어나지 않는다.
  • 즉, DAG 실패 알람은 증상이고, Airflow 헬스 알람은 원인 진단에 가깝다.
  • 둘을 같이 모니터링해야 장애를 빠르게 인지하고 조치할 수 있다.

컨셉

  • Airflow REST API에서 DAG ID 목록을 주기적으로 수집한다.
  • 각 DAG 실행 결과를 API로 조회한다.
  • 최근 실행이 failed이면 Zabbix 트리거로 알람을 발생시킨다.

1. Zabbix 서버(Linux)에서 하는 작업

1-1. DAG ID 수집 스크립트 작성

Zabbix 서버에서 Airflow API를 호출해 DAG ID 목록을 가져온다.

daginfo.sh

#!/bin/bash

API_URL="http://192.102.200.97:8080/api/v1/dags"
USERNAME="testuser"
PASSWORD="testpasswd"
LIMIT=100  # 한 번에 가져올 수 있는 최대 DAG 수
OFFSET=0

# 결과 파일 초기화
> /tmp/airflow_dag_ids.txt

while true; do
    # 현재 OFFSET으로 API 호출
    curl -s -u "$USERNAME:$PASSWORD" -o /tmp/airflow_response.json -w "%{http_code}" "${API_URL}?offset=${OFFSET}" > /tm
p/http_status.txt
    HTTP_STATUS=$(cat /tmp/http_status.txt)
    RESPONSE_CONTENT=$(cat /tmp/airflow_response.json)

    # HTTP 상태 코드가 200이 아닐 경우 오류 처리
    if [ "$HTTP_STATUS" -ne 200 ]; then
        echo "Error: Received HTTP status $HTTP_STATUS" >&2
        echo "Response content: $RESPONSE_CONTENT" >&2
        break
    fi

    # dag_id만 추출하고 텍스트 파일에 추가.경로 변경되면
      zabbix master item 에도 변경된걸로 적용
    echo "$RESPONSE_CONTENT" | jq -r '.dags[].dag_id' >> /tmp/airflow_dag_ids.txt

    # 다음 OFFSET으로 이동
    OFFSET=$((OFFSET + LIMIT))

    # 더 이상 DAG가 없으면 종료
    if [ "$(echo "$RESPONSE_CONTENT" | jq '.dags | length')" -eq 0 ]; then
        break
    fi
done

# 최종 확인 메시지
echo "All DAG IDs saved to /tmp/airflow_dag_ids.txt"

  • 크론탭 등록 (3시간마다 실행): 0 */3 * * * /home/example/daginfo.sh

2. Zabbix UI에서 하는 작업

2-1. Master Item 생성

  • 이름: dagid master item
  • 키: vfs.file.contents[/tmp/airflow_dag_ids.txt]
  • 타입: Zabbix agent
  • 데이터형: 텍스트
  • 인터페이스: 127.0.0.1:10050
  • 주기: 2h

2-2. LLD(Discovery) 규칙 생성

Master Item에서 가져온 DAG ID 목록을 기반으로 Discovery 수행.

  • 이름: Airflow DAGID Discovery
  • 키: airflow.discovery.dagsid (임의 지정 가능)
  • 마스터 아이템: 위에서 만든 dagid master item
  • 전처리 (JavaScript):
try {
    // value 값 출력 (디버깅용)
    console.log("Input value received:", value);

    if (!value) {
        console.log("No value received or value is empty.");
        return JSON.stringify({ "data": [] });
    }

    // 파일의 내용을 줄 단위로 분할 (윈도우와 유닉스 개행 문자 모두 처리)
    var lines = value.split(/\r?\n/);

    // DAG ID 목록을 저장할 배열 초기화
    var data = [];

    // 각 줄을 순회하면서 JSON 객체로 변환하여 배열에 추가
    for (var i = 0; i < lines.length; i++) {
        var dag_id = lines[i].trim();  // 양 끝 공백 제거
        if (dag_id) {  // 빈 줄이 아닌 경우만 추가
            data.push({ "{#DAG_ID}": dag_id });
            console.log("DAG ID added:", dag_id);
        } else {
            console.log("Empty line at index:", i);
        }
    }

    console.log("Total DAG IDs found:", data.length);

    // JSON 형식으로 반환하여 LLD에서 사용할 수 있도록 설정
    return JSON.stringify({ "data": data });

} catch (error) {
    console.log("Error in JavaScript preprocessing:", error);
    return JSON.stringify({ "data": [] });
}



2-3. Item Prototype 생성

각 DAG 실행 상태를 REST API로 확인.

  • 이름: DAG {#DAG_ID} Runs 상태
  • 키: dagruns.status[{#DAG_ID}]
  • 데이터형 : 수치 (unsigned)
  • 타입: HTTP agent
  • URL: http://192.102.200.97:8080/api/v1/dags/{#DAG_ID}/dagRuns?order_by=-execution_date
  • 인증: Basic Auth (Airflow 계정)
  • 주기: 5m
  • 호스트 인터페이스 : 127.0.0.1:10050
  • 전처리 (JavaScript):
// JSON 데이터 파싱
var parsedData;
try {
    parsedData = JSON.parse(value);
} catch (e) {
    throw "JSON 파싱 오류 발생";
}

// 최신 dagRun만 추출
if (parsedData.dag_runs && parsedData.dag_runs.length > 0) {
    // 가장 최신의 dag_run 상태 가져오기
    var latestRun = parsedData.dag_runs[0];
    var state = latestRun.state;

    // 상태에 따라 반환값 설정
    if (state === "failed") {
        return 1;
    } else if (state === "success" || state === "running") {
        return 0;
    } else {
        return 0;
    }
} else {
    // 빈 데이터인 경우 (실행 이력이 없는 경우)
    return 0;
}
  • 반환값:
    • 실패 → 1
    • 성공/실행중 → 0

2-4. Trigger Prototype 생성

  • 이름: DAG {#DAG_ID} 상태가 정상이 아닙니다
  • 조건식: last(/Airflow - example Product/dagruns.status[{#DAG_ID}])=1
  • 복구 조건: last(/Airflow - example Product/dagruns.status[{#DAG_ID}])=0
  • 심각도: 가벼운 장애 (상황에 따라 변경)

2-5. Airflow 헬스 상태 체크
: 트리거에 대한 가이드는 생략

(1) Scheduler Health

  • 이름: Airflow 스케줄러 health 상태 체크
  • 키: airflow.health
  • 타입: HTTP agent
  • URL: http://192.102.200.97:8080/health
  • 요구 스테이터스 코드 : 200
  • 인증: Basic Auth (Airflow 계정)
  • 전처리 (JavaScript):
// JSON 데이터 파싱
var parsedData;
try {
    if (!value) throw "데이터가 비어 있습니다";
    parsedData = JSON.parse(value);
} catch (e) {
    return 1; // JSON 파싱 오류 시 비정상 상태로 간주하여 1 반환
}

// scheduler 상태 확인
if (parsedData.scheduler && parsedData.scheduler.status === "healthy") {
    return 0; // "healthy"일 경우 0 반환
} else {
    return 1; // 그 외의 경우 1 반환
}

(2) Metadata DB Health

  • 이름: Airflow metadata health
  • 키: airflow.metadata.health
  • 타입: HTTP agent
  • URL: http://192.102.200.97:8080/health
  • 요구 스테이터스 코드 : 200
  • 인증: Basic Auth (Airflow 계정)
  • 전처리 (JavaScript):
// JSON 데이터 파싱
var parsedData;
try {
    if (!value) throw "데이터가 비어 있습니다";
    parsedData = JSON.parse(value);
} catch (e) {
    return 1; // JSON 파싱 오류 시 비정상 상태로 간주하여 1 반환
}

// scheduler 상태 확인
if (parsedData.metadatabase && parsedData.metadatabase.status === "healthy") {
    return 0; // "healthy"일 경우 0 반환
} else {
    return 1; // 그 외의 경우 1 반환
}


(3) Trigger Health

  • 이름: Airflow trigger health 상태 체크
  • 키: airflow.trigger.health
  • 타입: HTTP agent
  • URL: http://192.102.200.97:8080/health
  • 요구 스테이터스 코드 : 200
  • 인증: Basic Auth (Airflow 계정)
  • 전처리 (JavaScript):
// JSON 데이터 파싱
var parsedData;
try {
    if (!value) throw "데이터가 비어 있습니다";
    parsedData = JSON.parse(value);
} catch (e) {
    return 1; // JSON 파싱 오류 시 비정상 상태로 간주하여 1 반환
}

// scheduler 상태 확인
if (parsedData.triggerer && parsedData.triggerer.status === "healthy") {
    return 0; // "healthy"일 경우 0 반환
} else {
    return 1; // 그 외의 경우 1 반환
}


마무리

이 구성을 적용하면 Zabbix가 Airflow REST API를 통해:

  • DAG ID를 자동 수집하고, 각 DAG의 최근 실행 실패 여부를 알람으로 제공
  • 동시에 Scheduler / Metadata DB / Trigger 프로세스의 헬스 상태까지 감시

즉, DAG 실패 알람(증상)Airflow 헬스 알람(원인) 을 동시에 커버할 수 있다.
airflow

환경은 온프레미스 Kubernetes였지만, 사실 이 방식은 어디서든 동일하다.
Airflow REST API에 접근 가능하기만 하면 VM, 클라우드, 매니지드 서비스 모두 적용할 수 있다.

이번의 Airflow 같은 경우
운영자가 꼭 필요한 건 대시보드를 통한 분석보다는 “즉시 알람”이다.
이 방법은 그 핵심을 충족시킨다.

마무리

이 구성을 적용하면, Zabbix가 Airflow REST API를 통해 DAG ID를 자동으로 가져오고, 각 DAG 실행 상태를 주기적으로 확인해 실패 시 알람을 발생시킨다.

  • Zabbix 서버에서는 스크립트로 DAG ID를 수집
  • Zabbix UI에서는 LLD, Item Prototype, Trigger Prototype으로 알람 체계 구성

레퍼런스가 거의 없는 영역이라, 같은 고민을 하는 엔지니어들에게 도움이 되기를 바란다.

ⓒ 2025 엉뚱한 녀석의 블로그 [quirky guy's Blog]. 본문 및 이미지를 무단 복제·배포할 수 없습니다. 공유 시 반드시 원문 링크를 명시해 주세요.
ⓒ 2025 엉뚱한 녀석의 블로그 [quirky guy's Blog]. All rights reserved. Unauthorized copying or redistribution of the text and images is prohibited. When sharing, please include the original source link.