1fanwang opened a new pull request, #68936:
URL: https://github.com/apache/airflow/pull/68936
## Why
When `TriggerDagRunOperator` runs with `wait_for_completion=True` (synchronous,
non-deferrable), a worker crash *while it is polling* breaks the retry. With no
`logical_date` or `trigger_run_id` set, the operator derives a fresh `run_id`
from `utcnow()` on retry, so the runner triggers a **second** child run instead
of reconnecting to the one the first attempt already started. With a fixed
`run_id`, the retry instead fails with `DagRunAlreadyExists`. Either way, a
transient worker blip mid-wait duplicates the downstream work or fails a task
whose triggered run is healthy and still running.
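A minimal sketch of why the retry cannot find the earlier run. `derive_run_id` is a hypothetical stand-in for a timestamp-based derivation, not the operator's actual code, and the timestamps are arbitrary:

```python
from datetime import datetime, timedelta, timezone


def derive_run_id(now: datetime) -> str:
    # Hypothetical stand-in: a manual run_id derived from the current time.
    return f"manual__{now.isoformat()}"


# Arbitrary example timestamps: the first attempt, then a retry two minutes later.
first_attempt = datetime(2026, 1, 1, 8, 17, 47, tzinfo=timezone.utc)
retry = first_attempt + timedelta(minutes=2)

run_id_1 = derive_run_id(first_attempt)
run_id_2 = derive_run_id(retry)

# Different timestamps give different run_ids, so the retry has no way to
# recognise the run that the first attempt already started.
triggered = {run_id_1, run_id_2}
```

With nothing persisted between attempts, `triggered` ends up holding two distinct run IDs for one parent task.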
This is the same duplicate-job-on-retry problem `ResumableJobMixin`
(`durable=True`, Airflow 3.3) solves for submit-and-poll operators. The mixin
targets operators whose `execute()` does the submit+poll; on Airflow 3 the
trigger/wait happens in the task runner (the operator raises
`DagRunTriggerException`), so the same persist-then-reconnect contract is
applied where the wait actually lives.
## What
Add an opt-in `durable` flag to `TriggerDagRunOperator` (default `False`, no
behavior change). When `durable=True` and waiting synchronously:
- the triggered `run_id` is persisted to `task_state_store` **before** polling
  starts;
- on retry the runner reads it back and:
  - **reconnects** and resumes the wait if the run is still active;
  - **returns success** without resubmitting if it already finished in an
    allowed state;
  - **triggers a fresh run** if the prior one failed, is gone, or its state is
    unreadable.

Scoped to the synchronous wait; the deferrable path is unchanged and left for a
follow-up. Crash recovery is silently disabled when `task_state_store` is
unavailable (degrades to today's behavior).
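The retry contract described above can be sketched as a small decision function. This is an illustration under assumptions, not the PR's implementation: `decide_on_retry`, `Action`, and `ACTIVE_STATES` are hypothetical names, and the state strings are chosen to resemble Airflow's Dag run states.

```python
from enum import Enum
from typing import Callable, Optional


class Action(Enum):
    RECONNECT = "reconnect"
    SUCCEED = "succeed"
    TRIGGER_FRESH = "trigger_fresh"


# Assumed set of "still active" states for this sketch.
ACTIVE_STATES = {"queued", "running"}


def decide_on_retry(
    persisted_run_id: Optional[str],
    get_state: Callable[[str], Optional[str]],
    allowed_states: set,
) -> Action:
    """Choose what a retry does with the run_id saved by a prior attempt."""
    if persisted_run_id is None:
        # Nothing persisted: first attempt, or the state store is unavailable.
        return Action.TRIGGER_FRESH
    try:
        state = get_state(persisted_run_id)
    except Exception:
        # Unreadable state is handled like a missing run.
        return Action.TRIGGER_FRESH
    if state in ACTIVE_STATES:
        return Action.RECONNECT
    if state in allowed_states:
        return Action.SUCCEED
    # Failed, deleted (None), or any other non-allowed state.
    return Action.TRIGGER_FRESH


# Usage with an in-memory lookup standing in for the real state query.
states = {"run_a": "running", "run_b": "success", "run_c": "failed"}
on_running = decide_on_retry("run_a", states.get, {"success"})
on_finished = decide_on_retry("run_b", states.get, {"success"})
on_failed = decide_on_retry("run_c", states.get, {"success"})
on_deleted = decide_on_retry("run_gone", states.get, {"success"})
on_no_store = decide_on_retry(None, states.get, {"success"})
```

Returning `TRIGGER_FRESH` when nothing was persisted is what makes an unavailable store degrade to today's behavior in this sketch.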
## Tests
- Four unit tests covering the contract: persist-before-poll, reconnect to a
  running prior run, short-circuit on an already-succeeded prior run, resubmit
  after a failed prior run.
- Existing `TriggerDagRunOperator` tests are untouched and pass; opt-in means
  no behavior change on the default path.
### End-to-end (live, Breeze built from `main`)
A parent `TriggerDagRunOperator(wait_for_completion=True)` triggers a child Dag
that sleeps; the parent's worker is `SIGKILL`ed while it polls, and the
scheduler retries the task.

| | child runs from one parent task | parent task on retry |
|---|---|---|
| `durable=False` (today) | **2** (a duplicate child run) | re-triggers a fresh run |
| `durable=True` (this PR) | **1** | reconnects to the in-flight run, succeeds |

<details><summary>Raw before / after</summary>
```text
BEFORE (durable=False), worker killed mid-wait, scheduler retried (try_number=2):
child dag runs:
manual__2026-...T08:17:47Z success <- attempt 1
manual__2026-...T08:19:46Z success <- attempt 2, DUPLICATE
=> 2 runs from one parent task
AFTER (durable=True) — same crash, try_number=2:
attempt=2.log: "Reconnecting to run triggered on a prior attempt." run_id=manual__2026-...T08:53:51Z
parent run: success (trigger_child succeeded on try_number=2)
child dag runs:
manual__2026-...T08:53:51Z success <- single run, reconnected
=> 1 run from one parent task
```
</details>
## Risk
Opt-in, so the default path is unchanged. With `durable=True`, the new path
adds two `task_state_store` round-trips (one read, one write) per run. A run
deleted between attempts is treated as "resubmit fresh". The submit→persist gap
is the same small window documented on `ResumableJobMixin`: a crash between
triggering and persisting falls back to a fresh trigger on retry.
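To illustrate the submit→persist window, here is a toy simulation. Everything in it is hypothetical: a plain `dict` stands in for `task_state_store`, and `attempt` and `CrashAfterTrigger` are names made up for this sketch.

```python
class CrashAfterTrigger(Exception):
    """Simulates the worker dying between trigger and persist."""


def attempt(store: dict, child_runs: list, attempt_no: int,
            crash_in_gap: bool = False) -> str:
    run_id = store.get("run_id")            # the one read round-trip
    if run_id is None:
        run_id = f"run_from_attempt_{attempt_no}"
        child_runs.append(run_id)           # trigger the child run
        if crash_in_gap:
            raise CrashAfterTrigger         # dies before the write below
        store["run_id"] = run_id            # the one write, before polling
    return run_id


# Crash inside the gap: nothing was persisted, so the retry triggers again.
store, runs_gap = {}, []
try:
    attempt(store, runs_gap, 1, crash_in_gap=True)
except CrashAfterTrigger:
    pass
attempt(store, runs_gap, 2)

# Crash after persisting (while polling): the retry reuses the saved run_id.
store, runs_poll = {}, []
attempt(store, runs_poll, 1)
reused = attempt(store, runs_poll, 2)
```

In this model only a crash that lands between the trigger and the write yields a second child run; a crash during polling does not.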
## Open question
Default is `False` to preserve behavior; `ResumableJobMixin` defaults to
`durable=True`. Worth deciding whether this should eventually default on for
synchronous waits.
---
##### Was generative AI tooling used to co-author this PR?
- [ ] Yes (please specify the tool below)