This is an automated email from the ASF dual-hosted git repository.
kaxil pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2a924677557 Retry TriggerDagRunOperator when the triggered DagRun
fails (#65390)
2a924677557 is described below
commit 2a92467755718f29d6ef3464323bab41309eab1b
Author: Sam Wheating <[email protected]>
AuthorDate: Mon Jun 8 21:33:01 2026 +0100
Retry TriggerDagRunOperator when the triggered DagRun fails (#65390)
* fix(task-sdk): honour retry eligibility in TriggerDagRunOperator on
failed triggered DAG
* fail when run exists and skip_when_already_exists=false
* fix typing
---------
Co-authored-by: Kushal Bohra <[email protected]>
---
.../src/airflow/sdk/execution_time/task_runner.py | 8 +----
.../task_sdk/execution_time/test_task_runner.py | 41 ++++++++++++++++++++++
2 files changed, 42 insertions(+), 7 deletions(-)
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index 123667bfa1d..beda0728c7d 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -1815,13 +1815,7 @@ def _handle_trigger_dag_run(
log.error(
"DagRun finished with failed state.",
dag_id=drte.trigger_dag_id, state=comms_msg.state
)
- msg = TaskState(
- state=TaskInstanceState.FAILED,
- end_date=datetime.now(tz=timezone.utc),
- rendered_map_index=ti.rendered_map_index,
- )
- state = TaskInstanceState.FAILED
- return msg, state
+ return _handle_current_task_failed(ti)
if comms_msg.state in drte.allowed_states:
log.info(
"DagRun finished with allowed state.",
dag_id=drte.trigger_dag_id, state=comms_msg.state
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index eda5ea9d9cd..f3b07482b4f 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -5035,6 +5035,47 @@ class TestTriggerDagRunOperator:
]
mock_supervisor_comms.assert_has_calls(expected_calls)
+ def test_handle_trigger_dag_run_wait_for_completion_failed_state_retries(
+ self, create_runtime_ti, mock_supervisor_comms
+ ):
+ """When triggered DAG reaches a failed state, task should honor retry
eligibility."""
+ from airflow.providers.standard.operators.trigger_dagrun import
TriggerDagRunOperator
+
+ task = TriggerDagRunOperator(
+ task_id="test_task",
+ trigger_dag_id="test_dag",
+ trigger_run_id="test_run_id",
+ poke_interval=5,
+ wait_for_completion=True,
+ allowed_states=[DagRunState.SUCCESS],
+ failed_states=[DagRunState.FAILED],
+ deferrable=False,
+ )
+ ti = create_runtime_ti(
+
dag_id="test_handle_trigger_dag_run_wait_for_completion_failed_state_retries",
+ run_id="test_run",
+ task=task,
+ should_retry=True,
+ )
+
+ def _send_side_effect(*args, **kwargs):
+ msg = kwargs.get("msg")
+ if msg is None and args:
+ msg = args[0]
+ if isinstance(msg, TriggerDagRun):
+ return OKResponse(ok=True)
+ if isinstance(msg, GetDagRunState):
+ return DagRunStateResult(state=DagRunState.FAILED)
+ return None
+
+ mock_supervisor_comms.send.side_effect = _send_side_effect
+
+ log = mock.MagicMock()
+ with mock.patch("time.sleep", return_value=None):
+ state, _, _ = run(ti, ti.get_template_context(), log)
+
+ assert state == TaskInstanceState.UP_FOR_RETRY
+
@pytest.mark.parametrize(
("allowed_states", "failed_states", "intermediate_state"),
[