본문 바로가기
Workflow/Airflow

『Apache Airflow 기반의 데이터 파이프라인』 Chapter 5. 태스크 간 의존성 정의하기

by Night Fury 2024. 12. 10.
반응형
바스 하렌슬락, 율리안 더라위터르, 『Apache Airflow 기반의 데이터 파이프라인』, 김정민, 문선홍, MANNING-제이펍(2022), p87-115.

 

의존성 유형

  • 선형 체인(linear chain) 유형: 연속적으로 실행되는 작업
    • 오른쪽 비트 시프트 연산자(>>)를 사용하여 태스크 간의 의존성을 만들 수 있음
    • 여러 태스크에서 순서가 명확하게 정의되는 장점이 있음
join_datasets >> train_model >> deploy_model

 

  • 팬인/팬아웃(Fan-in/Fan-out) 유형: 하나의 태스크가 여러 다운스트림 태스크에 연결되거나 그 반대의 동작을 수행하는 유형
    • Fan-in: 다대일 구조
    • Fan-out: 일대다 구조
# Fan-in
start = DummyOperator(task_id="start")
start >> [fetch_sales, fetch_weather]

# Fan-out
fetch_sales = DummyOperator(task_id="fetch_sales")
clean_sales = DummyOperator(task_id="clean_sales")

fetch_weather = DummyOperator(task_id="fetch_weather")
clean_weather = DummyOperator(task_id="clean_weather")

fetch_sales >> clean_sales
fetch_weather >> clean_weather
[clean_sales, clean_weather] >> join_datasets

 

브랜치

  • e. g. 시스템이 개편되어 기존 데이터는 Bigquery, 새로운 데이터는 Redshift를 사용해야 하는 경우

태스크 내에서 브랜치

  • 장점: DAG 자체의 구조를 수정하지 않고도 약간의 유연성을 허용할 수 있는 장점
    • 코드로 분기가 가능한 유사한 태스크로 구성된 경우에만 작동함
      (e. g. 새로운 데이터가 완전히 다른 태스크 체인이 필요한 경우)
    • 이러한 경우에는 데이터 수집을 두 개의 개별 태스크 세트로 분할하는 것이 나을 수 있음 (dag 내부에서 브랜치)
  • 단점: DAG 실행 중에 Airflow에서 어떤 분기 코드를 사용하고 있는지 확인이 어려움
    • 태스크에 좀 더 세세한 로깅을 포함하는 것으로 해결 가능
def _fetch_sales(**context):
    if context["execution_date"] < ERP_CHANGE_DATE:
        _fetch_sales_old(**context)
    else:
        _fetch_sales_new(**context)

 

dag 내부에서 브랜치

    • 두 개의 개별 태스크 세트(각 시스템에 하나씩)를 개발하고 DAG가 이전 or 새로운 시스템에서 작업을 실행할 수 있도록 하는 것
    • BranchPythonOperator: 다운스트림 세트 중 선택할 수 있는 기능을 제공
      • BranchPythonOperator에 전달된 callable 인수는 작업 결과로 다운스트림 task의 id를 반환함
      • 반환된 id는 브랜치 task 완료 후 실행할 다운스트림 task를 결정
      • task list를 반환하는 경우에는 참조된 모든 task를 실행
    • Fan-in 유형에서 태스크가 실행되지 않는 현상
      • Airflow가 태스크 자체를 실행하기 전에 지정된 업스트림 태스크가 모두 성공적으로 완료되어야 진행하는 데,
        위 케이스는 New or Old 버전의 코드만 실행되기 때문에 발생되는 케이스
      • Airflow 트리거 규칙에 의해 티스크 실행 시기를 제어해야 함
        • 모든 operator에 전달할 수 있는 trigger_rule 인수를 이용해 개별 task에 대해 트리거 규칙을 정의할 수 있음
        • 잘못된 트리거 규칙으로 브랜치를 결합하면, 다운스트림 task는 건너뛰게 됨
        • none_failed로 설정하는 경우, 1개만 성공해도 다운스트림 task가 진행되는 데,
          아래 케이스는 clean_weather도 같이 실행되어야 하기 때문에 더미 태스크를 활용해서 해결해야 함

original

 

solution

import airflow

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator

ERP_CHANGE_DATE = airflow.utils.dates.days_ago(1)


def _pick_erp_system(**context):
    if context["execution_date"] < ERP_CHANGE_DATE:
        return "fetch_sales_old"
    else:
        return "fetch_sales_new"


def _fetch_sales_old(**context):
    print("Fetching sales data (OLD)...")


