This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-7-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 0c9c86a3b7f1fd8d2974eaf26a8621bc0fe15374 Author: Daniel Standish <[email protected]> AuthorDate: Wed Sep 27 09:25:46 2023 -0700 Fail dag test if defer without triggerer (#34619) If user runs dag.test and task defers and no triggerer is running, we should fail so user does not sit there waiting forever. --------- Co-authored-by: Tzu-ping Chung <[email protected]> (cherry picked from commit e81bb487796780705f6df984fbfed04f555943d7) --- airflow/models/dag.py | 41 ++++++++++++++++++++++++++++++---- tests/cli/commands/test_dag_command.py | 37 ++++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+), 4 deletions(-) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index ca3cce4cc4..e3f575f650 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -96,7 +96,13 @@ from airflow.models.dagcode import DagCode from airflow.models.dagpickle import DagPickle from airflow.models.dagrun import RUN_ID_REGEX, DagRun from airflow.models.param import DagParam, ParamsDict -from airflow.models.taskinstance import Context, TaskInstance, TaskInstanceKey, clear_task_instances +from airflow.models.taskinstance import ( + Context, + TaskInstance, + TaskInstanceKey, + TaskReturnCode, + clear_task_instances, +) from airflow.secrets.local_filesystem import LocalFilesystemBackend from airflow.security import permissions from airflow.stats import Stats @@ -276,6 +282,14 @@ def get_dataset_triggered_next_run_info( } +class _StopDagTest(Exception): + """ + Raise when DAG.test should stop immediately. + + :meta private: + """ + + @functools.total_ordering class DAG(LoggingMixin): """ @@ -2758,7 +2772,17 @@ class DAG(LoggingMixin): try: add_logger_if_needed(ti) ti.task = tasks[ti.task_id] - _run_task(ti, session=session) + ret = _run_task(ti, session=session) + if ret is TaskReturnCode.DEFERRED: + if not _triggerer_is_healthy(): + raise _StopDagTest( + "Task has deferred but triggerer component is not running. " + "You can start the triggerer by running `airflow triggerer` in a terminal." + ) + except _StopDagTest: + # Let this exception bubble out and not be swallowed by the + # except block below. + raise except Exception: self.log.exception("Task failed; ti=%s", ti) if conn_file_path or variable_file_path: @@ -3886,7 +3910,14 @@ class DagContext: return None -def _run_task(ti: TaskInstance, session): +def _triggerer_is_healthy(): + from airflow.jobs.triggerer_job_runner import TriggererJobRunner + + job = TriggererJobRunner.most_recent_job() + return job and job.is_alive() + + +def _run_task(ti: TaskInstance, session) -> TaskReturnCode | None: """ Run a single task instance, and push result to Xcom for downstream tasks. @@ -3896,18 +3927,20 @@ def _run_task(ti: TaskInstance, session): Args: ti: TaskInstance to run """ + ret = None log.info("*****************************************************") if ti.map_index > 0: log.info("Running task %s index %d", ti.task_id, ti.map_index) else: log.info("Running task %s", ti.task_id) try: - ti._run_raw_task(session=session) + ret = ti._run_raw_task(session=session) session.flush() log.info("%s ran successfully!", ti.task_id) except AirflowSkipException: log.info("Task Skipped, continuing") log.info("*****************************************************") + return ret def _get_or_create_dagrun( diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py index 1bb3541b7c..2387eebcb4 100644 --- a/tests/cli/commands/test_dag_command.py +++ b/tests/cli/commands/test_dag_command.py @@ -34,9 +34,13 @@ from airflow import settings from airflow.api_connexion.schemas.dag_schema import DAGSchema from airflow.cli import cli_parser from airflow.cli.commands import dag_command +from airflow.decorators import task from airflow.exceptions import AirflowException from airflow.models import DagBag, DagModel, DagRun +from airflow.models.baseoperator import BaseOperator +from airflow.models.dag import _StopDagTest from airflow.models.serialized_dag import SerializedDagModel +from airflow.triggers.temporal import TimeDeltaTrigger from airflow.utils import timezone from airflow.utils.session import create_session from airflow.utils.types import DagRunType @@ -816,3 +820,36 @@ class TestCliDags: ) dag_command.dag_test(cli_args) assert "data_interval" in mock__get_or_create_dagrun.call_args.kwargs + + def test_dag_test_no_triggerer(self, dag_maker): + with dag_maker() as dag: + + @task + def one(): + return 1 + + @task + def two(val): + return val + 1 + + class MyOp(BaseOperator): + template_fields = ("tfield",) + + def __init__(self, tfield, **kwargs): + self.tfield = tfield + super().__init__(**kwargs) + + def execute(self, context, event=None): + if event is None: + print("I AM DEFERRING") + self.defer(trigger=TimeDeltaTrigger(timedelta(seconds=20)), method_name="execute") + return + print("RESUMING") + return self.tfield + 1 + + task_one = one() + task_two = two(task_one) + op = MyOp(task_id="abc", tfield=str(task_two)) + task_two >> op + with pytest.raises(_StopDagTest, match="Task has deferred but triggerer component is not running"): + dag.test()
