반응형
바스 하렌슬락, 율리안 더라위터르, 『Apache Airflow 기반의 데이터 파이프라인』, 김정민, 문선홍, MANNING-제이펍(2022), p62-86.
데이터 검사
- 파이프라인을 구축하기 전에 접근 방식에 대한 기술적 계획을 세우는 것이 중요
- 구축 목적 확인
- 데이터 빈도, 형식, 소스 유형 검사
- 데이터 증분 방식으로 적재하는 방법과 데이터를 다루는 방법 이해 필요
템플릿 작업
- 예시: 위키피디아 페이지 뷰 수 >> zip 다운로드 >> zip 압축 풀기 >> 한 시간 동안 페이지 뷰 수 추출
- 런타임 시 삽입될 변수 ➡️ 이중 괄호 사용 {{ }} (Jinja 템플릿 문자열)
- Jinja: 런타임 시에 템플릿 문자열의 변수와 and 및 or 표현식을 대체하는 템플릿 엔진
- execution_date: task context에서 실행 시 사용할 수 있는 날짜 시간 변수
- 찾아보니 logical_date로 바뀜
- Pendulum의 datetime 객체: https://pendulum.eustace.io/
from airflow.operators.bash import BashOperator
get_data = BashOperator(
task_id="get_data",
bash_command=(
"curl -o /tmp/wikipageviews.gz "
"https://dumps.wikimedia.org/other/pageviews/"
"{{ execution_date.year }}/"
"{{ execution_date.year }}-{{ '{:02}'.format(execution_date.month) }}/"
"pageviews-{{ execution_date.year }}"
"{{ '{:02}'.format(execution_date.month) }}" # 빈 앞자리를 0으로 채우는 월, 일, 시간 값
"{{ '{:02}'.format(execution_date.day) }}-" # 빈 앞자리를 0으로 채우는 월, 일, 시간 값
"{{ '{:02}'.format(execution_date.hour) }}0000.gz" # 빈 앞자리를 0으로 채우는 월, 일, 시간 값
),
dag=dag,
)
- 템플릿화에 사용할 수 있는 변수
- template_fields 목록: https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html
from airflow.operators.python import PythonOperator
def _print_context(**kwargs):
print(kwargs)
print_context=PythonOperator(
task_id = "print_context"
python_callable = _print_context
dag=dag
)
PythonOperator 템플릿
- 런타임 콘텍스트로 템플릿화할 수 있는 인수를 사용하지 않고 task context 변수를 활용
- 별도로 런타임 콘텍스트를 적용할 수 있는 python_callable을 사용하기 때문에 표준을 따르지 않음
- python_callable: 함수를 callable 객체로 만들어줌
- callable: 객체가 호출 가능한 상태인지 확인해 줌
- python에서는 __call__()을 구현하는 모든 객체는 호출 가능한 것으로 간주됨
from urllib import request
from airflow.operators.python import PythonOperator
def _get_data(execution_date):
year, month, day, hour, *_ = execution_date.timetuple()
url = (
"https://dumps.wikimedia.org/other/pageviews/"
f"{year}/{year}-{month:0>2}/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
)
output_path = "/tmp/wikipageviews.gz"
request.urlretrieve(url, output_path)
get_data = PythonOperator(task_id="get_data", python_callable=_get_data, dag=dag)
- Airflow2에서는 PythonOperator가 callable 인수 이름으로부터 콘텍스트 변수가 호출 가능한지 판단
- Airflow1처럼 provide_context=True로 설정할 필요 없음
- Airflow 콘텍스트 변수 확인
- **kwargs라는 단일 인수에 모든 키워드 인수를 포함 가능
- **context 인수로 명칭 변경 가능
- 명시적인 인수 정의를 하면 linter, type hinting 도구를 사용할 수 있음
def _print_context(**context):
start = context["execution_date"]
end = context["next_execution_date"]
print(f"Start: {start}, end: {end}")
# Prints e.g.:
# Start: 2019-07-13T14:00:00+00:00, end: 2019-07-13T15:00:00+00:00
- callable 커스텀 변수
- op_args에 제공된 리스트의 각 값이 callable 함수에 전달됨
- op_kwargs에 제공된 키워드 인수가 callable 함수에 전달됨
def _get_data(year, month, day, hour, output_path, **_):
url = (
"https://dumps.wikimedia.org/other/pageviews/"
f"{year}/{year}-{month:0>2}/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
)
request.urlretrieve(url, output_path)
get_data = PythonOperator(
task_id="get_data",
python_callable=_get_data,
op_kwargs={
"year": "{{ execution_date.year }}",
"month": "{{ execution_date.month }}",
"day": "{{ execution_date.day }}",
"hour": "{{ execution_date.hour }}",
"output_path": "/tmp/wikipageviews.gz",
},
dag=dag,
)
- 템플릿 인수 검사: 작업을 실행한 후, "Rendered Template' 버튼을 클릭하여 검사 가능
- Rendered Template: 렌더링 되는 지정된 연산자의 모든 속성과 해당 값들을 표시 (태스크 인스턴스별로 표시됨)
- 위 작업은 Airflow에서 작업을 스케줄 해야 하기 때문에, CLI를 활용하는 것이 효율적
- CLI: airflow tasks render [dag id] [task id] [desired excution date]
다른 시스템과 연결
- Airflow는 XCom이라는 기본 메커니즘을 제공하여 Airflow 메타스토어에서 선택 가능한(pickable) 개체를 저장하고 읽을 수 있음
- pickle: 파이썬의 직렬화 프로토콜
- 직렬화: 메모리 개체를 나중에 다시 읽을 수 있도록 디스크에 저장할 수 있는 형식
- 크기가 작은 오브젝트일 때, XCom을 이용한 피클링이 적합
- 피클링 된 객체는 메타스토어의 blob에 저장됨
- blob(binary large object): 텍스트, 이미지, 사운드, 비디오 등 큰 크기의 파일을 저장할 때 사용하는 이진 데이터
- pickle: 파이썬의 직렬화 프로토콜
- 큰 데이터를 task 간에 전송하려면 Airflow 외부에 데이터를 유지하는 것이 좋음
- 데이터베이스에 대한 연결 설정 및 완료 후 연결 끊기 같은 복잡한 작업은 내부에서 처리됨
- DAG의 template_searchpath 인수: 파일 탐색 경로 템플릿화
- Jinja는 기본 경로(DAG file의 경로)와 template_searchpath로 추가된 경로를 함께 탐색 가능
- template_ext: 특정 유형의 파일을 템플릿으로 처리 가능 (e. g. .sh, .sql)
- PostgreOperator는 Postgres와 통신하기 위해 hook을 인스턴스화함
- hook: 외부 시스템과 통신하기 위한 인터페이스를 제공하는 추상화된 API
- 외부 시스템과의 상호작용을 간편하게 처리 가능
- 인스턴스화된 hook은 연결 생성, Postgres에 쿼리를 전송하고 연결에 대한 종료 작업을 처리
- Operator는 사용자의 요청을 hook으로 전달하는 작업만 담당함
- 즉 operator는 무엇을 해야 하는지 기술하고, hook은 작업 방법을 결정함
- hook: 외부 시스템과 통신하기 위한 인터페이스를 제공하는 추상화된 API
작업 코드
from urllib import request
import airflow.utils.dates
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
dag = DAG(
dag_id="listing_4_20",
start_date=airflow.utils.dates.days_ago(1),
schedule_interval="@hourly",
template_searchpath="/tmp",
max_active_runs=1,
)
def _get_data(year, month, day, hour, output_path):
url = (
"https://dumps.wikimedia.org/other/pageviews/"
f"{year}/{year}-{month:0>2}/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
)
request.urlretrieve(url, output_path)
get_data = PythonOperator(
task_id="get_data",
python_callable=_get_data,
op_kwargs={
"year": "{{ execution_date.year }}",
"month": "{{ execution_date.month }}",
"day": "{{ execution_date.day }}",
"hour": "{{ execution_date.hour }}",
"output_path": "/tmp/wikipageviews.gz",
},
dag=dag,
)
extract_gz = BashOperator(
task_id="extract_gz", bash_command="gunzip --force /tmp/wikipageviews.gz", dag=dag
)
def _fetch_pageviews(pagenames, execution_date):
result = dict.fromkeys(pagenames, 0)
with open("/tmp/wikipageviews", "r") as f:
for line in f:
domain_code, page_title, view_counts, _ = line.split(" ")
if domain_code == "en" and page_title in pagenames:
result[page_title] = view_counts
with open("/tmp/postgres_query.sql", "w") as f:
for pagename, pageviewcount in result.items():
f.write(
"INSERT INTO pageview_counts VALUES ("
f"'{pagename}', {pageviewcount}, '{execution_date}'"
");\n"
)
fetch_pageviews = PythonOperator(
task_id="fetch_pageviews",
python_callable=_fetch_pageviews,
op_kwargs={"pagenames": {"Google", "Amazon", "Apple", "Microsoft", "Facebook"}},
dag=dag,
)
write_to_postgres = PostgresOperator(
task_id="write_to_postgres",
postgres_conn_id="my_postgres",
sql="postgres_query.sql",
dag=dag,
)
get_data >> extract_gz >> fetch_pageviews >> write_to_postgres
반응형
'Workflow > Airflow' 카테고리의 다른 글
『Apache Airflow 기반의 데이터 파이프라인』 Chapter 5. 태스크 간 의존성 정의하기 (2) | 2024.12.10 |
---|---|
『Apache Airflow 기반의 데이터 파이프라인』 Chapter 3. Airflow의 스케줄링 (1) | 2024.01.21 |
『Apache Airflow 기반의 데이터 파이프라인』 Chapter 2. Airflow DAG의 구조 (0) | 2024.01.15 |
『Apache Airflow 기반의 데이터 파이프라인』 Chapter 1. Apache Airflow 살펴보기 (1) | 2024.01.10 |
댓글