Beat-Nick opened a new pull request, #67135:
URL: https://github.com/apache/airflow/pull/67135
# Databricks Workflow Repair In Airflow 3
## Goal
Re-enable automatic repair for `DatabricksWorkflowTaskGroup` on Airflow 3 without Flask-AppBuilder, direct metadata database access, or task instance state mutation.
When `max_full_run_repairs > 0`, a failed Databricks Workflow run should be repaired with one Databricks `repair_run` call per failed batch:
```python
{
    "run_id": run_id,
    "rerun_all_failed_tasks": True,
    "rerun_dependent_tasks": True,
}
```
On later repairs, `latest_repair_id` is added to the payload so Databricks repairs the existing run instead of starting a new workflow run. `max_full_run_repairs` defaults to `0`, which preserves current behavior.
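A minimal sketch of the payload rule, for illustration only. The helper name `build_repair_run_json` comes from this PR, but the signature below is an assumption and may not match the provider code. The Databricks API requires `latest_repair_id` on every repair of the same run after the first.

```python
from typing import Any, Optional


def build_repair_run_json(run_id: int, latest_repair_id: Optional[int] = None) -> dict[str, Any]:
    """Build the body of a Databricks repair_run request (hypothetical signature)."""
    payload: dict[str, Any] = {
        "run_id": run_id,
        "rerun_all_failed_tasks": True,
        "rerun_dependent_tasks": True,
    }
    # First repair: no latest_repair_id. Later repairs chain onto the previous
    # repair so the same run is repaired again.
    if latest_repair_id is not None:
        payload["latest_repair_id"] = latest_repair_id
    return payload


first = build_repair_run_json(run_id=123)
second = build_repair_run_json(run_id=123, latest_repair_id=456)
```

Because the builder is a pure function, both the deferrable and the sync code paths can call it and produce identical requests.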
Databricks API reference:
https://docs.databricks.com/api/workspace/jobs/repairrun
## Design
`DatabricksWorkflowTaskGroup` injects a `full_run_repair_coordinator` task after `launch` on Airflow 3 when `max_full_run_repairs > 0`.
`launch` still owns only the setup phase: create or reset the Databricks job, start the workflow run, store the run link, and return `{conn_id, job_id, run_id}` through XCom.
`full_run_repair_coordinator` owns the long-lived repair loop. It pulls the launch metadata from XCom and waits for the parent Databricks run to reach a terminal state. On success, it completes. On failure with budget remaining, it calls `repair_run`, updates its `repair_attempts` and `latest_repair_id` state, and keeps watching the same `run_id`. On failure after the budget is exhausted, the coordinator fails.
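The coordinator's state machine can be sketched as a plain synchronous loop. This is a hypothetical sketch, not the operator's code. `coordinate_repairs`, `RepairState`, and the injected `wait_for_terminal` and `repair_run` callables are illustrative names, and the sketch treats any result state other than `SUCCESS` as a failure. In a real hook, `repair_run` would call the Databricks Jobs API and return the new `repair_id`.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class RepairState:
    """Progress the coordinator carries between repair cycles (hypothetical shape)."""

    repair_attempts: int = 0
    latest_repair_id: Optional[int] = None


def coordinate_repairs(
    wait_for_terminal: Callable[[int], str],
    repair_run: Callable[[dict[str, Any]], int],
    run_id: int,
    max_full_run_repairs: int,
) -> RepairState:
    """Sync sketch of the coordinator loop; returns the final repair state on success."""
    state = RepairState()
    while True:
        # Blocks until the parent run is terminal and returns its result state.
        result_state = wait_for_terminal(run_id)
        if result_state == "SUCCESS":
            return state
        if state.repair_attempts >= max_full_run_repairs:
            raise RuntimeError(
                f"Databricks run {run_id} still failing after "
                f"{state.repair_attempts} repair(s); budget exhausted"
            )
        payload: dict[str, Any] = {
            "run_id": run_id,
            "rerun_all_failed_tasks": True,
            "rerun_dependent_tasks": True,
        }
        if state.latest_repair_id is not None:
            payload["latest_repair_id"] = state.latest_repair_id
        # repair_run returns the new repair_id; the loop keeps watching the same run_id.
        state.latest_repair_id = repair_run(payload)
        state.repair_attempts += 1


# Example with fakes: two failures, then success, with a budget of 2.
outcomes = iter(["FAILED", "FAILED", "SUCCESS"])
calls: list[dict[str, Any]] = []


def fake_repair(payload: dict[str, Any]) -> int:
    calls.append(payload)
    return 1000 + len(calls)


final = coordinate_repairs(lambda _run_id: next(outcomes), fake_repair, run_id=42, max_full_run_repairs=2)
```

With `max_full_run_repairs=0`, the first failure raises immediately, which matches the shipped no-repair behavior.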
Like other Databricks operators, the coordinator honors `deferrable` (default: `conf.getboolean("operators", "default_deferrable", fallback=False)`). When `True`, it defers on `DatabricksWorkflowRepairCoordinatorTrigger` between repair cycles; when `False`, it runs the same state machine inline as a synchronous poll loop. Both paths share the `build_repair_run_json` helper, so the Databricks API payload stays in lockstep.
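In deferrable mode, repair progress has to survive each defer/resume cycle. Airflow triggers implement `serialize()`, which returns a `(classpath, kwargs)` tuple that the triggerer uses to rebuild the trigger. Progress therefore has to be part of those kwargs. The class below is a plain-Python stand-in with no Airflow import. Its name, classpath, and kwargs are assumptions, not the real `DatabricksWorkflowRepairCoordinatorTrigger`.

```python
from typing import Any, Optional


class RepairCoordinatorTriggerSketch:
    """Hypothetical stand-in for a coordinator trigger (no Airflow dependency)."""

    def __init__(
        self,
        conn_id: str,
        run_id: int,
        max_full_run_repairs: int,
        repair_attempts: int = 0,
        latest_repair_id: Optional[int] = None,
    ) -> None:
        self.conn_id = conn_id
        self.run_id = run_id
        self.max_full_run_repairs = max_full_run_repairs
        self.repair_attempts = repair_attempts
        self.latest_repair_id = latest_repair_id

    def serialize(self) -> tuple[str, dict[str, Any]]:
        # Hypothetical classpath; the real trigger lives in the Databricks provider.
        return (
            "example.triggers.RepairCoordinatorTriggerSketch",
            {
                "conn_id": self.conn_id,
                "run_id": self.run_id,
                "max_full_run_repairs": self.max_full_run_repairs,
                "repair_attempts": self.repair_attempts,
                "latest_repair_id": self.latest_repair_id,
            },
        )


def rebuild(serialized: tuple[str, dict[str, Any]]) -> RepairCoordinatorTriggerSketch:
    """Mimic the triggerer: ignore the classpath here and re-create from kwargs."""
    _classpath, kwargs = serialized
    return RepairCoordinatorTriggerSketch(**kwargs)


# After one repair, the coordinator re-defers with updated counters.
before = RepairCoordinatorTriggerSketch("databricks_default", run_id=42, max_full_run_repairs=2)
after_repair = RepairCoordinatorTriggerSketch(
    **{**before.serialize()[1], "repair_attempts": 1, "latest_repair_id": 1001}
)
restored = rebuild(after_repair.serialize())
```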
Downstream Databricks task monitors are not forced into either mode; they keep whatever `deferrable` value the user configured on the operator. When a Databricks sub-run fails:
- in deferrable mode, the monitor defers on `DatabricksWorkflowRepairWaitTrigger` until a new sub-run with the same `task_key` appears, then defers on `DatabricksExecutionTrigger` to monitor the repaired sub-run;
- in sync mode, the monitor polls the parent run's task list synchronously for the same condition, then drops back into its existing `while not run_state.is_terminal` loop on the new `run_id`.
The sync and async paths share `find_new_workflow_task_attempt`, so the candidate-selection rule is identical. The sync branch is gated on `max_full_run_repairs > 0`; when the task group has no repair budget, the monitor's existing sync poll loop runs unchanged, preserving shipped behavior.
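One possible candidate-selection rule, as a hypothetical sketch. The name `find_new_workflow_task_attempt` comes from this PR, but the signature and the rule here are assumptions. The sketch assumes that the parent run's `tasks` list, as returned by the Jobs `runs/get` endpoint, carries `task_key`, `run_id`, and `start_time` for each sub-run. It also assumes that a repaired attempt appears as an entry with the same `task_key` and a later `start_time`.

```python
from typing import Any, Optional


def find_new_workflow_task_attempt(
    parent_run: dict[str, Any],
    task_key: str,
    failed_run_id: int,
    failed_start_time: int,
) -> Optional[dict[str, Any]]:
    """Return the newest sub-run of task_key that started after the failed one.

    Hypothetical signature and rule; the provider helper may differ.
    """
    candidates = [
        task
        for task in parent_run.get("tasks", [])
        if task.get("task_key") == task_key
        and task.get("run_id") != failed_run_id
        # Ignore older attempts so a second failure never "finds" the first one.
        and task.get("start_time", 0) > failed_start_time
    ]
    if not candidates:
        return None
    return max(candidates, key=lambda task: task.get("start_time", 0))


parent = {
    "run_id": 1,
    "tasks": [
        {"task_key": "ingest", "run_id": 11, "start_time": 100},
        {"task_key": "transform", "run_id": 12, "start_time": 110},
        {"task_key": "ingest", "run_id": 21, "start_time": 200},
    ],
}
repaired = find_new_workflow_task_attempt(parent, "ingest", failed_run_id=11, failed_start_time=100)
```

Filtering on the failed attempt's start time, rather than only excluding its `run_id`, keeps a monitor on a second failed attempt from picking up an even older one.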
If the parent run remains terminal-failed for the whole grace window without producing a new sub-run, the downstream task fails.
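The sync-mode wait with a grace window could look roughly like this. This is a hypothetical sketch: `wait_for_repaired_attempt`, its parameters, and the injected clock and sleep are illustrative. The terminal life-cycle states are assumed to be `TERMINATED`, `SKIPPED`, and `INTERNAL_ERROR`. The window starts when the parent is first seen terminal and resets if the parent starts running again, for example because the coordinator issued a repair.

```python
import time
from typing import Any, Callable, Optional

# Assumed terminal Databricks life-cycle states.
TERMINAL_LIFE_CYCLE_STATES = frozenset({"TERMINATED", "SKIPPED", "INTERNAL_ERROR"})


def wait_for_repaired_attempt(
    get_parent_run: Callable[[], dict[str, Any]],
    find_attempt: Callable[[dict[str, Any]], Optional[dict[str, Any]]],
    grace_seconds: float,
    poll_seconds: float = 5.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> dict[str, Any]:
    """Poll the parent run until a repaired sub-run appears (hypothetical helper)."""
    deadline: Optional[float] = None
    while True:
        parent = get_parent_run()
        attempt = find_attempt(parent)
        if attempt is not None:
            return attempt
        terminal = parent.get("state", {}).get("life_cycle_state") in TERMINAL_LIFE_CYCLE_STATES
        if not terminal:
            # Parent is running again (e.g. a repair started): reset the grace window.
            deadline = None
        elif deadline is None:
            # First terminal observation: start the grace window.
            deadline = clock() + grace_seconds
        elif clock() >= deadline:
            raise TimeoutError(
                f"No repaired sub-run appeared within {grace_seconds}s of terminal failure"
            )
        sleep(poll_seconds)
```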
## Why A Coordinator Task?
The repair loop cannot live cleanly in `launch` because `launch` must complete before Airflow schedules the downstream monitor tasks. Those monitors need the `{conn_id, job_id, run_id}` XCom from `launch`, and they must also be running so they can notice failed sub-runs and wait for repaired attempts.
If `launch` stayed deferred until the whole Databricks run completed or exhausted its repairs, the downstream monitors would remain blocked behind their upstream dependency. Pushing XCom early would not change that dependency state.
The coordinator keeps the graph semantics simple:
- `launch` publishes the stable Databricks run metadata and succeeds.
- downstream monitors fan out and track individual sub-runs.
- one sibling task owns the parent-run repair budget and `latest_repair_id`.
- failed monitors discover repaired attempts from the Databricks API instead of racing to call `repair_run` themselves.
This also gives the implementation a single writer for Databricks repairs. Without it, several failed task monitors could try to repair the same parent run concurrently.
## Alternatives Considered
The sibling coordinator is not the only possible shape, but one hard Databricks constraint drives the design: a run must no longer be in progress when `repair_run` is called. That makes task-local repair impossible while the parent workflow is still running; some run-scoped owner has to wait for a terminal state, decide whether budget remains, and issue the repair.
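The three duties of that run-scoped owner map onto a small guard, sketched here under assumptions. `should_repair` is a hypothetical helper, and the terminal life-cycle states `TERMINATED`, `SKIPPED`, and `INTERNAL_ERROR` are assumed rather than taken from this PR.

```python
from typing import Any

# Assumed Databricks life-cycle states after which a run is no longer in progress.
TERMINAL_LIFE_CYCLE_STATES = frozenset({"TERMINATED", "SKIPPED", "INTERNAL_ERROR"})


def should_repair(run_state: dict[str, Any], repair_attempts: int, max_full_run_repairs: int) -> bool:
    """Decide whether the run-scoped owner may issue a repair now (hypothetical helper)."""
    # 1. Wait for a terminal state: repair_run cannot target an in-progress run.
    if run_state.get("life_cycle_state") not in TERMINAL_LIFE_CYCLE_STATES:
        return False
    # 2. Nothing to repair if the run succeeded.
    if run_state.get("result_state") == "SUCCESS":
        return False
    # 3. Only repair while budget remains.
    return repair_attempts < max_full_run_repairs
```

A task-local monitor would hit check 1 for as long as sibling tasks keep the parent running. That is why the guard belongs to a single run-scoped owner.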
Other options were considered:
- keep the repair loop in `launch`: simpler ownership, but `launch` would remain running until the Databricks workflow completed, blocking the downstream monitor tasks that give Airflow task-level visibility;
- add a finalizer task after all monitors: this makes the coordinator look like a natural join, but it fights Airflow task-state semantics, because the upstream monitors would already have finished (succeeded or failed) before repair could re-run their Databricks tasks.
Given those tradeoffs, the sibling coordinator is the least invasive option: it keeps a single writer for the parent-run repair budget while preserving the existing fan-out of Airflow monitor tasks. The main cost is that the TaskGroup gains an internal lifecycle task, so downstream dependencies on the group also wait for the coordinator to finish.
## Notes
- Downstream Airflow tasks stay in the same Airflow attempt while waiting for Databricks repair and monitoring the repaired sub-run.
- Repair progress is stored in the coordinator trigger's serialization state. Downstream tasks discover new attempts from the Databricks API, not from shared XCom.
- Airflow 3 repair extra links remain disabled. Only `WorkflowJobRunLink` is exposed on Airflow 3.
- Provider docs, `providers/databricks/docs/changelog.rst`, and the FastAPI manual repair endpoint are outside the staged changes.