Beat-Nick opened a new pull request, #68358:
URL: https://github.com/apache/airflow/pull/68358
<!-- SPDX-License-Identifier: Apache-2.0
https://www.apache.org/licenses/LICENSE-2.0 -->
# Enable Databricks Workflow Repair In Airflow 3
Adds automatic Databricks Workflow repair support to
`DatabricksWorkflowTaskGroup` on Airflow 3.
Today, a transient Databricks task failure fails the Dag. Retrying the
Airflow task does not recover the Databricks Workflow run: the retried task
calls the Databricks API, sees that the existing run is still failed, and fails
again.
When `workflow_repair_attempts > 0`, `DatabricksWorkflowTaskGroup` injects a
`repair_coordinator` task for the workflow run. After a workflow failure, the
coordinator calls Databricks `repair_run` so Databricks reruns the failed tasks
and their dependent tasks in the existing workflow run.
Downstream `DatabricksNotebookOperator` tasks find the replacement sub-run
for their `task_key` and continue watching that run.
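```python
from airflow.providers.databricks.operators.databricks_workflow import (
    DatabricksWorkflowTaskGroup,
)
from airflow.sdk import DAG

DATABRICKS_CONN_ID = "databricks_default"

with DAG(dag_id="example_databricks_workflow_repair", schedule=None):
    task_group = DatabricksWorkflowTaskGroup(
        group_id="example_workflow",
        databricks_conn_id=DATABRICKS_CONN_ID,
        workflow_repair_attempts=2,  # up to two repair_run calls
        workflow_repair_polling_period=15,  # seconds between polls
        workflow_repair_timeout=300,
    )
```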
<img width="1204" height="838" alt="image"
src="https://github.com/user-attachments/assets/2439bd46-8654-448d-8c39-7a15f5254545"
/>
## Design
`launch` remains responsible only for starting the workflow run. It creates
or resets the Databricks job, starts the run, stores the run link, and returns
`{conn_id, job_id, run_id}` through XCom.
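Here is a minimal sketch of that hand-off. It assumes the XCom value is a plain dict with exactly those three keys and shows how a downstream task could validate it before use. `LaunchResult` and `parse_launch_result` are hypothetical names, not provider APIs.

```python
from typing import Any, TypedDict


class LaunchResult(TypedDict):
    """Hypothetical typed view of the dict that `launch` pushes to XCom."""

    conn_id: str
    job_id: int
    run_id: int


REQUIRED_KEYS = ("conn_id", "job_id", "run_id")


def parse_launch_result(xcom_value: dict[str, Any]) -> LaunchResult:
    """Check that all keys are present and coerce the IDs to int."""
    missing = [key for key in REQUIRED_KEYS if key not in xcom_value]
    if missing:
        raise KeyError(f"launch XCom is missing keys: {missing}")
    return LaunchResult(
        conn_id=str(xcom_value["conn_id"]),
        job_id=int(xcom_value["job_id"]),
        run_id=int(xcom_value["run_id"]),
    )
```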
`repair_coordinator` is injected after `launch` when
`workflow_repair_attempts > 0`. It owns repairs of the parent Databricks run
and watches the original `run_id` until the run reaches a terminal state:
- if the run succeeds, the coordinator succeeds;
- if the run fails and repair budget remains, the coordinator calls
`repair_run`;
- if the run fails after the repair budget is exhausted, the coordinator
fails.
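The coordinator's three outcomes reduce to a small decision function. The sketch below is hypothetical (`Decision`, `decide`, and `simulate` are illustrative names). It assumes that `workflow_repair_attempts` is the maximum number of `repair_run` calls for one parent run.

```python
from enum import Enum


class Decision(Enum):
    SUCCEED = "succeed"
    REPAIR = "repair"
    FAIL = "fail"


def decide(run_succeeded: bool, repairs_used: int, max_repairs: int) -> Decision:
    """Map a terminal parent-run result to the coordinator's next step."""
    if run_succeeded:
        return Decision.SUCCEED
    if repairs_used < max_repairs:
        # Budget remains: ask Databricks to repair the same run.
        return Decision.REPAIR
    # Failed with the repair budget exhausted.
    return Decision.FAIL


def simulate(outcomes: list[bool], max_repairs: int) -> list[Decision]:
    """Replay terminal results (True = success) of the original run and each repair."""
    decisions: list[Decision] = []
    repairs_used = 0
    for succeeded in outcomes:
        decision = decide(succeeded, repairs_used, max_repairs)
        decisions.append(decision)
        if decision is not Decision.REPAIR:
            break
        repairs_used += 1
    return decisions
```

For example, with a budget of 2, a run that fails twice and then succeeds yields `[REPAIR, REPAIR, SUCCEED]`. Three consecutive failures yield `[REPAIR, REPAIR, FAIL]`.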
The coordinator is the only task that calls `repair_run`, so multiple failed
Databricks sub-runs cannot race each other to repair the same parent workflow
run. The first repair uses the original `run_id`; later repairs also pass
`latest_repair_id` so Databricks continues the same repair chain.
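A repair payload builder might look like the sketch below. The field names (`run_id`, `latest_repair_id`, `rerun_all_failed_tasks`, `rerun_dependent_tasks`) come from the Databricks Jobs API `runs/repair` request. Two things are assumptions: that this PR sets `rerun_all_failed_tasks` rather than listing the failed tasks in `rerun_tasks`, and the helper name `build_repair_payload`, which is hypothetical.

```python
from typing import Any, Optional


def build_repair_payload(run_id: int, latest_repair_id: Optional[int] = None) -> dict[str, Any]:
    """Build a hypothetical Jobs API `runs/repair` request body."""
    payload: dict[str, Any] = {
        "run_id": run_id,
        # Rerun every failed task and the tasks that depend on them.
        "rerun_all_failed_tasks": True,
        "rerun_dependent_tasks": True,
    }
    if latest_repair_id is not None:
        # Later repairs continue the same repair chain.
        payload["latest_repair_id"] = latest_repair_id
    return payload
```

Under this sketch, the `repair_id` returned by each accepted `repair_run` call would be passed as `latest_repair_id` on the next call.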
`DatabricksNotebookOperator` monitors its Databricks sub-run. On failure, if
workflow repair is enabled, it waits in the same Airflow task attempt for
Databricks to expose a newer sub-run for the same `task_key` on the parent
workflow run, then monitors that new sub-run. If no newer sub-run appears
within the grace window, the task fails.
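Selecting the replacement sub-run can be sketched as a pure function over the parent run's JSON. The sketch assumes the parent run looks like a Jobs API `runs/get` response whose `tasks` entries carry `task_key`, `run_id`, and `attempt_number`. It also assumes that repaired attempts appear as additional entries for the same `task_key`. `find_replacement_subrun` is a hypothetical helper, not the PR's code.

```python
from typing import Any, Optional


def find_replacement_subrun(
    parent_run: dict[str, Any], task_key: str, current_run_id: int
) -> Optional[dict[str, Any]]:
    """Return the newest sub-run for `task_key` if it differs from the one being watched."""
    attempts = [t for t in parent_run.get("tasks", []) if t.get("task_key") == task_key]
    if not attempts:
        return None
    # Highest attempt_number wins; run_id breaks ties.
    newest = max(attempts, key=lambda t: (t.get("attempt_number", 0), t.get("run_id", 0)))
    if newest.get("run_id") == current_run_id:
        # Databricks has not exposed a newer attempt yet.
        return None
    return newest
```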
There are two grace windows to avoid treating stale Databricks state as
final:
- after `repair_run` is accepted, the coordinator gives Databricks up to
`workflow_repair_reflection_timeout` seconds (default 300 seconds) to stop
reporting the parent run as terminal, polling every
`workflow_repair_polling_period` seconds, before it gives up;
- after a sub-run fails, the task monitor gives Databricks a few polls to
expose the replacement sub-run, and resets that counter if the parent run's
`repair_history` grows.
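Both grace windows can be sketched with an injectable clock so they are easy to test. `wait_for_reflection` and `ReplacementGrace` are hypothetical. The sketch assumes that the reflection check is a callable reporting whether the parent run is still terminal, and that growth in the length of `repair_history` is the signal that a new repair was recorded. The defaults of 300 and 15 seconds mirror the numbers used elsewhere in this description.

```python
import time
from typing import Callable


def wait_for_reflection(
    parent_is_terminal: Callable[[], bool],
    timeout: float = 300.0,
    period: float = 15.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """After repair_run is accepted, wait for the parent run to leave its terminal state.

    Returns True once the run is reported as non-terminal, False on timeout.
    """
    deadline = clock() + timeout
    while True:
        if not parent_is_terminal():
            return True
        if clock() >= deadline:
            return False
        sleep(period)


class ReplacementGrace:
    """Counts polls without a replacement sub-run; restarts when repair_history grows."""

    def __init__(self, max_polls: int, initial_history_len: int) -> None:
        self.max_polls = max_polls
        self.polls_left = max_polls
        self.seen_history_len = initial_history_len

    def keep_waiting(self, repair_history_len: int) -> bool:
        """Record one empty poll; return False once the grace window is used up."""
        if repair_history_len > self.seen_history_len:
            # A new repair was recorded on the parent run: restart the window.
            self.seen_history_len = repair_history_len
            self.polls_left = self.max_polls
        self.polls_left -= 1
        return self.polls_left > 0
```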
The coordinator and `DatabricksNotebookOperator` both support sync and
deferrable execution. The behavior is the same in both modes:
- deferrable mode waits with triggers between Databricks polling cycles;
- sync mode waits with inline polling loops.
Both modes share the same repair payload builder and task-attempt selection
helper, so Databricks API requests and replacement-task matching stay
consistent across sync and deferrable execution.
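The sync/deferrable split can be illustrated with one shared polling step driven by two loops. In this sketch `asyncio.sleep` stands in for an Airflow trigger. A real trigger also frees the worker slot between polls. `poll_once`, `wait_sync`, and `wait_deferred` are hypothetical names. The terminal set uses the Databricks run life-cycle states `TERMINATED`, `SKIPPED`, and `INTERNAL_ERROR`.

```python
import asyncio
import time
from typing import Callable

# Databricks run life-cycle states treated as terminal by the poll step.
TERMINAL_STATES = {"TERMINATED", "SKIPPED", "INTERNAL_ERROR"}


def poll_once(get_state: Callable[[], str]) -> bool:
    """Shared step used by both modes: True once the run is terminal."""
    return get_state() in TERMINAL_STATES


def wait_sync(get_state: Callable[[], str], period: float) -> int:
    """Sync mode: block with an inline polling loop; returns the poll count."""
    polls = 1
    while not poll_once(get_state):
        time.sleep(period)
        polls += 1
    return polls


async def wait_deferred(get_state: Callable[[], str], period: float) -> int:
    """Deferrable-style loop: yield to the event loop between polls; returns the poll count."""
    polls = 1
    while not poll_once(get_state):
        await asyncio.sleep(period)
        polls += 1
    return polls
```

Because both loops call the same `poll_once`, a change to what counts as terminal applies to both modes at once. The shared helpers in the PR are meant to provide the same guarantee.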
When `workflow_repair_attempts == 0`, the repair coordinator is not injected
and existing Databricks operator behavior is unchanged.
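The conditional injection can be summarized as a function from configuration to planned task IDs. This is a hypothetical sketch: `planned_task_ids` is not a provider function, and rejecting negative values is an assumption. It only shows that `repair_coordinator` appears exactly when `workflow_repair_attempts > 0`.

```python
def planned_task_ids(
    group_id: str, notebook_task_ids: list[str], workflow_repair_attempts: int
) -> list[str]:
    """Hypothetical: task IDs a DatabricksWorkflowTaskGroup would contain."""
    if workflow_repair_attempts < 0:
        raise ValueError("workflow_repair_attempts must be >= 0")
    task_ids = [f"{group_id}.launch"]
    if workflow_repair_attempts > 0:
        # Only inject the coordinator when there is a repair budget.
        task_ids.append(f"{group_id}.repair_coordinator")
    task_ids.extend(f"{group_id}.{task_id}" for task_id in notebook_task_ids)
    return task_ids
```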
## Notes
- Downstream Airflow tasks stay in the same Airflow task attempt while waiting
for the Databricks repair and while monitoring the repaired sub-run.
- Airflow 3 manual repair links remain out of scope for this PR. A future
change could add a provider FastAPI endpoint and links, but that needs explicit
race handling with the automatic coordinator.
##### Was generative AI tooling used to co-author this PR?
- [x] Yes (please specify the tool below)
Generated-by: Claude Code + internally reviewed by my team.