물론 airflow 작동을 하는 DAGs 폴더는 변경 가능하다. 기본적인 경로명을 확인하고 해당 경로명이 의도한게 맞다면 폴더를 만들고 해당 경로에 파이썬 파일을 작성해 저장하면 된다.
cat ~/airflow/airflow.cfg | grep dags_folder
>> dags_folder = /home/ubuntu/airflow/dags
# 폴더 생성
mkdir -p ~/airflow/dags/
먼저 필요한 라이브러리를 참조하고 DAG 소유자, 이전 실행 결과에 종속여부, DAG 시작 날짜 설정, 테스크 실패시 재시도 횟수를 정의 할 수 있다.
# 기본 인자 정의
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
# 기본 인자 정의
default_args = {
'owner': 'user',
'depends_on_past': False,
'start_date': days_ago(1),
'retries': 1,
}
그 후 DAG를 정의해야합니다. 아래 코드에서는 DAG 이름, 기본 인자를 사용, DAG 설명, 스케줄 없이 수동 실행, 과거 실행을 캐치업하지 않음으로 설정돼 있습니다.
with DAG(
'sequential_task_test',
default_args=default_args,
description='A test DAG for sequential task execution',
schedule_interval=None, # 스케줄 없이 수동 실행
catchup=False,
) as dag:
# 테스트 상세 작성
pass
스케줄링의 경우 다양한 표시 방법으로 표현 가능합니다.
minute hour day_of_month month day_of_week
schedule_interval='0 12 * * *'
schedule_interval='@daily'
그 후 테스크를 정의해야 합니다. 이 과정에서 웹 API 통신 혹은 웹 크롤링 작업을 할 수 있습니다. PythonOperator를 통해 테스크를 정의되며 정의 내용은 태스크 ID, 실행할 Python 함수이며 이에도 다양한 정의 요소가 존재합니다.
as dag:
def task_1_function():
print("Task 1 executed")
def task_2_function():
print("Task 2 executed")
task_1 = PythonOperator(
task_id='task_1',
python_callable=task_1_function,
)
task_2 = PythonOperator(
task_id='task_2',
python_callable=task_2_function,
)
# task_1이 완료된 후 task_2 실행
task_1 >> task_2
제일 마지막에서 있는 task_1 >> task_2
처럼 실행 순서를 정의할 수 있습니다. 해당 요소는 한 줄만 들어가면 됩니다. 지금은 예시로 3줄 작성
as dag:
# ... 테스트들 정의
# task_1이 완료된 후 task_2 실행
task_1 >> task_2
# 다중 종속 task_1 -> task_2 -> task_3 순으로 실행
task_1 >> task_2 >> task_3
# task_1 실행 후 동시에 task_2와 task_3 실행
task_1 >> [task_2, task_3]
간단한 출력문을 통해 Airflow 구조를 기초적으로 이해하는 전체 코드 입니다!
# 기본 인자 정의
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
# 기본 인자 정의
default_args = {
'owner': 'user',
'depends_on_past': False,
'start_date': days_ago(1),
'retries': 1,
}
# DAG 정의
with DAG(
'sequential_task_test',
default_args=default_args,
description='A test DAG for sequential task execution',
schedule_interval=None, # 스케줄 없이 수동 실행
catchup=False,
) as dag:
def task_1_function():
print("Task 1 executed")
def task_2_function():
print("Task 2 executed")
task_1 = PythonOperator(
task_id='task_1',
python_callable=task_1_function,
)
task_2 = PythonOperator(
task_id='task_2',
python_callable=task_2_function,
)
# task_1이 완료된 후 task_2 실행
task_1 >> task_2
댓글