This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-7-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 0c9c86a3b7f1fd8d2974eaf26a8621bc0fe15374
Author: Daniel Standish <[email protected]>
AuthorDate: Wed Sep 27 09:25:46 2023 -0700

    Fail dag test if defer without triggerer (#34619)
    
    If a user runs dag.test and a task defers while no triggerer is running, we should fail so the user does not sit there waiting forever.
    
    ---------
    
    Co-authored-by: Tzu-ping Chung <[email protected]>
    (cherry picked from commit e81bb487796780705f6df984fbfed04f555943d7)
---
 airflow/models/dag.py                  | 41 ++++++++++++++++++++++++++++++----
 tests/cli/commands/test_dag_command.py | 37 ++++++++++++++++++++++++++++++
 2 files changed, 74 insertions(+), 4 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index ca3cce4cc4..e3f575f650 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -96,7 +96,13 @@ from airflow.models.dagcode import DagCode
 from airflow.models.dagpickle import DagPickle
 from airflow.models.dagrun import RUN_ID_REGEX, DagRun
 from airflow.models.param import DagParam, ParamsDict
-from airflow.models.taskinstance import Context, TaskInstance, TaskInstanceKey, clear_task_instances
+from airflow.models.taskinstance import (
+    Context,
+    TaskInstance,
+    TaskInstanceKey,
+    TaskReturnCode,
+    clear_task_instances,
+)
 from airflow.secrets.local_filesystem import LocalFilesystemBackend
 from airflow.security import permissions
 from airflow.stats import Stats
@@ -276,6 +282,14 @@ def get_dataset_triggered_next_run_info(
     }


+class _StopDagTest(Exception):
+    """
+    Raise when DAG.test should stop immediately.
+
+    :meta private:
+    """
+
+
 @functools.total_ordering
 class DAG(LoggingMixin):
     """
@@ -2758,7 +2772,17 @@ class DAG(LoggingMixin):
                 try:
                     add_logger_if_needed(ti)
                     ti.task = tasks[ti.task_id]
-                    _run_task(ti, session=session)
+                    ret = _run_task(ti, session=session)
+                    if ret is TaskReturnCode.DEFERRED:
+                        if not _triggerer_is_healthy():
+                            raise _StopDagTest(
+                                "Task has deferred but triggerer component is not running. "
+                                "You can start the triggerer by running `airflow triggerer` in a terminal."
+                            )
+                except _StopDagTest:
+                    # Let this exception bubble out and not be swallowed by the
+                    # except block below.
+                    raise
                 except Exception:
                     self.log.exception("Task failed; ti=%s", ti)
         if conn_file_path or variable_file_path:
@@ -3886,7 +3910,14 @@ class DagContext:
             return None


-def _run_task(ti: TaskInstance, session):
+def _triggerer_is_healthy():
+    from airflow.jobs.triggerer_job_runner import TriggererJobRunner
+
+    job = TriggererJobRunner.most_recent_job()
+    return job and job.is_alive()
+
+
+def _run_task(ti: TaskInstance, session) -> TaskReturnCode | None:
     """
     Run a single task instance, and push result to Xcom for downstream tasks.

@@ -3896,18 +3927,20 @@ def _run_task(ti: TaskInstance, session):
     Args:
         ti: TaskInstance to run
     """
+    ret = None
     log.info("*****************************************************")
     if ti.map_index > 0:
         log.info("Running task %s index %d", ti.task_id, ti.map_index)
     else:
         log.info("Running task %s", ti.task_id)
     try:
-        ti._run_raw_task(session=session)
+        ret = ti._run_raw_task(session=session)
         session.flush()
         log.info("%s ran successfully!", ti.task_id)
     except AirflowSkipException:
         log.info("Task Skipped, continuing")
     log.info("*****************************************************")
+    return ret


 def _get_or_create_dagrun(
diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py
index 1bb3541b7c..2387eebcb4 100644
--- a/tests/cli/commands/test_dag_command.py
+++ b/tests/cli/commands/test_dag_command.py
@@ -34,9 +34,13 @@ from airflow import settings
 from airflow.api_connexion.schemas.dag_schema import DAGSchema
 from airflow.cli import cli_parser
 from airflow.cli.commands import dag_command
+from airflow.decorators import task
 from airflow.exceptions import AirflowException
 from airflow.models import DagBag, DagModel, DagRun
+from airflow.models.baseoperator import BaseOperator
+from airflow.models.dag import _StopDagTest
 from airflow.models.serialized_dag import SerializedDagModel
+from airflow.triggers.temporal import TimeDeltaTrigger
 from airflow.utils import timezone
 from airflow.utils.session import create_session
 from airflow.utils.types import DagRunType
@@ -816,3 +820,36 @@ class TestCliDags:
         )
         dag_command.dag_test(cli_args)
         assert "data_interval" in mock__get_or_create_dagrun.call_args.kwargs
+
+    def test_dag_test_no_triggerer(self, dag_maker):
+        with dag_maker() as dag:
+
+            @task
+            def one():
+                return 1
+
+            @task
+            def two(val):
+                return val + 1
+
+            class MyOp(BaseOperator):
+                template_fields = ("tfield",)
+
+                def __init__(self, tfield, **kwargs):
+                    self.tfield = tfield
+                    super().__init__(**kwargs)
+
+                def execute(self, context, event=None):
+                    if event is None:
+                        print("I AM DEFERRING")
+                        self.defer(trigger=TimeDeltaTrigger(timedelta(seconds=20)), method_name="execute")
+                        return
+                    print("RESUMING")
+                    return self.tfield + 1
+
+            task_one = one()
+            task_two = two(task_one)
+            op = MyOp(task_id="abc", tfield=str(task_two))
+            task_two >> op
+        with pytest.raises(_StopDagTest, match="Task has deferred but triggerer component is not running"):
+            dag.test()

Reply via email to