def _fetch_sales_new(**context):
    print("Fetching sales data (NEW)...")


def _clean_sales_old(**context):
    print("Preprocessing sales data (OLD)...")


def _clean_sales_new(**context):
    print("Preprocessing sales data (NEW)...")


with DAG(
    dag_id="04_branch_dag_join",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="@daily",
) as dag:
    start = DummyOperator(task_id="start")
	
    # BranchPythonOperator
    pick_erp_system = BranchPythonOperator(
        task_id="pick_erp_system", python_callable=_pick_erp_system
    )

    fetch_sales_old = PythonOperator(
        task_id="fetch_sales_old", python_callable=_fetch_sales_old
    )
    clean_sales_old = PythonOperator(
        task_id="clean_sales_old", python_callable=_clean_sales_old
    )

    fetch_sales_new = PythonOperator(
        task_id="fetch_sales_new", python_callable=_fetch_sales_new
    )
    clean_sales_new = PythonOperator(
        task_id="clean_sales_new", python_callable=_clean_sales_new
    )
	
    # Using Trigger rule with dummy operator
    join_erp = DummyOperator(task_id="join_erp_branch", trigger_rule="none_failed")

    fetch_weather = DummyOperator(task_id="fetch_weather")
    clean_weather = DummyOperator(task_id="clean_weather")

    join_datasets = DummyOperator(task_id="join_datasets")
    train_model = DummyOperator(task_id="train_model")
    deploy_model = DummyOperator(task_id="deploy_model")

    start >> [pick_erp_system, fetch_weather]
    pick_erp_system >> [fetch_sales_old, fetch_sales_new]
    fetch_sales_old >> clean_sales_old
    fetch_sales_new >> clean_sales_new
    [clean_sales_old, clean_sales_new] >> join_erp
    fetch_weather >> clean_weather
    [join_erp, clean_weather] >> join_datasets
    join_datasets >> train_model >> deploy_model

 

조건부 task

  • Airflow는 특정 조건에 따라 DAG에서 특정 task를 건너뛸 수 있는 방법이 있음
  • 특정 데이터 세트를 사용할 수 있을 때에만 실행하거나, 최근에 실행된 DAG인 경우만 task를 실행할 수 있음

task 내에서의 조건

  • 예시:
    • 문제: 모델을 배포하는 task에서 데이터 정제 코드를 일부 변경 후, 백필을 이용해 변경 사항을
      전체 데이터 세트에 적용하면 모델 또한 필요한 인스턴스에 다시 배포되어야 함
    • 조건: 가장 최근에 실행된 DAG에 대해서만 모델을 배포
  • 방법
    • 조건에 대해서만 적용되도록 DAG를 변경 가능
    • PythonOperator를 사용하여 구현하고, callable 함수 내에서 DAG의 실행 날짜를 명시적으로 확인
  • 단점
    • 의도된 동작은 하지만, 로직 조건이 혼용되고, PythonOperator 이외의 다른 기본 제공 operator를 사용할 수 없음
    • 조건이 task 내에서 내부적으로 확인되기 때문에, Airflow UI에서 task 결과를 추적할 때 혼란스러울 수 있음
def _is_latest_run(**context):
    now = pendulum.now("UTC")
    left_window = context["dag"].following_schedule(context["execution_date"])
    right_window = context["dag"].following_schedule(left_window)
    return left_window < now <= right_window
    
def _deploy_model(**context):
    if _is_latest_run(**context):
        print("Deploying model")
        
deploy_model = PythonOperator(task_id="deploy_model", python_callable=_deploy_model)

 

task 자체를 조건부화

  • 미리 정의된 조건에 따라서만 실행됨 
  • 해당 조건을 테스트하고 조건이 실패할 때, 모든 다운스트림 작업을 건너뛰는 task를 DAG에 추가하여 task를 조건부화할 수 있음
  • AirflowSkipException: Airflow가 조건과 모든 다운스트림 task를 건너뛰라는 것을 나타냄
    • Airflow는 다운스트림 task의 trigger rule을 살펴보고 trigger 수행 여부를 판단함
    • 기본 trigger rule은 all_success(모든 업스트림 task가 성공한 경우)가 설정된 하나의 다운스트림 task만 있음
def _latest_only(**context):
    now = pendulum.now("UTC")
    left_window = context["dag"].following_schedule(context["execution_date"])
    right_window = context["dag"].following_schedule(left_window)

    if not left_window < now <= right_window:
        raise AirflowSkipException()
        
latest_only = PythonOperator(task_id="latest_only", python_callable=_latest_only)        

