moomindani commented on code in PR #68358:
URL: https://github.com/apache/airflow/pull/68358#discussion_r3432380463


##########
providers/databricks/docs/operators/workflow.rst:
##########
@@ -68,3 +68,29 @@ To minimize update conflicts, we recommend that you keep 
parameters in the ``not
 ``DatabricksWorkflowTaskGroup`` and not in the ``DatabricksNotebookOperator`` 
whenever possible.
 This is because, tasks in the ``DatabricksWorkflowTaskGroup`` are passed in on 
the job trigger time and
 do not modify the job definition.
+
+Automatic repair (Airflow 3+)
+-----------------------------
+
+Set ``workflow_repair_attempts=N`` to auto-repair a failed workflow run up to 
N times. The task
+group injects a ``repair_coordinator`` sibling that waits for the run to 
terminate and then calls
+Databricks ``repair_run`` with ``rerun_all_failed_tasks=True``. Default is 
``0`` (off).
+
+.. code-block:: python
+
+    task_group = DatabricksWorkflowTaskGroup(
+        group_id="Example Workflow",
+        databricks_conn_id="databricks_default",
+        workflow_repair_attempts=2,
+        workflow_repair_polling_period=15,
+        workflow_repair_timeout=300,
+    )
+
+After ``repair_run`` is accepted, Databricks needs a moment to drop the parent 
run out of its
+terminal state. The coordinator polls every ``workflow_repair_polling_period`` 
seconds and gives
+Databricks up to ``workflow_repair_timeout`` seconds (default 300s / 5 
minutes) to

Review Comment:
   The default here is described as 300s / 5 minutes, but the code default for 
`workflow_repair_timeout` is `180` (3 minutes) — see 
`_DatabricksWorkflowRepairCoordinatorOperator.__init__` 
(`workflow_repair_timeout: int = 180`). One of them should be corrected so the 
docs match the code.
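
One way to keep the two from drifting again is a small test that reads the default from the signature instead of hard-coding it in two places. This is only a sketch: `RepairCoordinatorStandIn`, `signature_default`, and `docs_match_code` are hypothetical names, and the stand-in copies just the defaults visible in the diff so the snippet runs without Airflow installed.

```python
import inspect


class RepairCoordinatorStandIn:
    """Hypothetical stand-in that copies only the defaults shown in the diff."""

    def __init__(
        self,
        workflow_repair_attempts: int,
        workflow_repair_polling_period: int = 30,
        workflow_repair_timeout: int = 180,
    ) -> None:
        self.workflow_repair_attempts = workflow_repair_attempts
        self.workflow_repair_polling_period = workflow_repair_polling_period
        self.workflow_repair_timeout = workflow_repair_timeout


def signature_default(cls: type, parameter: str) -> object:
    """Return the default declared for ``parameter`` in ``cls.__init__``."""
    return inspect.signature(cls.__init__).parameters[parameter].default


def docs_match_code(documented_seconds: int) -> bool:
    """Check a documented timeout against the default in the signature."""
    declared = signature_default(RepairCoordinatorStandIn, "workflow_repair_timeout")
    return declared == documented_seconds
```

With the values quoted in this comment, `docs_match_code(300)` is false and `docs_match_code(180)` is true.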
   
   ---
   Drafted-by: Claude Code (Opus 4.8); reviewed by @moomindani before posting
   



##########
providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py:
##########
@@ -300,6 +311,231 @@ def on_kill(self) -> None:
             )
 
 
+class _DatabricksWorkflowRepairCoordinatorOperator(BaseOperator):
+    """Watch a Databricks Workflow run and issue repairs after terminal 
failures."""
+
+    caller = "_DatabricksWorkflowRepairCoordinatorOperator"
+
+    def __init__(
+        self,
+        task_id: str,
+        databricks_conn_id: str,
+        launch_task_id: str,
+        workflow_repair_attempts: int,
+        workflow_repair_polling_period: int = 30,
+        workflow_repair_timeout: int = 180,
+        databricks_retry_limit: int = 3,
+        databricks_retry_delay: int = 10,
+        databricks_retry_args: dict[Any, Any] | None = None,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
+        **kwargs,
+    ):
+        if workflow_repair_attempts < 1:

Review Comment:
   The coordinator and the waiter (`DatabricksWorkflowRepairWaitTrigger`) are 
synchronized only through the shared `end_time + workflow_repair_timeout` 
deadline. If the coordinator detects the failure about one 
`workflow_repair_polling_period` late and the repair isn't reflected before the 
waiter's deadline, the waiter can fail its task while Databricks goes on to 
repair (and possibly succeed) the run. That leaves the Airflow task FAILED but 
the Databricks run repaired. With the defaults (`workflow_repair_timeout=180` 
vs `workflow_repair_polling_period=30`) there's plenty of margin, so this only 
bites when `workflow_repair_timeout` is small relative to 
`workflow_repair_polling_period`.
   
   A hard guard feels too strong here, since the safe margin really depends on 
Databricks' (non-deterministic) repair-reflection latency. But it might be 
worth emitting a `log.warning(...)` at construction time when 
`workflow_repair_timeout` is small relative to `workflow_repair_polling_period` 
(e.g. `< polling_period * 2`), so users get a heads-up at config time without 
being blocked. At minimum, a comment making this implicit relationship explicit 
would help.
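
A minimal sketch of the warning suggested above, written as standalone functions so it runs outside Airflow. The function names and the message text are hypothetical, and the factor of 2 is only the example threshold from this comment, not a measured safe margin.

```python
import logging

log = logging.getLogger(__name__)

# Assumption: 2x is the illustrative threshold from the review comment,
# not a value derived from Databricks' actual repair-reflection latency.
MIN_TIMEOUT_TO_POLLING_RATIO = 2


def repair_timeout_is_tight(
    workflow_repair_timeout: int,
    workflow_repair_polling_period: int,
    ratio: int = MIN_TIMEOUT_TO_POLLING_RATIO,
) -> bool:
    """Return True when the timeout leaves fewer than ``ratio`` polling periods."""
    return workflow_repair_timeout < workflow_repair_polling_period * ratio


def warn_if_repair_timeout_is_tight(
    workflow_repair_timeout: int,
    workflow_repair_polling_period: int,
) -> bool:
    """Log a warning for a tight configuration; never raise or block."""
    tight = repair_timeout_is_tight(workflow_repair_timeout, workflow_repair_polling_period)
    if tight:
        log.warning(
            "workflow_repair_timeout (%ss) is small relative to "
            "workflow_repair_polling_period (%ss); the waiter may fail its task "
            "before a repair is reflected on the Databricks run.",
            workflow_repair_timeout,
            workflow_repair_polling_period,
        )
    return tight
```

Called from `__init__` next to the existing `workflow_repair_attempts < 1` check, this would stay silent for the defaults (180 and 30) and warn for something like a 30 second timeout with a 30 second polling period.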
   
   ---
   Drafted-by: Claude Code (Opus 4.8); reviewed by @moomindani before posting
   


