-
Airflow - BranchAirflow 2024. 1. 21. 18:25
Branch
git에서도 흔히 사용되는 기능이다.
"가지"라는 뜻으로 나무가 가지를 나뉜어 자라는 것처럼
DAG, Task 내부에서 분기 기능이 필요할 경우, 사용된다.Branch를 어디에 적용할까?
Branch를 누가봐도 if문 혹은 Airflow에서 기본적으로 제공하고 이용할 줄 알지만, 효용성이 필요하다.
카드사의 새로운 시리즈
얼마전, H카드사에서 A신용카드를 에디션1에서 혜택, 실적, 카드 디자인을 리뉴얼하여 에디션2라는 카드를 새로 출시한다.
A 신용카드 에디션1을 기존에 이용하던 고객들은 실적, 혜택을 그대로 유지하고 에디션 2를 새로 발급하여 이용하는 고객들의 경우, 에디션2만의 혜택과 에디션1의 일부 혜택을 이어가는 형식이었다.
H 카드사에서는 익월부터 에디션1 뿐만아닌, 에디션2의 실적계산, 청구요금에 대해 같은 A 신용카드이기 때문에 분기 처리가 필요할 것이다.
A 신용카드 에디션 2를 위한 금액 산정, 실적산정을 하는 DAG를 Branch 처리하기 위한 방법을 알아보자.
1. Task 내부에 분기처리하기.
task 내부에 if문을 통해 간단히 분기 처리가 가능하다.
#...DB에서 Ed1, Ed2의 고객 목록들을 모두 읽어 kargs에 집어넣었다는 가정 def _A_credit_card_calculate(**kargs): ed1Type = 'ed1'; if kargs['ti'].xcom_pull(key='card_type_xcom') == ed1Type: ed1_calc_task(**kargs) else: ed2_calc_task(**kargs) A_credit_card_calculate_task = PythonOperator( task_id='A_credit_card_calculate_task', python_callable=_A_credit_card_calculate)
내부에서 분기하여 처리하면 빠르지만, DAG내에 오류 발생시 추적이 힘들어질 가능성이 크다.
(단, 로직이 복잡하지않은 경우 task 내부에서 처리해도 무관하다고 생각이든다.)2. DAG를 따로 두어 분기처리하기.
BranchPythonOperator를 통해서 내부 분기 처리후 return시 반환 할 task_id를 문자열로 반환하면 조건에 맞게 실행이 된다.
def _A_credit_card_edition_branch(**kargs): ed1Type = 'ed1'; if kargs['ti'].xcom_pull(key='card_type_xcom') == ed1Type: return 'ed1_calc_task' else: return 'ed2_calc_task' A_credit_card_branch_task = BranchPythonOperator( task_id='A_credit_card_edition_branch_task', python_callable=_A_credit_card_edition_branch) A_credit_card_branch_task >> [ed1_calc_task, ed2_calc_task]
이렇게 구현하면 좀 더 세부적으로 task를 구분지어 관리가 편하고, 청구내역 혹은 이달 실적에 대해 같은 성격을 가진 task를 재사용 할 수 있어 효용성이 드러난다.
'Airflow' 카테고리의 다른 글
Airflow - Testing (0) 2024.03.25 Airflow - Sensor (0) 2024.02.18 Airflow - Task 의존성 (0) 2024.01.21 Airflow - DAG, Task, Operator, Scheduling (0) 2023.12.20 Airflow - 에어플로우가 구동되려면. (0) 2023.12.19