This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0a034706ea5 Fix `DatabricksWorkflowTaskGroup` leaking TaskGroupContext on internal exception (#66582)
0a034706ea5 is described below

commit 0a034706ea503ee7973fe2daf2a013c7078b9a9a
Author: Noritaka Sekiyama <[email protected]>
AuthorDate: Sat May 9 04:26:25 2026 +0900

    Fix `DatabricksWorkflowTaskGroup` leaking TaskGroupContext on internal exception (#66582)
    
    * Clean up resources properly in DatabricksWorkflowTaskGroup
    
    * Add a unit test to check that super().__exit__() is called even when an exception happens
    
    * remove unnecessary comment
    
    * Fix unit tests
    
    * Fix unit tests
    
    * Strengthen test to actually exercise the bug
    
    The previous test raised an AirflowException from inside the workflow body,
    but super().__exit__ is reached even on the unfixed code in that path
    (the for-loop over tasks is empty), so the test passed without the fix.
    
    Replace it with a regression test that triggers the raise inside
    DatabricksWorkflowTaskGroup.__exit__ itself (via an EmptyOperator that
    does not implement _convert_to_databricks_workflow_task) and asserts
    TaskGroupContext is empty afterward — fails on unfixed code, passes on
    fixed code. Also drops two unused imports introduced earlier.
    
    ---------
    
    Co-authored-by: artsiomyudovin <[email protected]>
---
 .../databricks/operators/databricks_workflow.py    | 35 +++++++++++-----------
 .../operators/test_databricks_workflow.py          | 28 +++++++++++++++++
 2 files changed, 46 insertions(+), 17 deletions(-)

diff --git a/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py b/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py
index 4eeb41babed..5bbf9d3c78a 100644
--- a/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py
+++ b/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py
@@ -393,20 +393,21 @@ class DatabricksWorkflowTaskGroup(TaskGroup):
             spark_submit_params=self.spark_submit_params,
         )
 
-        for task in tasks:
-            if not (
-                hasattr(task, "_convert_to_databricks_workflow_task")
-                and callable(task._convert_to_databricks_workflow_task)
-            ):
-                raise AirflowException(
-                    f"Task {task.task_id} does not support conversion to databricks workflow task."
-                )
-
-            task.workflow_run_metadata = create_databricks_workflow_task.output
-            create_databricks_workflow_task.relevant_upstreams.append(task.task_id)
-            create_databricks_workflow_task.add_task(task.task_id, task)
-
-        for root_task in roots:
-            root_task.set_upstream(create_databricks_workflow_task)
-
-        super().__exit__(_type, _value, _tb)
+        try:
+            for task in tasks:
+                if not (
+                    hasattr(task, "_convert_to_databricks_workflow_task")
+                    and callable(task._convert_to_databricks_workflow_task)
+                ):
+                    raise AirflowException(
+                        f"Task {task.task_id} does not support conversion to databricks workflow task."
+                    )
+
+                task.workflow_run_metadata = create_databricks_workflow_task.output
+                create_databricks_workflow_task.relevant_upstreams.append(task.task_id)
+                create_databricks_workflow_task.add_task(task.task_id, task)
+
+            for root_task in roots:
+                root_task.set_upstream(create_databricks_workflow_task)
+        finally:
+            super().__exit__(_type, _value, _tb)
diff --git a/providers/databricks/tests/unit/databricks/operators/test_databricks_workflow.py b/providers/databricks/tests/unit/databricks/operators/test_databricks_workflow.py
index 518bac7b04c..9cfa7e91ae3 100644
--- a/providers/databricks/tests/unit/databricks/operators/test_databricks_workflow.py
+++ b/providers/databricks/tests/unit/databricks/operators/test_databricks_workflow.py
@@ -21,6 +21,8 @@ from unittest.mock import MagicMock, patch
 
 import pytest
 
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
+
 # Do not run the tests when FAB / Flask is not installed
 pytest.importorskip("flask_session")
 
@@ -276,6 +278,32 @@ def test_task_group_exit_creates_operator(mock_databricks_workflow_operator):
     )
 
 
+@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Uses Airflow 3 task SDK TaskGroupContext layout")
+def test_task_group_context_cleaned_up_on_internal_exception():
+    """
+    Regression test for GH-42164.
+
+    When DatabricksWorkflowTaskGroup.__exit__ raises (e.g. an added task does not
+    support conversion), super().__exit__ must still run so TaskGroupContext does
+    not leak the workflow group onto the global stack and break later DAGs with
+    "Cannot mix TaskGroups from different DAGs".
+    """
+    from airflow.sdk.definitions._internal.contextmanager import TaskGroupContext
+
+    TaskGroupContext._context.clear()
+
+    with pytest.raises(AirflowException, match="does not support conversion"):  # noqa: PT012 raise happens on context exit
+        with DAG(dag_id="example_databricks_workflow_dag_err", schedule=None, start_date=DEFAULT_DATE):
+            with DatabricksWorkflowTaskGroup(
+                group_id="test_databricks_workflow_err", databricks_conn_id="databricks_conn"
+            ):
+                # EmptyOperator does not implement _convert_to_databricks_workflow_task,
+                # which makes DatabricksWorkflowTaskGroup.__exit__ raise mid-way.
+                EmptyOperator(task_id="not_convertible")
+
+    assert not TaskGroupContext._context, "TaskGroupContext leaked the workflow task group"
+
+
 def test_task_group_root_tasks_set_upstream_to_operator(mock_databricks_workflow_operator):
     """Test that tasks added to a DatabricksWorkflowTaskGroup are set upstream 
to the operator."""
     with DAG(dag_id="example_databricks_workflow_dag", schedule=None, start_date=DEFAULT_DATE):

Reply via email to