This is an automated email from the ASF dual-hosted git repository.

kaxil pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2a924677557 Retry TriggerDagRunOperator when the triggered DagRun fails (#65390)
2a924677557 is described below

commit 2a92467755718f29d6ef3464323bab41309eab1b
Author: Sam Wheating <[email protected]>
AuthorDate: Mon Jun 8 21:33:01 2026 +0100

    Retry TriggerDagRunOperator when the triggered DagRun fails (#65390)
    
    * fix(task-sdk): honour retry eligibility in TriggerDagRunOperator on failed triggered DAG
    
    * fail when run exists and skip_when_already_exists=false
    
    * fix typing
    
    ---------
    
    Co-authored-by: Kushal Bohra <[email protected]>
---
 .../src/airflow/sdk/execution_time/task_runner.py  |  8 +----
 .../task_sdk/execution_time/test_task_runner.py    | 41 ++++++++++++++++++++++
 2 files changed, 42 insertions(+), 7 deletions(-)

diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index 123667bfa1d..beda0728c7d 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -1815,13 +1815,7 @@ def _handle_trigger_dag_run(
                 log.error(
                     "DagRun finished with failed state.", dag_id=drte.trigger_dag_id, state=comms_msg.state
                 )
-                msg = TaskState(
-                    state=TaskInstanceState.FAILED,
-                    end_date=datetime.now(tz=timezone.utc),
-                    rendered_map_index=ti.rendered_map_index,
-                )
-                state = TaskInstanceState.FAILED
-                return msg, state
+                return _handle_current_task_failed(ti)
             if comms_msg.state in drte.allowed_states:
                 log.info(
                     "DagRun finished with allowed state.", dag_id=drte.trigger_dag_id, state=comms_msg.state
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index eda5ea9d9cd..f3b07482b4f 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -5035,6 +5035,47 @@ class TestTriggerDagRunOperator:
         ]
         mock_supervisor_comms.assert_has_calls(expected_calls)
 
+    def test_handle_trigger_dag_run_wait_for_completion_failed_state_retries(
+        self, create_runtime_ti, mock_supervisor_comms
+    ):
+        """When triggered DAG reaches a failed state, task should honor retry eligibility."""
+        from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
+
+        task = TriggerDagRunOperator(
+            task_id="test_task",
+            trigger_dag_id="test_dag",
+            trigger_run_id="test_run_id",
+            poke_interval=5,
+            wait_for_completion=True,
+            allowed_states=[DagRunState.SUCCESS],
+            failed_states=[DagRunState.FAILED],
+            deferrable=False,
+        )
+        ti = create_runtime_ti(
+            dag_id="test_handle_trigger_dag_run_wait_for_completion_failed_state_retries",
+            run_id="test_run",
+            task=task,
+            should_retry=True,
+        )
+
+        def _send_side_effect(*args, **kwargs):
+            msg = kwargs.get("msg")
+            if msg is None and args:
+                msg = args[0]
+            if isinstance(msg, TriggerDagRun):
+                return OKResponse(ok=True)
+            if isinstance(msg, GetDagRunState):
+                return DagRunStateResult(state=DagRunState.FAILED)
+            return None
+
+        mock_supervisor_comms.send.side_effect = _send_side_effect
+
+        log = mock.MagicMock()
+        with mock.patch("time.sleep", return_value=None):
+            state, _, _ = run(ti, ti.get_template_context(), log)
+
+        assert state == TaskInstanceState.UP_FOR_RETRY
+
     @pytest.mark.parametrize(
         ("allowed_states", "failed_states", "intermediate_state"),
         [

Reply via email to