Airflow
-
Airflow - TestingAirflow 2024. 3. 25. 03:03
Airflow Test1. DAG 무결성 테스트 - Airflow DagBag내가 작성한 DAG가 잘 등록이되었는지 확인하려고할 때 유용.from airflow.models import DagBag def test_dag_loading(): dag_bag = DagBag(dag_folder='dags',include_examples=False) assert dag_bag.size() == 2dag_folders 파라미터에 dag들이 들어있는 디렉토리로 설정한다. include_examples=False 설정으로 기본 예제 DAG들을 무시할 수 있다. 더 자세히def test_task_dependencies(): ## DAG, 테스크 무결성 테스트 dag_bag = DagBag(dag_folder='dag..
-
Airflow - SensorAirflow 2024. 2. 18. 18:48
Airflow - Sensor센서는 N개의 Task 중 하나라도 구동완료시 의존하는 다른 Task 하나 이상을 실행시키는데 유용하다. 유스케이스로 한번 알아보자.유스케이스쿠컬팡리라는 쇼핑몰에서 여러 광고를 진열중이다. 이 중 광고가 4가지가 존재한다. 1번째 광고는 쿠컬팡리 구독서비스 가입 이벤트이다. 2번째 광고는 참치회사에서 할인 해주는 행사를 디스플레이한다. 3번째는 강아지 사료 회사에서 할인해주는 상품 디스플레이 4번째는 애플 신제품 오픈 안내 광고 안내이다. 4가지 광고를 조회하는 클라이언트의 IP와 시간의 결과값을 분석해야한다. 같은 IP에서 출발하여 광고를 조회하는 중복된 조회는 스킵한다. 마지막으로 해당 결과 값을 엑셀로 만든 후 마케팅팀과 사업팀에 메일로 전송해야하는 케이스다. 클라이언트..
-
Airflow - BranchAirflow 2024. 1. 21. 18:25
Branch git에서도 흔히 사용되는 기능이다. "가지"라는 뜻으로 나무가 가지를 나뉜어 자라는 것처럼 DAG, Task 내부에서 분기 기능이 필요할 경우, 사용된다. Branch를 어디에 적용할까? Branch를 누가봐도 if문 혹은 Airflow에서 기본적으로 제공하고 이용할 줄 알지만, 효용성이 필요하다. 카드사의 새로운 시리즈 얼마전, H카드사에서 A신용카드를 에디션1에서 혜택, 실적, 카드 디자인을 리뉴얼하여 에디션2라는 카드를 새로 출시한다. A 신용카드 에디션1을 기존에 이용하던 고객들은 실적, 혜택을 그대로 유지하고 에디션 2를 새로 발급하여 이용하는 고객들의 경우, 에디션2만의 혜택과 에디션1의 일부 혜택을 이어가는 형식이었다. H 카드사에서는 익월부터 에디션1 뿐만아닌, 에디션2의 실..
-
Airflow - Task 의존성Airflow 2024. 1. 21. 17:26
선형 의존성 가장 일반적인 유형의 의존성 관계를 정의하는 방법이다. 선형 의존성을 정의하는 방법 task1 >> task2 >> task3 # task2, 3는 이전 task에 의존하는 관계가 되어 # task2는 task1이 끝나야 실행되고, task3는 task2가 끝나야 실행이 된다. Fan-in, Fan-out 형식의 관계 Fan-in - N : 1 관계 1의 위치에 놓이는 task를 실행하기 위해서는 N개의 tasks가 종료되어야 실행된다. Fan-in 의존성을 정의하는 방법 task1 >> [task2, task3] Fan-out - 1 : N 관계 N개들의 tasks가 실행되기 위해서는 1의 task가 반드시 끝나야 실행된다. Fan-out의 N개의 tasks는 병렬 실행이 가능하다. Fan..
-
Airflow - DAG, Task, Operator, SchedulingAirflow 2023. 12. 20. 00:43
DAG (Directed Acyclic Graph)구글에는 폐로가 없는 유향 그래프라고 뜻한다. 사진처럼 단방향으로 이루어진 흐름만이 존재한다. 동그라미 안에있는 것들을 Airflow에서는 Task라고 정의한다. Airflow에서는 이런 동그라미 요소들, Task들의 요소를 조합하여 데이터를 가공하거나 처리한다. 정의한 task는 다중으로 실행도 가능하고, 이를 병렬로도 실행이 가능하다.TaskDAG 하나에 최소 하나를 구성하는 단위이다. Task는 첫 그림처럼 여러가지로 두어, 1번 → 2번 혹은 3번 → 4번 혹은 5번 혹은 종료와 같은 조건식으로 분기하여 실행도 가능하다. DAG내에 Task들을 조합하며 사용할 때에는 이렇게 사용한다.task1 >> task2 >> task3 >> task4task..
-
Airflow - 에어플로우가 구동되려면.Airflow 2023. 12. 19. 01:45
에어플로우 엔진구성 에어플로우 스케줄러 DAG를 분석, 등록하고 워커에 DAG의 스케줄러를 등록한다. 에어플로우 워커 스케줄러에서 예약했던 DAG의 스케줄링을 실행한다. 에어플로우 웹 서버 스케줄러에서 분석한 DAG를 비주얼라이징하고, 워커에서 실행한 DAG의 결과를 보여준다. 실제로 도커 컴포즈로 올린 컨테이너만 보아도 위 그림처럼 나뉘어서 생성이 됨을 확인 할 수 있음. (airflow-trigger는 docker-compose 파일을 확인해보니 주기적으로 에어플로우 job테이블을 확인하여 DAG의 트리거 결과 및 상태를 확인하는 명령어를 실행하는 컨테이너이다.) 번외 23년 11월에 DAG의 start_date 파라미터를 항상 “실행하는 때”로 등록하니, 실행이 안된다. 반드시 과거로 설정해야한다...