배경
내가 책임지고 있는 팀은
Cloud Engineer / DevOps / Data Engineer 세 파트로 나뉘어 있다.
어느 날 Data Engineer 팀원이 “Airflow DAG 성공 여부를 알람으로 받고 싶다” 는 니즈를 제기했다.
Airflow Web UI에서 DAG 실행 상태는 확인할 수 있지만, 장애 알람으로 바로 받아볼 방법은 없었다.
검색해보니 Prometheus 기반 대시보드 예제는 조금 있었지만, 정작 실패 알람에 초점을 맞춘 레퍼런스는 없었다.
그래서 결국 Discovery → Item Prototype → Trigger Prototype까지 직접 만들어서 구현했다.
이 글에서는 온프레미스 Kubernetes 환경에서 Zabbix로 Airflow DAG 상태를 모니터링했던 방법을 공유한다.: airflow rest api와 통신만 된다면 온프레미스 kubernetes 든
클라우드 환경이든 상관없이 이 방법을 응용만 하면 되는 부분이니 참고들 바란다.
zabbix agent 및 기타 설정에 관한 설명은 따로 하지 않도록 하겠다.
왜 DAG 알람만으론 부족할까?
- DAG이 실패하는 경우도 문제지만, 더 근본적인 문제는 Airflow 자체가 정상적으로 동작하지 않는 상황이다.
- Scheduler가 멈추면 DAG이 아예 실행되지 않고, Metadata DB 연결이 끊기면 시스템 전체가 정지된다. Trigger 프로세스가 죽으면 특정 이벤트 기반 DAG 실행이 전혀 일어나지 않는다.
- 즉, DAG 실패 알람은 증상이고, Airflow 헬스 알람은 원인 진단에 가깝다.
- 둘을 같이 모니터링해야 장애를 빠르게 인지하고 조치할 수 있다.
컨셉
- Airflow REST API에서 DAG ID 목록을 주기적으로 수집한다.
- 각 DAG 실행 결과를 API로 조회한다.
- 최근 실행이
failed
이면 Zabbix 트리거로 알람을 발생시킨다.
1. Zabbix 서버(Linux)에서 하는 작업
1-1. DAG ID 수집 스크립트 작성
Zabbix 서버에서 Airflow API를 호출해 DAG ID 목록을 가져온다.
daginfo.sh
#!/bin/bash
API_URL="http://192.102.200.97:8080/api/v1/dags"
USERNAME="testuser"
PASSWORD="testpasswd"
LIMIT=100 # 한 번에 가져올 수 있는 최대 DAG 수
OFFSET=0
# 결과 파일 초기화
> /tmp/airflow_dag_ids.txt
while true; do
# 현재 OFFSET으로 API 호출
curl -s -u "$USERNAME:$PASSWORD" -o /tmp/airflow_response.json -w "%{http_code}" "${API_URL}?offset=${OFFSET}" > /tm
p/http_status.txt
HTTP_STATUS=$(cat /tmp/http_status.txt)
RESPONSE_CONTENT=$(cat /tmp/airflow_response.json)
# HTTP 상태 코드가 200이 아닐 경우 오류 처리
if [ "$HTTP_STATUS" -ne 200 ]; then
echo "Error: Received HTTP status $HTTP_STATUS" >&2
echo "Response content: $RESPONSE_CONTENT" >&2
break
fi
# dag_id만 추출하고 텍스트 파일에 추가.경로 변경되면
zabbix master item 에도 변경된걸로 적용
echo "$RESPONSE_CONTENT" | jq -r '.dags[].dag_id' >> /tmp/airflow_dag_ids.txt
# 다음 OFFSET으로 이동
OFFSET=$((OFFSET + LIMIT))
# 더 이상 DAG가 없으면 종료
if [ "$(echo "$RESPONSE_CONTENT" | jq '.dags | length')" -eq 0 ]; then
break
fi
done
# 최종 확인 메시지
echo "All DAG IDs saved to /tmp/airflow_dag_ids.txt"
- 크론탭 등록 (3시간마다 실행):
0 */3 * * * /home/example/daginfo.sh
2. Zabbix UI에서 하는 작업
2-1. Master Item 생성
- 이름:
dagid master item
- 키:
vfs.file.contents[/tmp/airflow_dag_ids.txt]
- 타입: Zabbix agent
- 데이터형: 텍스트
- 인터페이스:
127.0.0.1:10050
- 주기: 2h

