This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4588de0efd6 Lock in Databricks workflow depends_on parent-key behavior 
(#47614) (#66681)
4588de0efd6 is described below

commit 4588de0efd625055d62760c527b9c9facd9a95e8
Author: deepinsight coder <[email protected]>
AuthorDate: Tue Jun 2 11:24:17 2026 -0700

    Lock in Databricks workflow depends_on parent-key behavior (#47614) (#66681)
    
    * Lock in DatabricksTaskBaseOperator depends_on parent-key behavior
    
    The runtime fix for issue #47614 shipped in PR #48492; this PR adds
    end-to-end regression coverage so the bug cannot silently regress, plus
    small type-hint and constructor-clarity follow-ups in the same area.
    
    Tests build a real DAG + DatabricksWorkflowTaskGroup with
    DatabricksNotebookOperator tasks and assert depends_on payloads for the
    default-key, custom-key, >100-char-key, diamond, fan-out, root-task, and
    external-upstream cases. Also fixes the existing
    test_convert_to_databricks_workflow_task to pass strings (not mocks) so
    the depends_on branch is actually exercised, and adds a one-line check
    that _generate_databricks_task_key raises when called with a parent
    task_id but no task_dict.
    
    closes: #47614
    
    * Remove manually added databricks changelog entry
    
    Provider changelogs are regenerated from git log by the release
    manager and should not be edited by hand.
    
    * Assert depends_on reaches the Databricks Jobs API payload
    
    Existing TestWorkflowDependsOn coverage verified the in-process
    create_workflow_json() output. The new TestWorkflowDependsOnWirePayload
    class drives _CreateDatabricksWorkflowOperator._create_or_reset_job end
    to end and asserts on the spec that DatabricksHook.create_job /
    DatabricksHook.reset_job actually receive, i.e. the payload that lands
    in /api/2.1/jobs/create and /api/2.1/jobs/reset.
    
    Both branches (no existing job -> create_job, existing job -> reset_job)
    are exercised; both assert tasks[child].depends_on == [{task_key:
    md5(parent)}] and tasks[parent].depends_on == [].
    
    * Fix Databricks provider mypy timezone imports
    
    * Fix Databricks provider timezone import compatibility
---
 .../providers/databricks/operators/databricks.py   |   4 +-
 .../databricks/operators/databricks_workflow.py    |   2 +-
 .../providers/databricks/utils/openlineage.py      |   2 +-
 .../unit/databricks/operators/test_databricks.py   |  26 ++-
 .../operators/test_databricks_workflow.py          | 242 ++++++++++++++++++++-
 .../sensors/test_databricks_partition.py           |   3 +-
 .../unit/databricks/sensors/test_databricks_sql.py |   3 +-
 .../unit/databricks/utils/test_openlineage.py      |   3 +-
 8 files changed, 270 insertions(+), 15 deletions(-)

diff --git 
a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py 
b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
index c9e7c6891fb..9898993d414 100644
--- 
a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
+++ 
b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
@@ -1425,7 +1425,7 @@ class DatabricksTaskBaseOperator(BaseOperator, ABC):
 
     def _convert_to_databricks_workflow_task(
         self,
-        relevant_upstreams: list[BaseOperator],
+        relevant_upstreams: list[str],
         task_dict: dict[str, BaseOperator],
         context: Context | None = None,
     ) -> dict[str, object]:
@@ -1679,7 +1679,7 @@ class 
DatabricksNotebookOperator(DatabricksTaskBaseOperator):
 
     def _convert_to_databricks_workflow_task(
         self,
-        relevant_upstreams: list[BaseOperator],
+        relevant_upstreams: list[str],
         task_dict: dict[str, BaseOperator],
         context: Context | None = None,
     ) -> dict[str, object]:
diff --git 
a/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py
 
b/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py
index 5bbf9d3c78a..779c2fc9f15 100644
--- 
a/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py
+++ 
b/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py
@@ -158,7 +158,7 @@ class _CreateDatabricksWorkflowOperator(BaseOperator):
         self.python_params = python_params or []
         self.spark_submit_params = spark_submit_params or []
         self.tasks_to_convert = tasks_to_convert or {}
-        self.relevant_upstreams = [task_id]
+        self.relevant_upstreams: list[str] = []
         self.workflow_run_metadata: WorkflowRunMetadata | None = None
         super().__init__(task_id=task_id, **kwargs)
 
diff --git 
a/providers/databricks/src/airflow/providers/databricks/utils/openlineage.py 
b/providers/databricks/src/airflow/providers/databricks/utils/openlineage.py
index 56f4400df61..58e87a21dd7 100644
--- a/providers/databricks/src/airflow/providers/databricks/utils/openlineage.py
+++ b/providers/databricks/src/airflow/providers/databricks/utils/openlineage.py
@@ -24,7 +24,7 @@ from typing import TYPE_CHECKING, Any
 import requests
 
 from airflow.providers.common.compat.openlineage.check import 
require_openlineage_version
-from airflow.utils import timezone
+from airflow.providers.common.compat.sdk import timezone
 
 if TYPE_CHECKING:
     from openlineage.client.event_v2 import RunEvent
diff --git 
a/providers/databricks/tests/unit/databricks/operators/test_databricks.py 
b/providers/databricks/tests/unit/databricks/operators/test_databricks.py
index a1b7b4f11b3..4684b14282c 100644
--- a/providers/databricks/tests/unit/databricks/operators/test_databricks.py
+++ b/providers/databricks/tests/unit/databricks/operators/test_databricks.py
@@ -2778,15 +2778,22 @@ class TestDatabricksNotebookOperator:
         operator.task_group = databricks_workflow_task_group
         operator.task_id = "test_task"
         operator.upstream_task_ids = ["upstream_task"]
-        relevant_upstreams = [MagicMock(task_id="upstream_task")]
-        task_dict = {"upstream_task": MagicMock(task_id="upstream_task")}
+        upstream_task = DatabricksNotebookOperator(
+            notebook_path="/path/to/upstream",
+            source="WORKSPACE",
+            task_id="upstream_task",
+            dag=dag,
+        )
+        relevant_upstreams = ["upstream_task"]
+        task_dict = {"upstream_task": upstream_task}
 
         task_json = 
operator._convert_to_databricks_workflow_task(relevant_upstreams, task_dict)
 
         task_key = hashlib.md5(b"example_dag__test_task").hexdigest()
+        upstream_task_key = 
hashlib.md5(b"example_dag__upstream_task").hexdigest()
         expected_json = {
             "task_key": task_key,
-            "depends_on": [],
+            "depends_on": [{"task_key": upstream_task_key}],
             "timeout_seconds": 0,
             "email_notifications": {},
             "notebook_task": {
@@ -2957,6 +2964,19 @@ class TestDatabricksTaskOperator:
         expected_task_key = hashlib.md5(task_key).hexdigest()
         assert expected_task_key == operator.databricks_task_key
 
+    def 
test_generate_databricks_task_key_requires_task_dict_when_task_id_passed(self):
+        """Looking up a parent task's key without a ``task_dict`` is a 
programmer error."""
+        operator = DatabricksTaskOperator(
+            task_id="test_task",
+            databricks_conn_id="test_conn_id",
+            task_config={},
+        )
+        with pytest.raises(
+            ValueError,
+            match="Must pass task_dict if task_id is provided in 
_generate_databricks_task_key.",
+        ):
+            operator._generate_databricks_task_key(task_id="upstream_task")
+
     def test_user_databricks_task_key(self):
         task_config = {}
         operator = DatabricksTaskOperator(
diff --git 
a/providers/databricks/tests/unit/databricks/operators/test_databricks_workflow.py
 
b/providers/databricks/tests/unit/databricks/operators/test_databricks_workflow.py
index 9cfa7e91ae3..84069ee0ff7 100644
--- 
a/providers/databricks/tests/unit/databricks/operators/test_databricks_workflow.py
+++ 
b/providers/databricks/tests/unit/databricks/operators/test_databricks_workflow.py
@@ -17,6 +17,7 @@
 
 from __future__ import annotations
 
+import hashlib
 from unittest.mock import MagicMock, patch
 
 import pytest
@@ -28,8 +29,9 @@ pytest.importorskip("flask_session")
 
 from airflow import DAG
 from airflow.models.baseoperator import BaseOperator
-from airflow.providers.common.compat.sdk import AirflowException
+from airflow.providers.common.compat.sdk import AirflowException, timezone
 from airflow.providers.databricks.hooks.databricks import RunLifeCycleState
+from airflow.providers.databricks.operators.databricks import 
DatabricksNotebookOperator
 from airflow.providers.databricks.operators.databricks_workflow import (
     DatabricksWorkflowTaskGroup,
     WorkflowRunMetadata,
@@ -37,7 +39,6 @@ from 
airflow.providers.databricks.operators.databricks_workflow import (
     _flatten_node,
 )
 from airflow.providers.standard.operators.empty import EmptyOperator
-from airflow.utils import timezone
 
 DEFAULT_DATE = timezone.datetime(2021, 1, 1)
 
@@ -333,3 +334,240 @@ def test_on_kill(mock_databricks_hook, context, 
mock_workflow_run_metadata):
     operator.on_kill()
 
     operator._hook.cancel_run.assert_called_once_with(RUN_ID)
+
+
+class TestWorkflowDependsOn:
+    """End-to-end coverage that ``depends_on`` references the *parent's* 
``task_key``.
+
+    Regression coverage for issue apache/airflow#47614 (root cause fixed by 
#48492).
+    Each test builds a real ``DAG`` + ``DatabricksWorkflowTaskGroup`` 
populated with
+    real ``DatabricksNotebookOperator`` tasks (no operator mocks), then drives
+    ``_CreateDatabricksWorkflowOperator.create_workflow_json`` and asserts the
+    resulting ``tasks[*]['depends_on']`` payload.
+    """
+
+    DAG_ID = "test_depends_on_dag"
+    GROUP_ID = "wf_group"
+    CONN_ID = "databricks_conn"
+
+    @staticmethod
+    def _build_notebook(task_id: str, **kwargs) -> DatabricksNotebookOperator:
+        return DatabricksNotebookOperator(
+            task_id=task_id,
+            notebook_path=f"/path/{task_id}",
+            source="WORKSPACE",
+            **kwargs,
+        )
+
+    def _expected_default_key(self, group_task_id: str) -> str:
+        full_task_id = f"{self.GROUP_ID}.{group_task_id}"
+        return 
hashlib.md5(f"{self.DAG_ID}__{full_task_id}".encode()).hexdigest()
+
+    def _launch_task(self, dag: DAG) -> _CreateDatabricksWorkflowOperator:
+        launch = dag.task_dict[f"{self.GROUP_ID}.launch"]
+        assert isinstance(launch, _CreateDatabricksWorkflowOperator)
+        return launch
+
+    @staticmethod
+    def _tasks_by_key(workflow_json: dict) -> dict:
+        return {t["task_key"]: t for t in workflow_json["tasks"]}
+
+    def test_depends_on_uses_parent_key_default_keys(self):
+        """``task_A >> task_B`` — ``task_B.depends_on`` references 
``task_A``'s key."""
+        with DAG(dag_id=self.DAG_ID, schedule=None, start_date=DEFAULT_DATE) 
as dag:
+            with DatabricksWorkflowTaskGroup(group_id=self.GROUP_ID, 
databricks_conn_id=self.CONN_ID):
+                task_a = self._build_notebook("task_a")
+                task_b = self._build_notebook("task_b")
+                task_a >> task_b
+
+        tasks_by_key = 
self._tasks_by_key(self._launch_task(dag).create_workflow_json())
+        a_key = self._expected_default_key("task_a")
+        b_key = self._expected_default_key("task_b")
+
+        assert set(tasks_by_key) == {a_key, b_key}
+        assert tasks_by_key[a_key]["depends_on"] == []
+        assert tasks_by_key[b_key]["depends_on"] == [{"task_key": a_key}]
+
+    def test_depends_on_uses_parent_key_custom_parent_key(self):
+        """An explicit ``databricks_task_key`` on the parent flows into 
``depends_on``."""
+        with DAG(dag_id=self.DAG_ID, schedule=None, start_date=DEFAULT_DATE) 
as dag:
+            with DatabricksWorkflowTaskGroup(group_id=self.GROUP_ID, 
databricks_conn_id=self.CONN_ID):
+                task_a = self._build_notebook("task_a", 
databricks_task_key="custom_a")
+                task_b = self._build_notebook("task_b")
+                task_a >> task_b
+
+        tasks_by_key = 
self._tasks_by_key(self._launch_task(dag).create_workflow_json())
+        b_key = self._expected_default_key("task_b")
+
+        assert "custom_a" in tasks_by_key
+        assert tasks_by_key[b_key]["depends_on"] == [{"task_key": "custom_a"}]
+
+    def test_depends_on_falls_back_to_hash_when_parent_key_too_long(self):
+        """A >100-char explicit key is rejected; both task and ``depends_on`` 
use the hash."""
+        too_long_key = "x" * 101
+        with DAG(dag_id=self.DAG_ID, schedule=None, start_date=DEFAULT_DATE) 
as dag:
+            with DatabricksWorkflowTaskGroup(group_id=self.GROUP_ID, 
databricks_conn_id=self.CONN_ID):
+                task_a = self._build_notebook("task_a", 
databricks_task_key=too_long_key)
+                task_b = self._build_notebook("task_b")
+                task_a >> task_b
+
+        tasks_by_key = 
self._tasks_by_key(self._launch_task(dag).create_workflow_json())
+        a_key = self._expected_default_key("task_a")
+        b_key = self._expected_default_key("task_b")
+
+        assert too_long_key not in tasks_by_key
+        assert a_key in tasks_by_key
+        assert tasks_by_key[b_key]["depends_on"] == [{"task_key": a_key}]
+
+    def test_depends_on_diamond_dependency(self):
+        """``A >> [B, C] >> D`` — D depends on both B and C; B and C each 
depend only on A."""
+        with DAG(dag_id=self.DAG_ID, schedule=None, start_date=DEFAULT_DATE) 
as dag:
+            with DatabricksWorkflowTaskGroup(group_id=self.GROUP_ID, 
databricks_conn_id=self.CONN_ID):
+                task_a = self._build_notebook("task_a")
+                task_b = self._build_notebook("task_b")
+                task_c = self._build_notebook("task_c")
+                task_d = self._build_notebook("task_d")
+                task_a >> [task_b, task_c] >> task_d
+
+        tasks_by_key = 
self._tasks_by_key(self._launch_task(dag).create_workflow_json())
+        a_key = self._expected_default_key("task_a")
+        b_key = self._expected_default_key("task_b")
+        c_key = self._expected_default_key("task_c")
+        d_key = self._expected_default_key("task_d")
+
+        assert tasks_by_key[a_key]["depends_on"] == []
+        assert tasks_by_key[b_key]["depends_on"] == [{"task_key": a_key}]
+        assert tasks_by_key[c_key]["depends_on"] == [{"task_key": a_key}]
+        d_parent_keys = {entry["task_key"] for entry in 
tasks_by_key[d_key]["depends_on"]}
+        assert d_parent_keys == {b_key, c_key}
+
+    def test_depends_on_fan_out_dependency(self):
+        """``A >> [B, C]`` — both downstreams reference A's key only."""
+        with DAG(dag_id=self.DAG_ID, schedule=None, start_date=DEFAULT_DATE) 
as dag:
+            with DatabricksWorkflowTaskGroup(group_id=self.GROUP_ID, 
databricks_conn_id=self.CONN_ID):
+                task_a = self._build_notebook("task_a")
+                task_b = self._build_notebook("task_b")
+                task_c = self._build_notebook("task_c")
+                task_a >> [task_b, task_c]
+
+        tasks_by_key = 
self._tasks_by_key(self._launch_task(dag).create_workflow_json())
+        a_key = self._expected_default_key("task_a")
+        b_key = self._expected_default_key("task_b")
+        c_key = self._expected_default_key("task_c")
+
+        assert tasks_by_key[a_key]["depends_on"] == []
+        assert tasks_by_key[b_key]["depends_on"] == [{"task_key": a_key}]
+        assert tasks_by_key[c_key]["depends_on"] == [{"task_key": a_key}]
+
+    def test_root_tasks_have_empty_depends_on(self):
+        """Root tasks' Airflow upstream is the launch task; that must never 
appear in ``depends_on``."""
+        with DAG(dag_id=self.DAG_ID, schedule=None, start_date=DEFAULT_DATE) 
as dag:
+            with DatabricksWorkflowTaskGroup(group_id=self.GROUP_ID, 
databricks_conn_id=self.CONN_ID):
+                root_a = self._build_notebook("root_a")
+                root_b = self._build_notebook("root_b")
+                self._build_notebook("downstream").set_upstream([root_a, 
root_b])
+
+        launch_task = self._launch_task(dag)
+        # Sanity: both roots actually have the launch task as an Airflow 
upstream.
+        for root_task_id in (f"{self.GROUP_ID}.root_a", 
f"{self.GROUP_ID}.root_b"):
+            assert launch_task.task_id in 
dag.task_dict[root_task_id].upstream_task_ids
+
+        tasks_by_key = self._tasks_by_key(launch_task.create_workflow_json())
+        root_a_key = self._expected_default_key("root_a")
+        root_b_key = self._expected_default_key("root_b")
+
+        assert tasks_by_key[root_a_key]["depends_on"] == []
+        assert tasks_by_key[root_b_key]["depends_on"] == []
+
+    def test_depends_on_filters_out_external_upstream(self):
+        """An Airflow upstream outside the workflow group must not appear in 
``depends_on``."""
+        with DAG(dag_id=self.DAG_ID, schedule=None, start_date=DEFAULT_DATE) 
as dag:
+            external_op = EmptyOperator(task_id="external_op")
+            with DatabricksWorkflowTaskGroup(group_id=self.GROUP_ID, 
databricks_conn_id=self.CONN_ID):
+                dbx_task = self._build_notebook("dbx_task")
+            external_op >> dbx_task
+
+        tasks_by_key = 
self._tasks_by_key(self._launch_task(dag).create_workflow_json())
+        dbx_key = self._expected_default_key("dbx_task")
+
+        assert tasks_by_key[dbx_key]["depends_on"] == []
+
+
+class TestWorkflowDependsOnWirePayload:
+    """Wire-boundary coverage: the spec sent to the Databricks Jobs API 
carries ``depends_on``.
+
+    :class:`TestWorkflowDependsOn` asserts the in-process 
``create_workflow_json`` payload.
+    These tests assert the *wire* payload — what ``_create_or_reset_job`` 
actually hands to
+    ``DatabricksHook.create_job`` (new job) or ``DatabricksHook.reset_job`` 
(existing job),
+    which is what the Databricks REST API receives.
+    """
+
+    DAG_ID = "test_depends_on_wire_dag"
+    GROUP_ID = "wf_group"
+    CONN_ID = "databricks_conn"
+
+    @staticmethod
+    def _build_notebook(task_id: str, **kwargs) -> DatabricksNotebookOperator:
+        return DatabricksNotebookOperator(
+            task_id=task_id,
+            notebook_path=f"/path/{task_id}",
+            source="WORKSPACE",
+            **kwargs,
+        )
+
+    def _expected_default_key(self, group_task_id: str) -> str:
+        full_task_id = f"{self.GROUP_ID}.{group_task_id}"
+        return 
hashlib.md5(f"{self.DAG_ID}__{full_task_id}".encode()).hexdigest()
+
+    def _launch_task(self, dag: DAG) -> _CreateDatabricksWorkflowOperator:
+        launch = dag.task_dict[f"{self.GROUP_ID}.launch"]
+        assert isinstance(launch, _CreateDatabricksWorkflowOperator)
+        return launch
+
+    @staticmethod
+    def _tasks_by_key(workflow_json: dict) -> dict:
+        return {t["task_key"]: t for t in workflow_json["tasks"]}
+
+    def _build_two_task_dag(self) -> DAG:
+        with DAG(dag_id=self.DAG_ID, schedule=None, start_date=DEFAULT_DATE) 
as dag:
+            with DatabricksWorkflowTaskGroup(group_id=self.GROUP_ID, 
databricks_conn_id=self.CONN_ID):
+                task_a = self._build_notebook("task_a")
+                task_b = self._build_notebook("task_b")
+                task_a >> task_b
+        return dag
+
+    def _assert_parent_depends_on(self, job_spec: dict) -> None:
+        tasks_by_key = self._tasks_by_key(job_spec)
+        a_key = self._expected_default_key("task_a")
+        b_key = self._expected_default_key("task_b")
+
+        assert len(job_spec["tasks"]) == 2
+        assert set(tasks_by_key) == {a_key, b_key}
+        assert tasks_by_key[a_key]["depends_on"] == []
+        assert tasks_by_key[b_key]["depends_on"] == [{"task_key": a_key}]
+
+    def test_create_job_payload_carries_parent_depends_on(self, 
mock_databricks_hook):
+        """No existing job → ``create_job`` receives a spec whose 
``depends_on`` references the parent key."""
+        launch_task = self._launch_task(self._build_two_task_dag())
+        launch_task._hook.list_jobs.return_value = []
+        launch_task._hook.create_job.return_value = 999
+
+        launch_task._create_or_reset_job(context=MagicMock())
+
+        launch_task._hook.create_job.assert_called_once()
+        launch_task._hook.reset_job.assert_not_called()
+        (job_spec,) = launch_task._hook.create_job.call_args.args
+        self._assert_parent_depends_on(job_spec)
+
+    def test_reset_job_payload_carries_parent_depends_on(self, 
mock_databricks_hook):
+        """Existing job → ``reset_job`` receives a spec whose ``depends_on`` 
references the parent key."""
+        launch_task = self._launch_task(self._build_two_task_dag())
+        launch_task._hook.list_jobs.return_value = [{"job_id": 42}]
+
+        launch_task._create_or_reset_job(context=MagicMock())
+
+        launch_task._hook.reset_job.assert_called_once()
+        launch_task._hook.create_job.assert_not_called()
+        job_id, job_spec = launch_task._hook.reset_job.call_args.args
+        assert job_id == 42
+        self._assert_parent_depends_on(job_spec)
diff --git 
a/providers/databricks/tests/unit/databricks/sensors/test_databricks_partition.py
 
b/providers/databricks/tests/unit/databricks/sensors/test_databricks_partition.py
index 473a0d073c3..23e6d12c310 100644
--- 
a/providers/databricks/tests/unit/databricks/sensors/test_databricks_partition.py
+++ 
b/providers/databricks/tests/unit/databricks/sensors/test_databricks_partition.py
@@ -24,10 +24,9 @@ from unittest.mock import patch
 import pytest
 
 from airflow.models import DAG
-from airflow.providers.common.compat.sdk import AirflowException
+from airflow.providers.common.compat.sdk import AirflowException, timezone
 from airflow.providers.common.sql.hooks.handlers import fetch_all_handler
 from airflow.providers.databricks.sensors.databricks_partition import 
DatabricksPartitionSensor
-from airflow.utils import timezone
 
 TASK_ID = "db-partition-sensor"
 DEFAULT_CONN_ID = "databricks_default"
diff --git 
a/providers/databricks/tests/unit/databricks/sensors/test_databricks_sql.py 
b/providers/databricks/tests/unit/databricks/sensors/test_databricks_sql.py
index ce72e0b8316..3431722f770 100644
--- a/providers/databricks/tests/unit/databricks/sensors/test_databricks_sql.py
+++ b/providers/databricks/tests/unit/databricks/sensors/test_databricks_sql.py
@@ -24,9 +24,8 @@ from unittest.mock import patch
 import pytest
 
 from airflow.models import DAG
-from airflow.providers.common.compat.sdk import AirflowException
+from airflow.providers.common.compat.sdk import AirflowException, timezone
 from airflow.providers.databricks.sensors.databricks_sql import 
DatabricksSqlSensor
-from airflow.utils import timezone
 
 TASK_ID = "db-sensor"
 DEFAULT_CONN_ID = "databricks_default"
diff --git 
a/providers/databricks/tests/unit/databricks/utils/test_openlineage.py 
b/providers/databricks/tests/unit/databricks/utils/test_openlineage.py
index 10efbb7fffc..8702456ca86 100644
--- a/providers/databricks/tests/unit/databricks/utils/test_openlineage.py
+++ b/providers/databricks/tests/unit/databricks/utils/test_openlineage.py
@@ -29,7 +29,7 @@ from airflow.providers.common.compat.openlineage.facet import 
(
     ExternalQueryRunFacet,
     SQLJobFacet,
 )
-from airflow.providers.common.compat.sdk import 
AirflowOptionalProviderFeatureException
+from airflow.providers.common.compat.sdk import 
AirflowOptionalProviderFeatureException, timezone
 from airflow.providers.databricks.hooks.databricks import DatabricksHook
 from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook
 from airflow.providers.databricks.utils.openlineage import (
@@ -41,7 +41,6 @@ from airflow.providers.databricks.utils.openlineage import (
     emit_openlineage_events_for_databricks_queries,
 )
 from airflow.providers.openlineage.conf import namespace
-from airflow.utils import timezone
 from airflow.utils.state import TaskInstanceState
 
 

Reply via email to