This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0a034706ea5 Fix `DatabricksWorkflowTaskGroup` leaking TaskGroupContext
on internal exception (#66582)
0a034706ea5 is described below
commit 0a034706ea503ee7973fe2daf2a013c7078b9a9a
Author: Noritaka Sekiyama <[email protected]>
AuthorDate: Sat May 9 04:26:25 2026 +0900
Fix `DatabricksWorkflowTaskGroup` leaking TaskGroupContext on internal
exception (#66582)
* Release resources properly in DatabricksWorkflowTaskGroup
* Add a unit test checking that super().__exit__() is called even when an
exception happens
* remove unnecessary comment
* Fix unit tests
* Fix unit tests
* Strengthen test to actually exercise the bug
The previous test raised an AirflowException from inside the workflow body,
but super().__exit__ is reached even on the unfixed code in that path
(the for-loop over tasks is empty), so the test passed without the fix.
Replace it with a regression test that triggers the raise inside
DatabricksWorkflowTaskGroup.__exit__ itself (via an EmptyOperator that
does not implement _convert_to_databricks_workflow_task) and asserts
TaskGroupContext is empty afterward — fails on unfixed code, passes on
fixed code. Also drops two unused imports introduced earlier.
---------
Co-authored-by: artsiomyudovin <[email protected]>
---
.../databricks/operators/databricks_workflow.py | 35 +++++++++++-----------
.../operators/test_databricks_workflow.py | 28 +++++++++++++++++
2 files changed, 46 insertions(+), 17 deletions(-)
diff --git
a/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py
b/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py
index 4eeb41babed..5bbf9d3c78a 100644
---
a/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py
+++
b/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py
@@ -393,20 +393,21 @@ class DatabricksWorkflowTaskGroup(TaskGroup):
spark_submit_params=self.spark_submit_params,
)
- for task in tasks:
- if not (
- hasattr(task, "_convert_to_databricks_workflow_task")
- and callable(task._convert_to_databricks_workflow_task)
- ):
- raise AirflowException(
- f"Task {task.task_id} does not support conversion to
databricks workflow task."
- )
-
- task.workflow_run_metadata = create_databricks_workflow_task.output
-
create_databricks_workflow_task.relevant_upstreams.append(task.task_id)
- create_databricks_workflow_task.add_task(task.task_id, task)
-
- for root_task in roots:
- root_task.set_upstream(create_databricks_workflow_task)
-
- super().__exit__(_type, _value, _tb)
+ try:
+ for task in tasks:
+ if not (
+ hasattr(task, "_convert_to_databricks_workflow_task")
+ and callable(task._convert_to_databricks_workflow_task)
+ ):
+ raise AirflowException(
+ f"Task {task.task_id} does not support conversion to
databricks workflow task."
+ )
+
+ task.workflow_run_metadata =
create_databricks_workflow_task.output
+
create_databricks_workflow_task.relevant_upstreams.append(task.task_id)
+ create_databricks_workflow_task.add_task(task.task_id, task)
+
+ for root_task in roots:
+ root_task.set_upstream(create_databricks_workflow_task)
+ finally:
+ super().__exit__(_type, _value, _tb)
diff --git
a/providers/databricks/tests/unit/databricks/operators/test_databricks_workflow.py
b/providers/databricks/tests/unit/databricks/operators/test_databricks_workflow.py
index 518bac7b04c..9cfa7e91ae3 100644
---
a/providers/databricks/tests/unit/databricks/operators/test_databricks_workflow.py
+++
b/providers/databricks/tests/unit/databricks/operators/test_databricks_workflow.py
@@ -21,6 +21,8 @@ from unittest.mock import MagicMock, patch
import pytest
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
+
# Do not run the tests when FAB / Flask is not installed
pytest.importorskip("flask_session")
@@ -276,6 +278,32 @@ def
test_task_group_exit_creates_operator(mock_databricks_workflow_operator):
)
+@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Uses Airflow 3 task SDK
TaskGroupContext layout")
+def test_task_group_context_cleaned_up_on_internal_exception():
+ """
+ Regression test for GH-42164.
+
+ When DatabricksWorkflowTaskGroup.__exit__ raises (e.g. an added task does
not
+ support conversion), super().__exit__ must still run so TaskGroupContext
does
+ not leak the workflow group onto the global stack and break later DAGs with
+ "Cannot mix TaskGroups from different DAGs".
+ """
+ from airflow.sdk.definitions._internal.contextmanager import
TaskGroupContext
+
+ TaskGroupContext._context.clear()
+
+ with pytest.raises(AirflowException, match="does not support conversion"):
# noqa: PT012 raise happens on context exit
+ with DAG(dag_id="example_databricks_workflow_dag_err", schedule=None,
start_date=DEFAULT_DATE):
+ with DatabricksWorkflowTaskGroup(
+ group_id="test_databricks_workflow_err",
databricks_conn_id="databricks_conn"
+ ):
+ # EmptyOperator does not implement
_convert_to_databricks_workflow_task,
+ # which makes DatabricksWorkflowTaskGroup.__exit__ raise
mid-way.
+ EmptyOperator(task_id="not_convertible")
+
+ assert not TaskGroupContext._context, "TaskGroupContext leaked the
workflow task group"
+
+
def
test_task_group_root_tasks_set_upstream_to_operator(mock_databricks_workflow_operator):
"""Test that tasks added to a DatabricksWorkflowTaskGroup are set upstream
to the operator."""
with DAG(dag_id="example_databricks_workflow_dag", schedule=None,
start_date=DEFAULT_DATE):