Beat-Nick opened a new pull request, #67135:
URL: https://github.com/apache/airflow/pull/67135

   # Databricks Workflow Repair In Airflow 3
   
   ## Goal
   
   Re-enable automatic repair for `DatabricksWorkflowTaskGroup` on Airflow 3 without
   Flask-AppBuilder, direct metadata database access, or task instance state mutation.
   
   When `max_full_run_repairs > 0`, a failed Databricks Workflow run should be repaired with one
   Databricks `repair_run` call per failed batch:
   
   ```python
   {
       "run_id": run_id,
       "rerun_all_failed_tasks": True,
       "rerun_dependent_tasks": True,
   }
   ```
   
   `latest_repair_id` is included on every repair after the first, so Databricks repairs the existing
   run instead of starting a new workflow run. `max_full_run_repairs` defaults to `0`, preserving
   current behavior.
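A minimal sketch of the payload rule above. The helper name `build_repair_run_json` is taken from this PR, but the signature and body shown here are assumptions for illustration, not the staged implementation.

```python
from typing import Any, Optional


def build_repair_run_json(run_id: int, latest_repair_id: Optional[int] = None) -> dict[str, Any]:
    """Build the body of one Databricks ``repair_run`` call (hypothetical signature)."""
    payload: dict[str, Any] = {
        "run_id": run_id,
        # Re-run every failed task of the parent run in a single repair...
        "rerun_all_failed_tasks": True,
        # ...plus the tasks that depend on them, even if they previously succeeded.
        "rerun_dependent_tasks": True,
    }
    if latest_repair_id is not None:
        # Only present from the second repair of the same run onward.
        payload["latest_repair_id"] = latest_repair_id
    return payload


first = build_repair_run_json(123)
later = build_repair_run_json(123, latest_repair_id=456)
```

The first repair of a run omits `latest_repair_id`; each later repair passes the ID returned by the previous `repair_run` call.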
   
   Databricks API reference: https://docs.databricks.com/api/workspace/jobs/repairrun
   
   ## Design
   
   `DatabricksWorkflowTaskGroup` injects a `full_run_repair_coordinator` task after `launch` on
   Airflow 3 when `max_full_run_repairs > 0`.
   
   `launch` still owns only the setup phase: create or reset the Databricks job, start the
   workflow run, store the run link, and return `{conn_id, job_id, run_id}` through XCom.
   
   `full_run_repair_coordinator` owns the long-lived repair loop. It pulls the launch metadata and
   waits for the parent Databricks run to reach a terminal state. On success, it completes. On
   failure with budget remaining, it calls `repair_run`, increments `repair_attempts`, records the
   returned repair ID as `latest_repair_id`, and keeps watching the same `run_id`. On failure after
   the budget is exhausted, the coordinator fails.
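The coordinator's loop can be sketched as a small synchronous state machine. `RunClient`, its `get_run_state`/`repair_run` methods, and the three simplified state strings are hypothetical stand-ins for the Databricks hook, and `FakeClient` exists only to drive the example; this is not the PR's code.

```python
import time
from typing import Callable, Optional, Protocol


class RunClient(Protocol):
    """Hypothetical stand-in for the Databricks hook the coordinator would call."""

    def get_run_state(self, run_id: int) -> str:
        """Return "RUNNING", "SUCCESS" or "FAILED" (simplified)."""
        ...

    def repair_run(self, payload: dict) -> int:
        """Issue one repair and return its repair ID."""
        ...


def run_repair_loop(
    client: RunClient,
    run_id: int,
    max_full_run_repairs: int,
    poll_interval: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Watch one parent run and repair it on failure until the budget runs out.

    Returns the number of repairs issued before the run succeeded.
    """
    repair_attempts = 0
    latest_repair_id: Optional[int] = None
    while True:
        state = client.get_run_state(run_id)
        if state == "RUNNING":
            # repair_run is only valid once the run has stopped, so keep waiting.
            sleep(poll_interval)
            continue
        if state == "SUCCESS":
            return repair_attempts
        if repair_attempts >= max_full_run_repairs:
            raise RuntimeError(f"Run {run_id} still failed after {repair_attempts} repair(s)")
        payload = {"run_id": run_id, "rerun_all_failed_tasks": True, "rerun_dependent_tasks": True}
        if latest_repair_id is not None:
            payload["latest_repair_id"] = latest_repair_id
        latest_repair_id = client.repair_run(payload)
        repair_attempts += 1
        # Simplification: assume the run has left its terminal state by the next poll.
        sleep(poll_interval)


class FakeClient:
    """Scripted client used only for the usage example below."""

    def __init__(self, states: list[str]) -> None:
        self.states = list(states)
        self.payloads: list[dict] = []

    def get_run_state(self, run_id: int) -> str:
        return self.states.pop(0)

    def repair_run(self, payload: dict) -> int:
        self.payloads.append(payload)
        return 100 + len(self.payloads)


client = FakeClient(["RUNNING", "FAILED", "RUNNING", "FAILED", "SUCCESS"])
repairs = run_repair_loop(client, run_id=7, max_full_run_repairs=2, sleep=lambda _: None)
```

Injecting `sleep` keeps the loop testable; in deferrable mode each wait would instead be a deferral on a trigger.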
   
   Like other Databricks operators, the coordinator honors `deferrable`, which defaults to
   `conf.getboolean("operators", "default_deferrable", fallback=False)`. When `True`, it defers on
   `DatabricksWorkflowRepairCoordinatorTrigger` for each repair cycle; when `False`, it runs the
   same state machine inline as a synchronous poll loop. Both paths build the Databricks API payload
   with the shared `build_repair_run_json` helper, so the two modes cannot drift apart.
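In deferrable mode, the repair counters have to survive each deferral. A sketch of that state, assuming a hypothetical `RepairProgress` holder whose fields would be passed as trigger kwargs (Airflow rebuilds a trigger from the kwargs its `serialize()` method returns); the field set is an assumption based on the metadata named in this PR.

```python
import json
from dataclasses import asdict, dataclass, replace
from typing import Any, Optional


@dataclass(frozen=True)
class RepairProgress:
    """Hypothetical coordinator state carried from one deferral to the next."""

    conn_id: str
    run_id: int
    max_full_run_repairs: int
    repair_attempts: int = 0
    latest_repair_id: Optional[int] = None

    @property
    def budget_left(self) -> bool:
        return self.repair_attempts < self.max_full_run_repairs

    def after_repair(self, repair_id: int) -> "RepairProgress":
        # Each repair cycle yields a new state instead of mutating shared data.
        return replace(self, repair_attempts=self.repair_attempts + 1, latest_repair_id=repair_id)

    def to_kwargs(self) -> dict[str, Any]:
        # Plain JSON-friendly values, because trigger kwargs must be serializable.
        return asdict(self)

    @classmethod
    def from_kwargs(cls, kwargs: dict[str, Any]) -> "RepairProgress":
        return cls(**kwargs)


state = RepairProgress("databricks_default", run_id=7, max_full_run_repairs=2).after_repair(101)
# Round-trip through JSON to mimic the state being stored while the task is deferred.
restored = RepairProgress.from_kwargs(json.loads(json.dumps(state.to_kwargs())))
```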
   
   Downstream Databricks task monitors are not forced into either mode; they keep whatever
   `deferrable` value the user configured on the operator. When a Databricks sub-run fails:

   - in deferrable mode, the monitor defers on `DatabricksWorkflowRepairWaitTrigger` until a new
     sub-run with the same `task_key` appears, then defers on `DatabricksExecutionTrigger` to
     monitor the repaired sub-run;
   - in sync mode, the monitor synchronously polls the parent run's task list for the same
     condition, then drops back into its existing `while not run_state.is_terminal` loop on the
     new `run_id`.
   
   The sync and async paths share `find_new_workflow_task_attempt`, so the candidate-selection rule
   is identical. The sync branch is gated on `max_full_run_repairs > 0`; when the task group has no
   repair budget, the monitor's existing sync poll loop runs unchanged, preserving shipped behavior.
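One plausible candidate-selection rule, sketched under two assumptions: the input is the `tasks` list from the parent run's `runs/get` response, and a repaired sub-run receives a larger `run_id` than the attempt it replaces. The real `find_new_workflow_task_attempt` may use a different rule or signature.

```python
from typing import Any, Optional


def find_new_workflow_task_attempt(
    tasks: list[dict[str, Any]], task_key: str, failed_run_id: int
) -> Optional[dict[str, Any]]:
    """Return the newest sub-run of ``task_key`` started after the failed one, or None."""
    candidates = [
        task
        for task in tasks
        # Same logical task, but a newer sub-run than the one that failed.
        if task.get("task_key") == task_key and task.get("run_id", 0) > failed_run_id
    ]
    # Several repairs may have happened; monitor the most recent attempt.
    return max(candidates, key=lambda task: task["run_id"], default=None)


tasks = [
    {"task_key": "ingest", "run_id": 10},
    {"task_key": "report", "run_id": 11},
    {"task_key": "ingest", "run_id": 12},
]
repaired = find_new_workflow_task_attempt(tasks, "ingest", failed_run_id=10)
still_waiting = find_new_workflow_task_attempt(tasks, "report", failed_run_id=11)
```

Requiring a strictly larger `run_id` keeps an older attempt of the same task from being mistaken for the repaired one.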
   
   If the parent run stays terminal-failed for the whole grace window without producing a new
   sub-run, the downstream task fails.
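A sketch of the sync-mode grace window, assuming two hypothetical callables: `find_attempt`, a candidate lookup in the spirit of `find_new_workflow_task_attempt` that returns a new sub-run ID or `None`, and `parent_is_failed`, which reports whether the parent run is terminal-failed. The window restarts whenever the parent leaves that state (for example, because the coordinator issued a repair); the reset rule, poll interval, and exception type are assumptions.

```python
import time
from typing import Callable, Optional


def wait_for_repaired_attempt(
    find_attempt: Callable[[], Optional[int]],
    parent_is_failed: Callable[[], bool],
    grace_seconds: float,
    poll_interval: float = 10.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Return the run_id of a repaired sub-run, or raise once the grace window expires."""
    failed_since: Optional[float] = None
    while True:
        run_id = find_attempt()
        if run_id is not None:
            return run_id
        if parent_is_failed():
            now = clock()
            if failed_since is None:
                # First poll that sees the parent terminal-failed: open the window.
                failed_since = now
            elif now - failed_since >= grace_seconds:
                raise TimeoutError(f"No repaired sub-run appeared within {grace_seconds}s")
        else:
            # Parent is not terminal-failed (for example, a repair is running): reset.
            failed_since = None
        sleep(poll_interval)


# Usage: the parent is failed for two polls, then repaired sub-run 42 shows up.
attempts = iter([None, None, 42])
found = wait_for_repaired_attempt(
    find_attempt=lambda: next(attempts),
    parent_is_failed=lambda: True,
    grace_seconds=60.0,
    clock=iter([0.0, 10.0]).__next__,
    sleep=lambda _: None,
)

# Usage: no repair ever arrives, so the window expires on the third poll.
try:
    wait_for_repaired_attempt(
        find_attempt=lambda: None,
        parent_is_failed=lambda: True,
        grace_seconds=30.0,
        clock=iter([0.0, 15.0, 30.0]).__next__,
        sleep=lambda _: None,
    )
    timed_out = False
except TimeoutError:
    timed_out = True
```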
   
   ## Why A Coordinator Task?
   
   The repair loop cannot live cleanly in `launch` because `launch` must complete before Airflow
   will schedule the downstream monitor tasks. Those monitors need the `{conn_id, job_id, run_id}`
   XCom from `launch`, and they also need to be running so they can notice failed sub-runs and
   wait for repaired attempts.
   
   If `launch` stayed deferred until the whole Databricks run completed or exhausted its repairs,
   the downstream monitors would remain blocked behind their upstream dependency. Pushing the XCom
   early would not help, because the monitors wait for `launch` to finish, not for the XCom to
   become available.
   
   The coordinator keeps the graph semantics simple:
   
   - `launch` publishes the stable Databricks run metadata and succeeds.
   - downstream monitors fan out and track individual sub-runs.
   - one sibling task owns the parent-run repair budget and `latest_repair_id`.
   - failed monitors discover repaired attempts from the Databricks API instead of racing to call
     `repair_run` themselves.
   
   This also gives the implementation a single writer for Databricks repairs. Without that, several
   failed task monitors could try to repair the same parent run concurrently.
   
   ## Alternatives Considered
   
   The sibling coordinator is not the only possible shape, but one hard Databricks constraint drives
   the design: `repair_run` can only be called on a run that is no longer in progress. That makes
   task-local repair impossible while the parent workflow is still running; some run-scoped owner
   has to wait for the terminal state, decide whether budget remains, and issue the repair.
   
   Other options were considered:

   - keep the repair loop in `launch`: simpler ownership, but `launch` would remain running until
     the Databricks workflow completed, blocking the downstream monitor tasks that give Airflow
     task-level visibility;
   - add a finalizer task after all monitors: this makes the coordinator look like a natural join,
     but it fights Airflow task-state semantics, because the monitors for failed Databricks tasks
     would already have finished before repair could re-run those tasks.
   
   Given those tradeoffs, the sibling coordinator is the least invasive option: it keeps a single
   writer for the parent-run repair budget while preserving the existing fan-out of Airflow monitor
   tasks. The main cost is that the TaskGroup gains an internal lifecycle task, so downstream
   dependencies on the group also wait for the coordinator to finish.
   
   ## Notes
   
   - Downstream Airflow tasks stay in the same Airflow attempt while waiting for Databricks repair
     and monitoring the repaired sub-run.
   - Repair progress is stored in the coordinator trigger's serialized state. Downstream tasks
     discover new attempts from the Databricks API, not from shared XCom.
   - Airflow 3 repair extra links remain disabled. Only `WorkflowJobRunLink` is exposed on Airflow 3.
   - Provider docs, `providers/databricks/docs/changelog.rst`, and the FastAPI manual repair
     endpoint are outside the staged changes.
   