latest_only >> deploy_model

 

내장 operator

  • LastOnlyOperator 내장 클래스 활용
    • 조건부 배포를 쉽게 구현 가능
from airflow.operators.latest_only import LatestOnlyOperator

latest_only = LatestOnlyOperator(task_id="latest_only", dag=dag)

join_datasets >> train_model >> deploy_model
latest_only >> deploy_model

 

Trigger rule

  • Airflow가 DAG 실행 내에서 작업을 실행하는 방법
    • 각 task를 지속적으로 확인하여 실행 여부를 확인
    • task 실행이 가능하다고 판단되면, 즉시 스케줄러에 의해 선택된 후 실행을 예약
    • Airflow에 사용 가능한 실행 슬롯이 있다면 즉시 task가 실행됨
  • Trigger rule은 태스크 실행 시기를 결정하기 위해 필요함
    • trigger rule: task의 의존성 기능과 같이 Airflow가 task가 실행 준비가 되어 있는지 여부를 결정하기 위한 필수적인 조건
      • 의존성 기능 = DAG 안에서 선행 태스크 조건
      • default: all_success (모든 의존적인 task가 성공적으로 완료되어야 함)
  • DAG 실행 중, task 하나가 오류를 발생한 경우
    • 해당 task는 실패로 기록됨
    • 다운스트림 task는 upstream_failed 상태가 할당되어 all_success 트리거 규칙에 의해 실행되지 않음
      • 전파(propagation): upstream task 결과가 downstream task에도 영향을 미치는 동작 유형

 

Task 간에 데이터 공유

  • Airflow XCom(cross-communication)을 사용하여 task 간에 작은 데이터를 공유할 수 있음
  • XCom은 기본적으로 task 간에 메시지를 교환하여 특정 상태를 공유할 수 있게 함
    • 값 등록: xcom_push 메서드를 활용
      • xcom_push에 대한 호출은 Airflow가 해당 task와 DAG 및 실행 날짜에 대한 XCom의 값으로 입력된 값을 등록
      • Web interface Admin > XCom에서 게시된 XCom값을 확인 가능
    • 다른 task에서 XCom값 확인: xcom_pull 메서드 활용
      • 값을 가져올 때, dag_id 및 실행 날짜를 정의할 수도 있음 (default로 현재 DAG와 실행 날짜로 설정됨)
  • e. g. train model, deploy model 작업 간에 모델 식별자를 공유 가능
def _train_model(**context):
    model_id = str(uuid.uuid4())
    context["task_instance"].xcom_push(key="model_id", value=model_id)


def _deploy_model(**context):
    model_id = context["task_instance"].xcom_pull(
        task_ids="train_model", key="model_id"
    )
    print(f"Deploying model {model_id}")
    
    
train_model = PythonOperator(task_id="train_model", python_callable=_train_model)
deploy_model = PythonOperator(task_id="deploy_model", python_callable=_deploy_model)

Web interface Admin > XCom

 

  • 템플릿으로 XCom 참조
def _train_model(**context):
    model_id = str(uuid.uuid4())
    context["task_instance"].xcom_push(key="model_id", value=model_id)


def _deploy_model(templates_dict, **context):
    model_id = templates_dict["model_id"]
    print(f"Deploying model {model_id}")


train_model = PythonOperator(task_id="train_model", python_callable=_train_model)

