1fanwang opened a new pull request, #68936:
URL: https://github.com/apache/airflow/pull/68936

   ## Why
   
   When `TriggerDagRunOperator` runs with `wait_for_completion=True` (synchronous,
   non-deferrable), a worker crash *while it is polling* leads to a duplicate child
   run or a spurious failure on retry.
   
   On retry, the operator computes a fresh `run_id` (with no `logical_date` or
   `trigger_run_id`, it derives one from `utcnow()`), so the runner triggers a
   **second** child run instead of reconnecting to the one the first attempt
   already started. With a fixed `run_id`, the retry instead fails with
   `DagRunAlreadyExists`. Either way, a transient worker failure mid-wait
   duplicates the downstream work or fails a task whose triggered run is still
   healthy and running.
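   A toy model of both failure modes, assuming the fallback `run_id` is simply a `manual__` prefix plus the current UTC timestamp (an approximation; the real format may differ). `ChildDag` and `RunAlreadyExists` are hypothetical stand-ins, the latter for `DagRunAlreadyExists`; nothing here is Airflow's API.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


class RunAlreadyExists(Exception):
    """Hypothetical stand-in for Airflow's DagRunAlreadyExists."""


def derive_run_id(now: datetime) -> str:
    # Approximation of the fallback: no logical_date or trigger_run_id,
    # so the run_id is built from the current UTC time.
    return f"manual__{now.isoformat()}"


class ChildDag:
    """Hypothetical child Dag that records every run triggered on it."""

    def __init__(self) -> None:
        self.runs: list[str] = []

    def trigger(self, run_id: str) -> str:
        if run_id in self.runs:
            raise RunAlreadyExists(run_id)
        self.runs.append(run_id)
        return run_id


# Derived run_id: each attempt sees a different utcnow(), so nothing collides.
child = ChildDag()
t0 = datetime(2026, 1, 1, 8, 17, 47, tzinfo=timezone.utc)
child.trigger(derive_run_id(t0))                         # attempt 1, killed mid-wait
child.trigger(derive_run_id(t0 + timedelta(minutes=2)))  # attempt 2: a duplicate run

# Fixed run_id: the retry collides with the run attempt 1 already created.
fixed = ChildDag()
fixed.trigger("nightly_load")
try:
    fixed.trigger("nightly_load")
    retry_failed = False
except RunAlreadyExists:
    retry_failed = True
```

   After this runs, `child.runs` holds two distinct entries and `retry_failed` is `True`: the two outcomes described above.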
   
   This is the same duplicate-job-on-retry problem that `ResumableJobMixin`
   (`durable=True`, Airflow 3.3) solves for submit-and-poll operators. The mixin
   targets operators whose `execute()` does the submit and poll itself; on
   Airflow 3, the trigger and wait happen in the task runner instead (the
   operator raises `DagRunTriggerException`), so this PR applies the same
   persist-then-reconnect contract where the wait actually lives.
   
   ## What
   
   Add an opt-in `durable` flag to `TriggerDagRunOperator` (default `False`, no
   behavior change). When `durable=True` and waiting synchronously:
   
   - the triggered `run_id` is persisted to `task_state_store` **before** polling
     starts;
   - on retry the runner reads it back and:
     - **reconnects** and resumes the wait if the run is still active;
     - **returns success** without resubmitting if it already finished in an
       allowed state;
     - **triggers a fresh run** if the prior one failed, is gone, or its state is
       unreadable.
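   The retry-time branching reduces to a small decision over the persisted `run_id` and the prior run's state. A minimal sketch, assuming `RunState` mirrors Airflow's Dag run states (`queued`, `running`, `success`, `failed`) and that queued or running counts as "still active"; `decide` and `Action` are hypothetical names, not the PR's code.

```python
from __future__ import annotations

from enum import Enum


class RunState(str, Enum):
    # Local stand-in mirroring Airflow's Dag run state values.
    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"


class Action(Enum):
    RECONNECT = "reconnect"  # prior run still active: resume the wait
    SUCCEED = "succeed"      # prior run finished in an allowed state
    RESUBMIT = "resubmit"    # no usable prior run: trigger a fresh one


ACTIVE = {RunState.QUEUED, RunState.RUNNING}


def decide(
    prior_run_id: str | None,
    prior_state: RunState | None,
    allowed_states: set[RunState],
) -> Action:
    """Hypothetical resume decision for a durable synchronous wait."""
    if prior_run_id is None or prior_state is None:
        # Nothing persisted, the run is gone, or its state could not be read.
        return Action.RESUBMIT
    if prior_state in ACTIVE:
        return Action.RECONNECT
    if prior_state in allowed_states:
        return Action.SUCCEED
    # Finished, but not in an allowed state (e.g. failed).
    return Action.RESUBMIT
```

   Treating any finished-but-not-allowed state as "resubmit" is this sketch's choice; the description above spells out only the failed, gone, and unreadable cases.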
   
   This is scoped to the synchronous wait; the deferrable path is unchanged and
   left for a follow-up. When `task_state_store` is unavailable, crash recovery is
   silently disabled and the operator falls back to today's behavior.
   
   ## Tests
   
   - Four unit tests covering the contract: persist-before-poll, reconnect to a
     running prior run, short-circuit on an already-succeeded prior run, and
     resubmit after a failed prior run.
   - Existing `TriggerDagRunOperator` tests are untouched and pass; because the
     flag is opt-in, the default path has no behavior change.
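   A persist-before-poll check can be expressed with recording fakes: the fake `poll` asserts that the `run_id` is already in the store when polling begins, and the test checks the call order. `Recorder` and `trigger_and_wait` are hypothetical stand-ins for the runner internals; this is a sketch of the idea, not the PR's actual test code.

```python
from __future__ import annotations


class Recorder:
    """Hypothetical fakes that log every call in order."""

    def __init__(self) -> None:
        self.calls: list[str] = []
        self.store: dict[str, str] = {}

    def trigger(self) -> str:
        self.calls.append("trigger")
        return "manual__run_1"

    def persist(self, run_id: str) -> None:
        self.calls.append("persist")
        self.store["triggered_run_id"] = run_id

    def poll(self, run_id: str) -> str:
        # Core of the contract: by the time polling starts, run_id is durable.
        assert self.store.get("triggered_run_id") == run_id
        self.calls.append("poll")
        return "success"


def trigger_and_wait(fakes: Recorder) -> str:
    # Hypothetical shape of the durable synchronous path.
    run_id = fakes.trigger()
    fakes.persist(run_id)
    return fakes.poll(run_id)


def test_persist_before_poll() -> None:
    fakes = Recorder()
    assert trigger_and_wait(fakes) == "success"
    assert fakes.calls == ["trigger", "persist", "poll"]
```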
   
   ### End-to-end (live, Breeze built from `main`)
   
   A parent `TriggerDagRunOperator(wait_for_completion=True)` triggers a child Dag
   that sleeps; the parent's worker is `SIGKILL`ed while it polls, and the scheduler
   retries the task.
   
   | Setting | Child runs from one parent task | Parent task on retry |
   |---|---|---|
   | `durable=False` (today) | **2** (one is a duplicate) | Re-triggers a fresh run |
   | `durable=True` (this PR) | **1** | Reconnects to the in-flight run and succeeds |
   
   <details><summary>Raw before / after</summary>
   
   ```text
   BEFORE (durable=False) — worker killed mid-wait, scheduler retried (try_number=2):
     child dag runs:
       manual__2026-...T08:17:47Z  success   <- attempt 1
       manual__2026-...T08:19:46Z  success   <- attempt 2, DUPLICATE
     => 2 runs from one parent task
   
   AFTER (durable=True) — same crash, try_number=2:
     attempt=2.log: "Reconnecting to run triggered on a prior attempt."
                    run_id=manual__2026-...T08:53:51Z
     parent run: success   (trigger_child succeeded on try_number=2)
     child dag runs:
       manual__2026-...T08:53:51Z  success   <- single run, reconnected
     => 1 run from one parent task
   ```
   
   </details>
   
   ## Risk
   
   Opt-in, so the default path is unchanged. The new path adds two
   `task_state_store` round-trips (one read, one write) per durable run; runs with
   `durable=False` make none. A run deleted between attempts is treated as
   "resubmit fresh". The submit→persist gap is the same small window documented on
   `ResumableJobMixin`: a crash after triggering but before persisting leaves
   nothing to reconnect to, so the retry falls back to a fresh trigger.
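   The window is easiest to see in a toy model (all names hypothetical, no Airflow involved): a crash while polling leaves a persisted `run_id` to reconnect to, whereas a crash between triggering and persisting leaves nothing, so the retry triggers a second child run.

```python
from __future__ import annotations

from itertools import count


class WorkerKilled(Exception):
    """Toy stand-in for the worker being SIGKILLed."""


def run_two_attempts(crash_point: str) -> int:
    """Toy durable task: attempt 1 dies at crash_point, attempt 2 runs to
    completion. Returns how many child runs were triggered in total."""
    child_runs: dict[str, str] = {}
    store: dict[str, str] = {}
    ids = count(1)

    def attempt(crash_at: str | None) -> None:
        prior = store.get("run_id")
        if prior is not None and child_runs.get(prior) == "running":
            run_id = prior  # reconnect to the in-flight run
        else:
            run_id = f"manual__{next(ids)}"
            child_runs[run_id] = "running"  # trigger
            if crash_at == "after_trigger":
                raise WorkerKilled  # inside the submit -> persist gap
            store["run_id"] = run_id  # persist before polling
        if crash_at == "while_polling":
            raise WorkerKilled
        child_runs[run_id] = "success"  # toy: the wait completes

    try:
        attempt(crash_point)  # try_number=1
    except WorkerKilled:
        pass
    attempt(None)  # try_number=2
    return len(child_runs)
```

   `run_two_attempts("while_polling")` returns 1 and `run_two_attempts("after_trigger")` returns 2: only a crash inside the gap still produces a duplicate.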
   
   ## Open question
   
   Default is `False` to preserve behavior, whereas `ResumableJobMixin` defaults to
   `durable=True`. It is worth deciding whether this should eventually default on
   for synchronous waits.
   
   ---
   
   ##### Was generative AI tooling used to co-author this PR?
   
   - [ ] Yes (please specify the tool below)
   
   ---
   
   * Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#pull-request-guidelines)** for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
   * For fundamental code changes, an Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals)) is needed.
   * When adding a dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   * For significant user-facing changes, create a newsfragment: `{pr_number}.significant.rst`, in [airflow-core/newsfragments](https://github.com/apache/airflow/tree/main/airflow-core/newsfragments). You can add this file in a follow-up commit after the PR is created so you know the PR number.
   