Airflow 구조 이해하기

목록으로 돌아가기

Airflow 디렉토리 구조

물론 airflow 작동을 하는 DAGs 폴더는 변경 가능하다. 기본적인 경로명을 확인하고 해당 경로명이 의도한게 맞다면 폴더를 만들고 해당 경로에 파이썬 파일을 작성해 저장하면 된다.

cat ~/airflow/airflow.cfg | grep dags_folder
>> dags_folder = /home/ubuntu/airflow/dags

# 폴더 생성
mkdir -p ~/airflow/dags/

기본 인자 정의

먼저 필요한 라이브러리를 참조하고 DAG 소유자, 이전 실행 결과에 종속여부, DAG 시작 날짜 설정, 테스크 실패시 재시도 횟수를 정의 할 수 있다.

# 기본 인자 정의
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

# 기본 인자 정의
default_args = {
    'owner': 'user',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'retries': 1,
}

DAG 정의

그 후 DAG를 정의해야합니다. 아래 코드에서는 DAG 이름, 기본 인자를 사용, DAG 설명, 스케줄 없이 수동 실행, 과거 실행을 캐치업하지 않음으로 설정돼 있습니다.

with DAG(
    'sequential_task_test',
    default_args=default_args,
    description='A test DAG for sequential task execution',
    schedule_interval=None,  # 스케줄 없이 수동 실행
    catchup=False,
) as dag:
    # 테스트 상세 작성
    pass

스케줄링의 경우 다양한 표시 방법으로 표현 가능합니다.

minute hour day_of_month month day_of_week

schedule_interval='0 12 * * *'
schedule_interval='@daily'

테스크 정의

그 후 테스크를 정의해야 합니다. 이 과정에서 웹 API 통신 혹은 웹 크롤링 작업을 할 수 있습니다. PythonOperator를 통해 테스크를 정의되며 정의 내용은 태스크 ID, 실행할 Python 함수이며 이에도 다양한 정의 요소가 존재합니다.

as dag:

    def task_1_function():
        print("Task 1 executed")

    def task_2_function():
        print("Task 2 executed")

    task_1 = PythonOperator(
        task_id='task_1',
        python_callable=task_1_function,
    )

    task_2 = PythonOperator(
        task_id='task_2',
        python_callable=task_2_function,
    )

    # task_1이 완료된 후 task_2 실행
    task_1 >> task_2

테스크를 실행시키는 순서?

제일 마지막에서 있는 task_1 >> task_2처럼 실행 순서를 정의할 수 있습니다. 해당 요소는 한 줄만 들어가면 됩니다. 지금은 예시로 3줄 작성

as dag:

    # ... 테스트들 정의

    # task_1이 완료된 후 task_2 실행
    task_1 >> task_2
    
    # 다중 종속 task_1 -> task_2 -> task_3 순으로 실행
    task_1 >> task_2 >> task_3 
    
    # task_1 실행 후 동시에 task_2와 task_3 실행
    task_1 >> [task_2, task_3]

전체 코드

간단한 출력문을 통해 Airflow 구조를 기초적으로 이해하는 전체 코드 입니다!

# 기본 인자 정의
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

# 기본 인자 정의
default_args = {
    'owner': 'user',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'retries': 1,
}

# DAG 정의
with DAG(
    'sequential_task_test',
    default_args=default_args,
    description='A test DAG for sequential task execution',
    schedule_interval=None,  # 스케줄 없이 수동 실행
    catchup=False,
) as dag:

    def task_1_function():
        print("Task 1 executed")

    def task_2_function():
        print("Task 2 executed")

    task_1 = PythonOperator(
        task_id='task_1',
        python_callable=task_1_function,
    )

    task_2 = PythonOperator(
        task_id='task_2',
        python_callable=task_2_function,
    )

    # task_1이 완료된 후 task_2 실행
    task_1 >> task_2



author-profile
Written by 유찬영

댓글