deploy_model = PythonOperator(
    task_id="deploy_model",
    python_callable=_deploy_model,
    templates_dict={
        "model_id": "{{task_instance.xcom_pull(task_ids='train_model', key='model_id')}}"
    },
)

 

  • XCom 사용 시 주의 사항
    • 풀링 task는 필요한 값을 사용하기 위해 task 간에 묵시적인 의존성(implicit dependency)이 필요
      • 명시적 의존성 task(explicit task)와 달리 DAG에 표시되지 않으며, task 스케줄 시에 고려되지 않음
      • XCom에 의해 의존성 있는 작업이 올바른 순서로 실행할 수 있도록 해야 함 (Airflow는 고려하지 않음)
      • 숨겨진 의존성으로 인해 복잡성이 증가하여 권장하지 않음
    • operator의 원자성을 무너뜨리는 패턴이 될 가능성이 있음
    • XCom이 저장하는 모든 값은 직렬화(serialization)를 지원해야 한다는 기술적 한계가 존재
      • lambda, 여러 다중 멀티프로세스 관련 클래스 같은 일부 파이썬 유형은 저장 불가능
      • 사용되는 백엔드에 의해 XCom값의 저장 크기가 제한될 수 있음
        • 기본적으로 XCom은 Airflow의 메타스토어에 저장되며 크기가 제한됨
        • SQLite: BLOB, 2GB
        • PostgreSQL: BYTEA, 1GB
        • MySQL: BLOB, 64KB
    • XCom을 적절하게 사용하면 강력한 도구가 되지만, 발생할 수 있는 오류를 방지하기 위해
      task 간 의존성을 명확하게 기록하고 사용법을 신중히 검토해야 함
  • Airflow2에서는 XCom을 유연하게 활용하기 위해 커스텀 XCom 백엔드를 지정할 수 있는 옵션이 추가됨
    • 커스텀 클래스를 정의하여 XCom을 저장 및 검색할 수 있음
    • BaseXCom 기본 클래스가 상속되어야 하고, 값을 직렬화 및 역직렬화(deserialization) 하기 위해
      두 가지 정적 메서드를 각각 구현해야 함
    • 커스텀 백엔드 클래스에서 직렬화 메서드는 XCom 값이 operator 내에서 게시될 때마다 호출되는 반면,
      역직렬화 메서드는 XCom값이 백엔드에서 가져올 때 호출됨
    • 커스텀 XCom 백엔드는 XCom 값 저장 선택을 다양하게 함 (e. g. Azure Blob storage, Amazon S3, GCS)

 

Taskflow API로 파이썬 task 연결

  • Taskflow API: 객체 지향 operator API보다 파이썬 함수 사용법에 가까운 구문을 사용하여
    파이썬 task와 task 간 의존성을 좀 더 간단하게 구현할 수 있도록 제공
    • 장점: XCom의 task 간 의존성을 확인하지 못하는 문제점을
      Taskflow API는 함수 내의 task 간의 의존성을 숨기지 않고 task 간의 값을 명시적으로 전달함으로써 해결 가능
    • 단점: PythonOperator를 사용하여 구현되는 파이썬 task로 제한됨
      • Airflow operator와 관련된 task는 일반 API를 사용하여 task 및 task 의존성을 정의해야 함
  • 많은 task 연결에 API로 구현할 때에는 Taskflow API를 활용
    • Airflow2는 Taskflow API로 파이썬 task 및 의존성을 정의하기 위한 새로운 decorator-based API를 제공
  • Taskflow 유형 task 간에 전달된 데이터는 XCom을 통해 저장됨
    • 전달된 모든 값은 XCom의 제약 사항이 적용됨
    • 데이터 세트의 크기 또한 XCom의 백엔드에 의해 제한될 수 있음
  • PythonOperator를 사용하고 XCom으로 데이터를 전달하는 것보다 코드를 단순화할 수 있음
    • PythonOperator를 이용해 Airflow task를 생성해야 하는 단점이 있음
    • 값을 공유하기 위해 xcom_push, xcom_pull을 명시적으로 사용하여 값을 전송 및 반환해야 함
    • Taskflow API는 파이썬 함수를 task로 쉽게 변환하고, DAG 정의에서 task 간에 데이터 공유를 명확하게 함
from airflow import DAG
from airflow.decorators import task


with DAG(
    dag_id="12_taskflow",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="@daily",
) as dag:

    @task
    def train_model():
        model_id = str(uuid.uuid4())
        return model_id

    @task
    def deploy_model(model_id: str):
        print(f"Deploying model {model_id}")

    model_id = train_model()
    deploy_model(model_id)

 

  • Taskflow API를 활용한 예시 코드
import uuid

import airflow

from airflow import DAG
from airflow.decorators import task
from airflow.operators.dummy import DummyOperator


with DAG(
    dag_id="13_taskflow_full",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="@daily",
) as dag:
    start = DummyOperator(task_id="start")

    fetch_sales = DummyOperator(task_id="fetch_sales")
    clean_sales = DummyOperator(task_id="clean_sales")

    fetch_weather = DummyOperator(task_id="fetch_weather")
    clean_weather = DummyOperator(task_id="clean_weather")

    join_datasets = DummyOperator(task_id="join_datasets")

    start >> [fetch_sales, fetch_weather]
    fetch_sales >> clean_sales
    fetch_weather >> clean_weather
    [clean_sales, clean_weather] >> join_datasets

    @task
    def train_model():
        model_id = str(uuid.uuid4())
        return model_id

    @task
    def deploy_model(model_id: str):
        print(f"Deploying model {model_id}")

    model_id = train_model()
    deploy_model(model_id)

    join_datasets >> model_id
반응형

댓글