2-2. LLD(Discovery) 규칙 생성
Master Item에서 가져온 DAG ID 목록을 기반으로 Discovery 수행.
- 이름:
Airflow DAGID Discovery
- 키:
airflow.discovery.dagsid
(임의 지정 가능) - 마스터 아이템: 위에서 만든
dagid master item
- 전처리 (JavaScript):
try {
// value 값 출력 (디버깅용)
console.log("Input value received:", value);
if (!value) {
console.log("No value received or value is empty.");
return JSON.stringify({ "data": [] });
}
// 파일의 내용을 줄 단위로 분할 (윈도우와 유닉스 개행 문자 모두 처리)
var lines = value.split(/\r?\n/);
// DAG ID 목록을 저장할 배열 초기화
var data = [];
// 각 줄을 순회하면서 JSON 객체로 변환하여 배열에 추가
for (var i = 0; i < lines.length; i++) {
var dag_id = lines[i].trim(); // 양 끝 공백 제거
if (dag_id) { // 빈 줄이 아닌 경우만 추가
data.push({ "{#DAG_ID}": dag_id });
console.log("DAG ID added:", dag_id);
} else {
console.log("Empty line at index:", i);
}
}
console.log("Total DAG IDs found:", data.length);
// JSON 형식으로 반환하여 LLD에서 사용할 수 있도록 설정
return JSON.stringify({ "data": data });
} catch (error) {
console.log("Error in JavaScript preprocessing:", error);
return JSON.stringify({ "data": [] });
}


