반응형
바스 하렌슬락, 율리안 더라위터르, 『Apache Airflow 기반의 데이터 파이프라인』, 김정민, 문선홍, MANNING-제이펍(2022), p87-115.
의존성 유형
- 선형 체인(linear chain) 유형: 연속적으로 실행되는 작업
- 오른쪽 비트 시프트 연산자(>>)를 사용하여 태스크 간의 의존성을 만들 수 있음
- 여러 태스크에서 순서가 명확하게 정의되는 장점이 있음
join_datasets >> train_model >> deploy_model
- 팬인/팬아웃(Fan-in/Fan-out) 유형: 하나의 태스크가 여러 다운스트림 태스크에 연결되거나 그 반대의 동작을 수행하는 유형
- Fan-in: 다대일 구조
- Fan-out: 일대다 구조
# Fan-in
start = DummyOperator(task_id="start")
start >> [fetch_sales, fetch_weather]
# Fan-out
fetch_sales = DummyOperator(task_id="fetch_sales")
clean_sales = DummyOperator(task_id="clean_sales")
fetch_weather = DummyOperator(task_id="fetch_weather")
clean_weather = DummyOperator(task_id="clean_weather")
fetch_sales >> clean_sales
fetch_weather >> clean_weather
[clean_sales, clean_weather] >> join_datasets
브랜치
- e. g. 시스템이 개편되어 기존 데이터는 Bigquery, 새로운 데이터는 Redshift를 사용해야 하는 경우
태스크 내에서 브랜치
- 장점: DAG 자체의 구조를 수정하지 않고도 약간의 유연성을 허용할 수 있는 장점
- 코드로 분기가 가능한 유사한 태스크로 구성된 경우에만 작동함
(e. g. 새로운 데이터가 완전히 다른 태스크 체인이 필요한 경우) - 이러한 경우에는 데이터 수집을 두 개의 개별 태스크 세트로 분할하는 것이 나을 수 있음 (dag 내부에서 브랜치)
- 코드로 분기가 가능한 유사한 태스크로 구성된 경우에만 작동함
- 단점: DAG 실행 중에 Airflow에서 어떤 분기 코드를 사용하고 있는지 확인이 어려움
- 태스크에 좀 더 세세한 로깅을 포함하는 것으로 해결 가능
def _fetch_sales(**context):
if context["execution_date"] < ERP_CHANGE_DATE:
_fetch_sales_old(**context)
else:
_fetch_sales_new(**context)
dag 내부에서 브랜치
- 두 개의 개별 태스크 세트(각 시스템에 하나씩)를 개발하고 DAG가 이전 or 새로운 시스템에서 작업을 실행할 수 있도록 하는 것
- BranchPythonOperator: 다운스트림 세트 중 선택할 수 있는 기능을 제공
- BranchPythonOperator에 전달된 callable 인수는 작업 결과로 다운스트림 task의 id를 반환함
- 반환된 id는 브랜치 task 완료 후 실행할 다운스트림 task를 결정
- task list를 반환하는 경우에는 참조된 모든 task를 실행
- Fan-in 유형에서 태스크가 실행되지 않는 현상
- Airflow가 태스크 자체를 실행하기 전에 지정된 업스트림 태스크가 모두 성공적으로 완료되어야 진행하는 데,
위 케이스는 New or Old 버전의 코드만 실행되기 때문에 발생되는 케이스 - Airflow 트리거 규칙에 의해 티스크 실행 시기를 제어해야 함
- 모든 operator에 전달할 수 있는 trigger_rule 인수를 이용해 개별 task에 대해 트리거 규칙을 정의할 수 있음
- 잘못된 트리거 규칙으로 브랜치를 결합하면, 다운스트림 task는 건너뛰게 됨
- none_failed로 설정하는 경우, 1개만 성공해도 다운스트림 task가 진행되는 데,
아래 케이스는 clean_weather도 같이 실행되어야 하기 때문에 더미 태스크를 활용해서 해결해야 함
- Airflow가 태스크 자체를 실행하기 전에 지정된 업스트림 태스크가 모두 성공적으로 완료되어야 진행하는 데,
import airflow
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator
ERP_CHANGE_DATE = airflow.utils.dates.days_ago(1)
def _pick_erp_system(**context):
if context["execution_date"] < ERP_CHANGE_DATE:
return "fetch_sales_old"
else:
return "fetch_sales_new"
def _fetch_sales_old(**context):
print("Fetching sales data (OLD)...")
def _fetch_sales_new(**context):
print("Fetching sales data (NEW)...")
def _clean_sales_old(**context):
print("Preprocessing sales data (OLD)...")
def _clean_sales_new(**context):
print("Preprocessing sales data (NEW)...")
with DAG(
dag_id="04_branch_dag_join",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="@daily",
) as dag:
start = DummyOperator(task_id="start")
# BranchPythonOperator
pick_erp_system = BranchPythonOperator(
task_id="pick_erp_system", python_callable=_pick_erp_system
)
fetch_sales_old = PythonOperator(
task_id="fetch_sales_old", python_callable=_fetch_sales_old
)
clean_sales_old = PythonOperator(
task_id="clean_sales_old", python_callable=_clean_sales_old
)
fetch_sales_new = PythonOperator(
task_id="fetch_sales_new", python_callable=_fetch_sales_new
)
clean_sales_new = PythonOperator(
task_id="clean_sales_new", python_callable=_clean_sales_new
)
# Using Trigger rule with dummy operator
join_erp = DummyOperator(task_id="join_erp_branch", trigger_rule="none_failed")
fetch_weather = DummyOperator(task_id="fetch_weather")
clean_weather = DummyOperator(task_id="clean_weather")
join_datasets = DummyOperator(task_id="join_datasets")
train_model = DummyOperator(task_id="train_model")
deploy_model = DummyOperator(task_id="deploy_model")
start >> [pick_erp_system, fetch_weather]
pick_erp_system >> [fetch_sales_old, fetch_sales_new]
fetch_sales_old >> clean_sales_old
fetch_sales_new >> clean_sales_new
[clean_sales_old, clean_sales_new] >> join_erp
fetch_weather >> clean_weather
[join_erp, clean_weather] >> join_datasets
join_datasets >> train_model >> deploy_model
조건부 task
- Airflow는 특정 조건에 따라 DAG에서 특정 task를 건너뛸 수 있는 방법이 있음
- 특정 데이터 세트를 사용할 수 있을 때에만 실행하거나, 최근에 실행된 DAG인 경우만 task를 실행할 수 있음
task 내에서의 조건
- 예시:
- 문제: 모델을 배포하는 task에서 데이터 정제 코드를 일부 변경 후, 백필을 이용해 변경 사항을
전체 데이터 세트에 적용하면 모델 또한 필요한 인스턴스에 다시 배포되어야 함 - 조건: 가장 최근에 실행된 DAG에 대해서만 모델을 배포
- 문제: 모델을 배포하는 task에서 데이터 정제 코드를 일부 변경 후, 백필을 이용해 변경 사항을
- 방법
- 조건에 대해서만 적용되도록 DAG를 변경 가능
- PythonOperator를 사용하여 구현하고, callable 함수 내에서 DAG의 실행 날짜를 명시적으로 확인
- 단점
- 의도된 동작은 하지만, 로직 조건이 혼용되고, PythonOperator 이외의 다른 기본 제공 operator를 사용할 수 없음
- 조건이 task 내에서 내부적으로 확인되기 때문에, Airflow UI에서 task 결과를 추적할 때 혼란스러울 수 있음
def _is_latest_run(**context):
now = pendulum.now("UTC")
left_window = context["dag"].following_schedule(context["execution_date"])
right_window = context["dag"].following_schedule(left_window)
return left_window < now <= right_window
def _deploy_model(**context):
if _is_latest_run(**context):
print("Deploying model")
deploy_model = PythonOperator(task_id="deploy_model", python_callable=_deploy_model)
task 자체를 조건부화
- 미리 정의된 조건에 따라서만 실행됨
- 해당 조건을 테스트하고 조건이 실패할 때, 모든 다운스트림 작업을 건너뛰는 task를 DAG에 추가하여 task를 조건부화할 수 있음
- AirflowSkipException: Airflow가 조건과 모든 다운스트림 task를 건너뛰라는 것을 나타냄
- Airflow는 다운스트림 task의 trigger rule을 살펴보고 trigger 수행 여부를 판단함
- 기본 trigger rule은 all_success(모든 업스트림 task가 성공한 경우)가 설정된 하나의 다운스트림 task만 있음
def _latest_only(**context):
now = pendulum.now("UTC")
left_window = context["dag"].following_schedule(context["execution_date"])
right_window = context["dag"].following_schedule(left_window)
if not left_window < now <= right_window:
raise AirflowSkipException()
latest_only = PythonOperator(task_id="latest_only", python_callable=_latest_only)
latest_only >> deploy_model
내장 operator
- LastOnlyOperator 내장 클래스 활용
- 조건부 배포를 쉽게 구현 가능
from airflow.operators.latest_only import LatestOnlyOperator
latest_only = LatestOnlyOperator(task_id="latest_only", dag=dag)
join_datasets >> train_model >> deploy_model
latest_only >> deploy_model
Trigger rule
- Airflow가 DAG 실행 내에서 작업을 실행하는 방법
- 각 task를 지속적으로 확인하여 실행 여부를 확인
- task 실행이 가능하다고 판단되면, 즉시 스케줄러에 의해 선택된 후 실행을 예약
- Airflow에 사용 가능한 실행 슬롯이 있다면 즉시 task가 실행됨
- Trigger rule은 태스크 실행 시기를 결정하기 위해 필요함
- trigger rule: task의 의존성 기능과 같이 Airflow가 task가 실행 준비가 되어 있는지 여부를 결정하기 위한 필수적인 조건
- 의존성 기능 = DAG 안에서 선행 태스크 조건
- default: all_success (모든 의존적인 task가 성공적으로 완료되어야 함)
- trigger rule: task의 의존성 기능과 같이 Airflow가 task가 실행 준비가 되어 있는지 여부를 결정하기 위한 필수적인 조건
- DAG 실행 중, task 하나가 오류를 발생한 경우
- 해당 task는 실패로 기록됨
- 다운스트림 task는 upstream_failed 상태가 할당되어 all_success 트리거 규칙에 의해 실행되지 않음
- 전파(propagation): upstream task 결과가 downstream task에도 영향을 미치는 동작 유형
Task 간에 데이터 공유
- Airflow XCom(cross-communication)을 사용하여 task 간에 작은 데이터를 공유할 수 있음
- XCom은 기본적으로 task 간에 메시지를 교환하여 특정 상태를 공유할 수 있게 함
- 값 등록: xcom_push 메서드를 활용
- xcom_push에 대한 호출은 Airflow가 해당 task와 DAG 및 실행 날짜에 대한 XCom의 값으로 입력된 값을 등록
- Web interface Admin > XCom에서 게시된 XCom값을 확인 가능
- 다른 task에서 XCom값 확인: xcom_pull 메서드 활용
- 값을 가져올 때, dag_id 및 실행 날짜를 정의할 수도 있음 (default로 현재 DAG와 실행 날짜로 설정됨)
- 값 등록: xcom_push 메서드를 활용
- e. g. train model, deploy model 작업 간에 모델 식별자를 공유 가능
def _train_model(**context):
model_id = str(uuid.uuid4())
context["task_instance"].xcom_push(key="model_id", value=model_id)
def _deploy_model(**context):
model_id = context["task_instance"].xcom_pull(
task_ids="train_model", key="model_id"
)
print(f"Deploying model {model_id}")
train_model = PythonOperator(task_id="train_model", python_callable=_train_model)
deploy_model = PythonOperator(task_id="deploy_model", python_callable=_deploy_model)
- 템플릿으로 XCom 참조
def _train_model(**context):
model_id = str(uuid.uuid4())
context["task_instance"].xcom_push(key="model_id", value=model_id)
def _deploy_model(templates_dict, **context):
model_id = templates_dict["model_id"]
print(f"Deploying model {model_id}")
train_model = PythonOperator(task_id="train_model", python_callable=_train_model)
deploy_model = PythonOperator(
task_id="deploy_model",
python_callable=_deploy_model,
templates_dict={
"model_id": "{{task_instance.xcom_pull(task_ids='train_model', key='model_id')}}"
},
)
- XCom 사용 시 주의 사항
- 풀링 task는 필요한 값을 사용하기 위해 task 간에 묵시적인 의존성(implicit dependency)이 필요
- 명시적 의존성 task(explicit task)와 달리 DAG에 표시되지 않으며, task 스케줄 시에 고려되지 않음
- XCom에 의해 의존성 있는 작업이 올바른 순서로 실행할 수 있도록 해야 함 (Airflow는 고려하지 않음)
- 숨겨진 의존성으로 인해 복잡성이 증가하여 권장하지 않음
- operator의 원자성을 무너뜨리는 패턴이 될 가능성이 있음
- XCom이 저장하는 모든 값은 직렬화(serialization)를 지원해야 한다는 기술적 한계가 존재
- lambda, 여러 다중 멀티프로세스 관련 클래스 같은 일부 파이썬 유형은 저장 불가능
- 사용되는 백엔드에 의해 XCom값의 저장 크기가 제한될 수 있음
- 기본적으로 XCom은 Airflow의 메타스토어에 저장되며 크기가 제한됨
- SQLite: BLOB, 2GB
- PostgreSQL: BYTEA, 1GB
- MySQL: BLOB, 64KB
- XCom을 적절하게 사용하면 강력한 도구가 되지만, 발생할 수 있는 오류를 방지하기 위해
task 간 의존성을 명확하게 기록하고 사용법을 신중히 검토해야 함
- 풀링 task는 필요한 값을 사용하기 위해 task 간에 묵시적인 의존성(implicit dependency)이 필요
- Airflow2에서는 XCom을 유연하게 활용하기 위해 커스텀 XCom 백엔드를 지정할 수 있는 옵션이 추가됨
- 커스텀 클래스를 정의하여 XCom을 저장 및 검색할 수 있음
- BaseXCom 기본 클래스가 상속되어야 하고, 값을 직렬화 및 역직렬화(deserialization) 하기 위해
두 가지 정적 메서드를 각각 구현해야 함 - 커스텀 백엔드 클래스에서 직렬화 메서드는 XCom 값이 operator 내에서 게시될 때마다 호출되는 반면,
역직렬화 메서드는 XCom값이 백엔드에서 가져올 때 호출됨 - 커스텀 XCom 백엔드는 XCom 값 저장 선택을 다양하게 함 (e. g. Azure Blob storage, Amazon S3, GCS)
Taskflow API로 파이썬 task 연결
- Taskflow API: 객체 지향 operator API보다 파이썬 함수 사용법에 가까운 구문을 사용하여
파이썬 task와 task 간 의존성을 좀 더 간단하게 구현할 수 있도록 제공- 장점: XCom의 task 간 의존성을 확인하지 못하는 문제점을
Taskflow API는 함수 내의 task 간의 의존성을 숨기지 않고 task 간의 값을 명시적으로 전달함으로써 해결 가능 - 단점: PythonOperator를 사용하여 구현되는 파이썬 task로 제한됨
- Airflow operator와 관련된 task는 일반 API를 사용하여 task 및 task 의존성을 정의해야 함
- 장점: XCom의 task 간 의존성을 확인하지 못하는 문제점을
- 많은 task 연결에 API로 구현할 때에는 Taskflow API를 활용
- Airflow2는 Taskflow API로 파이썬 task 및 의존성을 정의하기 위한 새로운 decorator-based API를 제공
- Taskflow 유형 task 간에 전달된 데이터는 XCom을 통해 저장됨
- 전달된 모든 값은 XCom의 제약 사항이 적용됨
- 데이터 세트의 크기 또한 XCom의 백엔드에 의해 제한될 수 있음
- PythonOperator를 사용하고 XCom으로 데이터를 전달하는 것보다 코드를 단순화할 수 있음
- PythonOperator를 이용해 Airflow task를 생성해야 하는 단점이 있음
- 값을 공유하기 위해 xcom_push, xcom_pull을 명시적으로 사용하여 값을 전송 및 반환해야 함
- Taskflow API는 파이썬 함수를 task로 쉽게 변환하고, DAG 정의에서 task 간에 데이터 공유를 명확하게 함
from airflow import DAG
from airflow.decorators import task
with DAG(
dag_id="12_taskflow",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="@daily",
) as dag:
@task
def train_model():
model_id = str(uuid.uuid4())
return model_id
@task
def deploy_model(model_id: str):
print(f"Deploying model {model_id}")
model_id = train_model()
deploy_model(model_id)
- Taskflow API를 활용한 예시 코드
import uuid
import airflow
from airflow import DAG
from airflow.decorators import task
from airflow.operators.dummy import DummyOperator
with DAG(
dag_id="13_taskflow_full",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="@daily",
) as dag:
start = DummyOperator(task_id="start")
fetch_sales = DummyOperator(task_id="fetch_sales")
clean_sales = DummyOperator(task_id="clean_sales")
fetch_weather = DummyOperator(task_id="fetch_weather")
clean_weather = DummyOperator(task_id="clean_weather")
join_datasets = DummyOperator(task_id="join_datasets")
start >> [fetch_sales, fetch_weather]
fetch_sales >> clean_sales
fetch_weather >> clean_weather
[clean_sales, clean_weather] >> join_datasets
@task
def train_model():
model_id = str(uuid.uuid4())
return model_id
@task
def deploy_model(model_id: str):
print(f"Deploying model {model_id}")
model_id = train_model()
deploy_model(model_id)
join_datasets >> model_id
반응형
'Workflow > Airflow' 카테고리의 다른 글
『Apache Airflow 기반의 데이터 파이프라인』 Chapter 4. Airflow 콘텍스트를 사용하여 태스크 템플릿 작업하기 (3) | 2024.12.07 |
---|---|
『Apache Airflow 기반의 데이터 파이프라인』 Chapter 3. Airflow의 스케줄링 (1) | 2024.01.21 |
『Apache Airflow 기반의 데이터 파이프라인』 Chapter 2. Airflow DAG의 구조 (0) | 2024.01.15 |
『Apache Airflow 기반의 데이터 파이프라인』 Chapter 1. Apache Airflow 살펴보기 (1) | 2024.01.10 |
댓글