-
Airflow - TestingAirflow 2024. 3. 25. 03:03
Airflow Test
1. DAG 무결성 테스트 - Airflow DagBag
내가 작성한 DAG가 잘 등록이되었는지 확인하려고할 때 유용.
from airflow.models import DagBag def test_dag_loading(): dag_bag = DagBag(dag_folder='dags',include_examples=False) assert dag_bag.size() == 2
dag_folders 파라미터에 dag들이 들어있는 디렉토리로 설정한다.
include_examples=False 설정으로 기본 예제 DAG들을 무시할 수 있다.
더 자세히def test_task_dependencies(): ## DAG, 테스크 무결성 테스트 dag_bag = DagBag(dag_folder='dags',include_examples=False) dag = dag_bag.get_dag('test') tasks = dag.tasks dependencies = { 'start': {'downstream': ['print_messages'], 'upstream': []}, 'print_messages': {'downstream': ['end'], 'upstream': ['start']}, 'end' : {'downstream': [], 'upstream': ['print_messages']}, } for task in tasks: assert task.downstream_task_ids == set(dependencies[task.task_id]['downstream']) assert task.upstream_task_ids == set(dependencies[task.task_id]['upstream'])
위와같은 형식으로 지정시 더 자세한 의존성에 대해 검증이 가능하다.
2. DAG 무결성 검사 - Airflow check_cycle
pytest를 이용해서 DAG에 이상이 없는지 확인 할 수있다.
import glob import importlib.util import os from airflow import DAG import pytest from airflow.utils.dag_cycle_tester import check_cycle DAG_PATH = os.path.join(os.path.dirname(__file__), "..", 'dags/**/*.py') # 테스트 할 DAG, task들이 존재하는 곳으로 설정 DAG_FILES = glob.glob(DAG_PATH, recursive=True) # DAG 파일들을 DAG_PATH 기준으로 하위에 모든 파일들을 리스트로 불러온다 @pytest.mark.parametrize('dag_file', DAG_FILES) def test_dag_integrately(dag_file): module_name, _ = os.path.splitext(dag_file) # # print(module_name); module_path = os.path.join(DAG_PATH, dag_file) mod_spec = importlib.util.spec_from_file_location(module_name, module_path) module = importlib.util.module_from_spec(mod_spec) mod_spec.loader.exec_module(module) dag_objs = [var for var in vars(module).values() if isinstance(var, DAG)] for dag in dag_objs: check_cycle(dag) assert dag_objs
ex. task간의 종속성에 대한 실패 케이스 확인
start >> refactored_get_data_from_wikipedia >> end >> start
위와같이 start가 가장 마지막으로 start가 옴으로 선형 task 의존성을 무너뜨려보면 결과는 아래와 같이 출력된다.
3. Task mocking - DagBag
Task를 mocking하여 SshOperator나 S3Operator등을 임의로 실행했다는 셈 치고 DAG를 실행할 수 있다.
class TestDagBackTestDag(TestCase): @mock.patch('airflow.operators.python.PythonOperator.execute') def test_task_execution(self, mock_execute): dag_bag = DagBag(dag_folder='dags',include_examples=False) DEFAULT_DATE = pendulum.now('UTC').add(seconds=-5) dag_bag.get_dag('test').get_task('start').run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) DEFAULT_DATE2 = pendulum.now('UTC').add(seconds=-3) dag_bag.get_dag('test').get_task('print_messages').run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE2) assert mock_execute.call_count == 1
+ Python test
당연히 위보다 더 자세하게 PythonOperator에 들어갈 callable에대한 비즈니스 로직이있다면 해당 테스팅도 가능하다.
(기존 유닛테스트 방식)정리하자면
Airflow 테스트 방법이 여러가지있음을 확인했다.
DAG 유효성 테스트, Task 유효성 테스트, Task 실행 테스트, python operator 유닛 테스트 등등...
그리고 이번에 공부하면서 DagBag을 이용하면 편리하게 테스트가 가능함을 알게 되었다...
책으로 봤을 때는 DagBag보다 불편했다. 일일히 파일을 건드리는게 테스트 사용하는 입장으로써는 사용성이 좋지않아서...
그리고 CI/CD시 이상적인 테스트 레이어가 아래와 같이 가능할 것으로 보인다.LOW(저수준) --------------------------------------------------------------- HIGH(고수준) (PythonOperator - callable )Unit Test > Task Unit Test > DAG Unit Test > Job E2E Test
'Airflow' 카테고리의 다른 글
Airflow - Sensor (0) 2024.02.18 Airflow - Branch (0) 2024.01.21 Airflow - Task 의존성 (0) 2024.01.21 Airflow - DAG, Task, Operator, Scheduling (0) 2023.12.20 Airflow - 에어플로우가 구동되려면. (0) 2023.12.19