2-3. Item Prototype 생성
각 DAG 실행 상태를 REST API로 확인.
- 이름:
DAG {#DAG_ID} Runs 상태
- 키:
dagruns.status[{#DAG_ID}]
- 데이터형 : 수치 (unsigned)
- 타입: HTTP agent
- URL:
http://192.
102.200.97
:8080/api/v1/dags/{#DAG_ID}/dagRuns?order_by=-execution_date - 인증: Basic Auth (Airflow 계정)
- 주기: 5m
- 호스트 인터페이스 : 127.0.0.1:10050
- 전처리 (JavaScript):
// JSON 데이터 파싱
var parsedData;
try {
parsedData = JSON.parse(value);
} catch (e) {
throw "JSON 파싱 오류 발생";
}
// 최신 dagRun만 추출
if (parsedData.dag_runs && parsedData.dag_runs.length > 0) {
// 가장 최신의 dag_run 상태 가져오기
var latestRun = parsedData.dag_runs[0];
var state = latestRun.state;
// 상태에 따라 반환값 설정
if (state === "failed") {
return 1;
} else if (state === "success" || state === "running") {
return 0;
} else {
return 0;
}
} else {
// 빈 데이터인 경우 (실행 이력이 없는 경우)
return 0;
}
- 반환값:
- 실패 →
1
- 성공/실행중 →
0
- 실패 →



2-4. Trigger Prototype 생성
- 이름:
DAG {#DAG_ID} 상태가 정상이 아닙니다
- 조건식:
last(/Airflow - example Product/dagruns.status[{#DAG_ID}])=1
- 복구 조건:
last(/Airflow - example Product/dagruns.status[{#DAG_ID}])=0
- 심각도: 가벼운 장애 (상황에 따라 변경)

2-5. Airflow 헬스 상태 체크
: 트리거에 대한 가이드는 생략
(1) Scheduler Health
- 이름:
Airflow 스케줄러 health 상태 체크
- 키:
airflow.health
- 타입: HTTP agent
- URL:
http://192.102.200.97:8080/health
- 요구 스테이터스 코드 : 200
- 인증: Basic Auth (Airflow 계정)
- 전처리 (JavaScript):
// JSON 데이터 파싱
var parsedData;
try {
if (!value) throw "데이터가 비어 있습니다";
parsedData = JSON.parse(value);
} catch (e) {
return 1; // JSON 파싱 오류 시 비정상 상태로 간주하여 1 반환
}
// scheduler 상태 확인
if (parsedData.scheduler && parsedData.scheduler.status === "healthy") {
return 0; // "healthy"일 경우 0 반환
} else {
return 1; // 그 외의 경우 1 반환
}



(2) Metadata DB Health
- 이름:
Airflow metadata health
- 키:
airflow.metadata.health
- 타입: HTTP agent
- URL:
http://192.102.200.97:8080/health
- 요구 스테이터스 코드 : 200
- 인증: Basic Auth (Airflow 계정)
- 전처리 (JavaScript):
// JSON 데이터 파싱
var parsedData;
try {
if (!value) throw "데이터가 비어 있습니다";
parsedData = JSON.parse(value);
} catch (e) {
return 1; // JSON 파싱 오류 시 비정상 상태로 간주하여 1 반환
}
// scheduler 상태 확인
if (parsedData.metadatabase && parsedData.metadatabase.status === "healthy") {
return 0; // "healthy"일 경우 0 반환
} else {
return 1; // 그 외의 경우 1 반환
}



(3) Trigger Health
- 이름:
Airflow trigger health 상태 체크
- 키:
airflow.trigger.health
- 타입: HTTP agent
- URL:
http://192.102.200.97:8080/health
- 요구 스테이터스 코드 : 200
- 인증: Basic Auth (Airflow 계정)
- 전처리 (JavaScript):
// JSON 데이터 파싱
var parsedData;
try {
if (!value) throw "데이터가 비어 있습니다";
parsedData = JSON.parse(value);
} catch (e) {
return 1; // JSON 파싱 오류 시 비정상 상태로 간주하여 1 반환
}
// scheduler 상태 확인
if (parsedData.triggerer && parsedData.triggerer.status === "healthy") {
return 0; // "healthy"일 경우 0 반환
} else {
return 1; // 그 외의 경우 1 반환
}



마무리
이 구성을 적용하면 Zabbix가 Airflow REST API를 통해:
- DAG ID를 자동 수집하고, 각 DAG의 최근 실행 실패 여부를 알람으로 제공
- 동시에 Scheduler / Metadata DB / Trigger 프로세스의 헬스 상태까지 감시
즉, DAG 실패 알람(증상) 과 Airflow 헬스 알람(원인) 을 동시에 커버할 수 있다.
airflow
환경은 온프레미스 Kubernetes였지만, 사실 이 방식은 어디서든 동일하다.
Airflow REST API에 접근 가능하기만 하면 VM, 클라우드, 매니지드 서비스 모두 적용할 수 있다.
이번의 Airflow 같은 경우
운영자가 꼭 필요한 건 대시보드를 통한 분석보다는 “즉시 알람”이다.
이 방법은 그 핵심을 충족시킨다.
마무리
이 구성을 적용하면, Zabbix가 Airflow REST API를 통해 DAG ID를 자동으로 가져오고, 각 DAG 실행 상태를 주기적으로 확인해 실패 시 알람을 발생시킨다.
- Zabbix 서버에서는 스크립트로 DAG ID를 수집
- Zabbix UI에서는 LLD, Item Prototype, Trigger Prototype으로 알람 체계 구성
레퍼런스가 거의 없는 영역이라, 같은 고민을 하는 엔지니어들에게 도움이 되기를 바란다.
ⓒ 2025 엉뚱한 녀석의 블로그 [quirky guy's Blog]. All rights reserved. Unauthorized copying or redistribution of the text and images is prohibited. When sharing, please include the original source link.
답글 남기기
댓글을 달기 위해서는 로그인해야합니다.