This is an automated email from the ASF dual-hosted git repository.
amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e72f3a1a6d8 Move lineage from airflow core to task sdk (#61157)
e72f3a1a6d8 is described below
commit e72f3a1a6d803bee0ab55143baf00222f00f1ed9
Author: Amogh Desai <[email protected]>
AuthorDate: Wed Jan 28 15:44:03 2026 +0530
Move lineage from airflow core to task sdk (#61157)
---
.github/boring-cyborg.yml | 4 +-
airflow-core/src/airflow/lineage/__init__.py | 11 +++++
airflow-core/src/airflow/plugins_manager.py | 12 -----
airflow-core/tests/unit/lineage/__init__.py | 17 -------
.../tests/test_pytest_args_for_test_types.py | 1 -
devel-common/src/tests_common/pytest_plugin.py | 25 ++++++++---
.../src/tests_common/test_utils/mock_plugins.py | 14 +++++-
.../providers/common/compat/lineage/hook.py | 6 +--
.../src/airflow/providers/common/compat/sdk.py | 27 ++++++++++-
.../tests/unit/common/compat/lineage/test_hook.py | 52 +++++++++++++++++-----
providers/openlineage/pyproject.toml | 2 +-
.../providers/openlineage/plugins/openlineage.py | 2 +-
.../unit/openlineage/extractors/test_manager.py | 20 ++++++---
task-sdk/src/airflow/sdk/io/path.py | 6 +--
.../hook.py => task-sdk/src/airflow/sdk/lineage.py | 12 +++--
task-sdk/src/airflow/sdk/plugins_manager.py | 12 +++++
task-sdk/tests/task_sdk/docs/test_public_api.py | 1 +
.../tests/task_sdk/test_lineage.py | 15 +++----
18 files changed, 159 insertions(+), 80 deletions(-)
diff --git a/.github/boring-cyborg.yml b/.github/boring-cyborg.yml
index 9f42441f1c3..f484a482e74 100644
--- a/.github/boring-cyborg.yml
+++ b/.github/boring-cyborg.yml
@@ -442,8 +442,8 @@ labelPRBasedOnFilePath:
- airflow-core/docs/howto/usage-cli.rst
area:Lineage:
- - airflow-core/src/airflow/lineage/**/*
- - airflow-core/tests/unit/lineage/**/*
+ - task-sdk/src/airflow/sdk/lineage.py
+ - task-sdk/tests/task_sdk/test_lineage.py
- airflow-core/docs/administration-and-deployment/lineage.rst
area:Logging:
diff --git a/airflow-core/src/airflow/lineage/__init__.py
b/airflow-core/src/airflow/lineage/__init__.py
index 217e5db9607..15afa1a75de 100644
--- a/airflow-core/src/airflow/lineage/__init__.py
+++ b/airflow-core/src/airflow/lineage/__init__.py
@@ -15,3 +15,14 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+from __future__ import annotations
+
+from airflow.utils.deprecation_tools import add_deprecated_classes
+
+add_deprecated_classes(
+ {
+ __name__: {"hook": "airflow.sdk.lineage"},
+ "hook": {"*": "airflow.sdk.lineage"},
+ },
+ package=__name__,
+)
diff --git a/airflow-core/src/airflow/plugins_manager.py
b/airflow-core/src/airflow/plugins_manager.py
index af05f1aacd2..f26878fa4ee 100644
--- a/airflow-core/src/airflow/plugins_manager.py
+++ b/airflow-core/src/airflow/plugins_manager.py
@@ -38,7 +38,6 @@ from airflow._shared.plugins_manager import (
from airflow.configuration import conf
if TYPE_CHECKING:
- from airflow.lineage.hook import HookLineageReader
from airflow.listeners.listener import ListenerManager
from airflow.partition_mapper.base import PartitionMapper
from airflow.task.priority_strategy import PriorityWeightStrategy
@@ -283,17 +282,6 @@ def get_partition_mapper_plugins() -> dict[str,
type[PartitionMapper]]:
}
-@cache
-def get_hook_lineage_readers_plugins() -> list[type[HookLineageReader]]:
- """Collect and get hook lineage reader classes registered by plugins."""
- log.debug("Initialize hook lineage readers plugins")
- result: list[type[HookLineageReader]] = []
-
- for plugin in _get_plugins()[0]:
- result.extend(plugin.hook_lineage_readers)
- return result
-
-
@cache
def integrate_macros_plugins() -> None:
"""Integrates macro plugins."""
diff --git a/airflow-core/tests/unit/lineage/__init__.py
b/airflow-core/tests/unit/lineage/__init__.py
deleted file mode 100644
index 217e5db9607..00000000000
--- a/airflow-core/tests/unit/lineage/__init__.py
+++ /dev/null
@@ -1,17 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
diff --git a/dev/breeze/tests/test_pytest_args_for_test_types.py
b/dev/breeze/tests/test_pytest_args_for_test_types.py
index 982dc4e37b1..7e70fe6162a 100644
--- a/dev/breeze/tests/test_pytest_args_for_test_types.py
+++ b/dev/breeze/tests/test_pytest_args_for_test_types.py
@@ -163,7 +163,6 @@ def _find_all_integration_folders() -> list[str]:
"airflow-core/tests/unit/decorators",
"airflow-core/tests/unit/hooks",
"airflow-core/tests/unit/io",
- "airflow-core/tests/unit/lineage",
"airflow-core/tests/unit/listeners",
"airflow-core/tests/unit/logging",
"airflow-core/tests/unit/macros",
diff --git a/devel-common/src/tests_common/pytest_plugin.py
b/devel-common/src/tests_common/pytest_plugin.py
index 258d3345706..9622906efff 100644
--- a/devel-common/src/tests_common/pytest_plugin.py
+++ b/devel-common/src/tests_common/pytest_plugin.py
@@ -1980,17 +1980,30 @@ def _mock_plugins(request: pytest.FixtureRequest):
@pytest.fixture
def hook_lineage_collector():
- from airflow.lineage.hook import HookLineageCollector
+ from airflow.providers.common.compat.sdk import HookLineageCollector
+
+ from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS,
AIRFLOW_V_3_2_PLUS
hlc = HookLineageCollector()
- with mock.patch(
- "airflow.lineage.hook.get_hook_lineage_collector",
- return_value=hlc,
- ):
- # Redirect calls to compat provider to support back-compat tests of
2.x as well
+
+ if AIRFLOW_V_3_0_PLUS:
+ from unittest import mock
+
+ patch_target = "airflow.lineage.hook.get_hook_lineage_collector"
+ if AIRFLOW_V_3_2_PLUS:
+ patch_target = "airflow.sdk.lineage.get_hook_lineage_collector"
+
+ with mock.patch(patch_target, return_value=hlc):
+ from airflow.providers.common.compat.lineage.hook import
get_hook_lineage_collector
+
+ yield get_hook_lineage_collector()
+ else:
+ from airflow.lineage import hook
from airflow.providers.common.compat.lineage.hook import
get_hook_lineage_collector
+ hook._hook_lineage_collector = hlc
yield get_hook_lineage_collector()
+ hook._hook_lineage_collector = None
@pytest.fixture
diff --git a/devel-common/src/tests_common/test_utils/mock_plugins.py
b/devel-common/src/tests_common/test_utils/mock_plugins.py
index dfc654d15ee..d4bc391418f 100644
--- a/devel-common/src/tests_common/test_utils/mock_plugins.py
+++ b/devel-common/src/tests_common/test_utils/mock_plugins.py
@@ -85,6 +85,7 @@ def mock_plugin_manager(plugins=None, **kwargs):
if AIRFLOW_V_3_2_PLUS:
# Always start the block with an non-initialized plugins, so
ensure_plugins_loaded runs.
from airflow import plugins_manager
+ from airflow.sdk import plugins_manager as sdk_plugins_manager
plugins_manager._get_plugins.cache_clear()
plugins_manager._get_ui_plugins.cache_clear()
@@ -92,10 +93,12 @@ def mock_plugin_manager(plugins=None, **kwargs):
plugins_manager.get_fastapi_plugins.cache_clear()
plugins_manager._get_extra_operators_links_plugins.cache_clear()
plugins_manager.get_timetables_plugins.cache_clear()
- plugins_manager.get_hook_lineage_readers_plugins.cache_clear()
plugins_manager.integrate_macros_plugins.cache_clear()
plugins_manager.get_priority_weight_strategy_plugins.cache_clear()
+ sdk_plugins_manager.integrate_macros_plugins.cache_clear()
+ sdk_plugins_manager.get_hook_lineage_readers_plugins.cache_clear()
+
if plugins is not None or "import_errors" in kwargs:
exit_stack.enter_context(
mock.patch(
@@ -106,6 +109,15 @@ def mock_plugin_manager(plugins=None, **kwargs):
),
)
)
+ exit_stack.enter_context(
+ mock.patch(
+ "airflow.sdk.plugins_manager._get_plugins",
+ return_value=(
+ plugins or [],
+ kwargs.get("import_errors", {}),
+ ),
+ )
+ )
elif kwargs:
raise NotImplementedError(
"mock_plugin_manager does not support patching other
attributes in Airflow 3.2+"
diff --git
a/providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py
b/providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py
index 2eb07f446f3..37b352b2cc5 100644
---
a/providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py
+++
b/providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py
@@ -21,7 +21,7 @@ from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import Any
- from airflow.lineage.hook import LineageContext
+ from airflow.sdk.lineage import LineageContext
def _lacks_asset_methods(collector):
@@ -62,7 +62,7 @@ def _add_extra_polyfill(collector):
import attr
- from airflow.lineage.hook import HookLineage as _BaseHookLineage
+ from airflow.providers.common.compat.sdk import HookLineage as
_BaseHookLineage
# Add `extra` to HookLineage returned by `collected_assets` property
@attr.define
@@ -229,7 +229,7 @@ def get_hook_lineage_collector():
Airflow 3.0–3.1: Collector has asset-based methods but lacks `add_extra` -
apply single layer.
Airflow 3.2+: Collector has asset-based methods and `add_extra` support -
no action required.
"""
- from airflow.lineage.hook import get_hook_lineage_collector as
get_global_collector
+ from airflow.providers.common.compat.sdk import get_hook_lineage_collector
as get_global_collector
global_collector = get_global_collector()
diff --git a/providers/common/compat/src/airflow/providers/common/compat/sdk.py
b/providers/common/compat/src/airflow/providers/common/compat/sdk.py
index ccb4e715abf..0cfa54dbd35 100644
--- a/providers/common/compat/src/airflow/providers/common/compat/sdk.py
+++ b/providers/common/compat/src/airflow/providers/common/compat/sdk.py
@@ -95,6 +95,13 @@ if TYPE_CHECKING:
TaskDeferred as TaskDeferred,
XComNotFound as XComNotFound,
)
+ from airflow.sdk.lineage import (
+ HookLineage as HookLineage,
+ HookLineageCollector as HookLineageCollector,
+ HookLineageReader as HookLineageReader,
+ NoOpCollector as NoOpCollector,
+ get_hook_lineage_collector as get_hook_lineage_collector,
+ )
from airflow.sdk.listener import get_listener_manager as
get_listener_manager
from airflow.sdk.log import redact as redact
from airflow.sdk.plugins_manager import AirflowPlugin as AirflowPlugin
@@ -126,6 +133,10 @@ _RENAME_MAP: dict[str, tuple[str, str, str]] = {
"AssetAny": ("airflow.sdk", "airflow.datasets", "DatasetAny"),
}
+# Airflow 3-only renames (not available in Airflow 2)
+_AIRFLOW_3_ONLY_RENAMES: dict[str, tuple[str, str, str]] = {}
+
+
# Import map for classes/functions/constants
# Format: class_name -> module_path(s)
# - str: single module path (no fallback)
@@ -235,6 +246,15 @@ _IMPORT_MAP: dict[str, str | tuple[str, ...]] = {
#
============================================================================
"XCOM_RETURN_KEY": "airflow.models.xcom",
#
============================================================================
+ # Lineage
+ #
============================================================================
+ "HookLineageCollector": ("airflow.sdk.lineage", "airflow.lineage.hook"),
+ "HookLineageReader": ("airflow.sdk.lineage", "airflow.lineage.hook"),
+ "get_hook_lineage_collector": ("airflow.sdk.lineage",
"airflow.lineage.hook"),
+ "HookLineage": ("airflow.sdk.lineage", "airflow.lineage.hook"),
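+ # Note: AssetLineageInfo is not listed here; it is resolved via _RENAME_MAP (DatasetLineageInfo -> AssetLineageInfo) and, on Airflow 3+, via the _IMPORT_MAP entry added after _AIRFLOW_3_ONLY_EXCEPTIONS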
+ "NoOpCollector": ("airflow.sdk.lineage", "airflow.lineage.hook"),
+ #
============================================================================
# Exceptions (deprecated in airflow.exceptions, prefer SDK)
#
============================================================================
# Note: AirflowException and AirflowNotFoundException are not deprecated,
but exposing them
@@ -279,9 +299,14 @@ _AIRFLOW_3_ONLY_EXCEPTIONS: dict[str, tuple[str, ...]] = {
"DagRunTriggerException": ("airflow.sdk.exceptions", "airflow.exceptions"),
}
-# Add Airflow 3-only exceptions to _IMPORT_MAP if running Airflow 3+
+# If running Airflow 3+: add Airflow 3-only exceptions to _IMPORT_MAP, Airflow 3-only renames to _RENAME_MAP, and register the AssetLineageInfo location in _IMPORT_MAP
if AIRFLOW_V_3_0_PLUS:
_IMPORT_MAP.update(_AIRFLOW_3_ONLY_EXCEPTIONS)
+ _RENAME_MAP.update(_AIRFLOW_3_ONLY_RENAMES)
+ # AssetLineageInfo exists in 3.0+ but location changed in 3.2
+ # 3.0-3.1: airflow.lineage.hook.AssetLineageInfo
+ # 3.2+: airflow.sdk.lineage.AssetLineageInfo
+ _IMPORT_MAP["AssetLineageInfo"] = ("airflow.sdk.lineage",
"airflow.lineage.hook")
# Module map: module_name -> module_path(s)
# For entire modules that have been moved (e.g., timezone)
diff --git
a/providers/common/compat/tests/unit/common/compat/lineage/test_hook.py
b/providers/common/compat/tests/unit/common/compat/lineage/test_hook.py
index 9045512531e..fa4612ef8ad 100644
--- a/providers/common/compat/tests/unit/common/compat/lineage/test_hook.py
+++ b/providers/common/compat/tests/unit/common/compat/lineage/test_hook.py
@@ -27,30 +27,58 @@ from tests_common.test_utils.version_compat import
AIRFLOW_V_3_0_PLUS
@pytest.fixture
def collector():
- from airflow.lineage.hook import HookLineageCollector
+ from airflow.providers.common.compat.sdk import HookLineageCollector
- # Patch the "inner" function that the compat version will call
- with mock.patch(
- "airflow.lineage.hook.get_hook_lineage_collector",
- return_value=HookLineageCollector(),
- ):
+ from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS,
AIRFLOW_V_3_2_PLUS
+
+ hlc = HookLineageCollector()
+
+ if AIRFLOW_V_3_0_PLUS:
+ from unittest import mock
+
+ patch_target = "airflow.lineage.hook.get_hook_lineage_collector"
+ if AIRFLOW_V_3_2_PLUS:
+ patch_target = "airflow.sdk.lineage.get_hook_lineage_collector"
+
+ with mock.patch(patch_target, return_value=hlc):
+ from airflow.providers.common.compat.lineage.hook import
get_hook_lineage_collector
+
+ yield get_hook_lineage_collector()
+ else:
+ from airflow.lineage import hook
from airflow.providers.common.compat.lineage.hook import
get_hook_lineage_collector
+ hook._hook_lineage_collector = hlc
yield get_hook_lineage_collector()
+ hook._hook_lineage_collector = None
@pytest.fixture
def noop_collector():
- from airflow.lineage.hook import NoOpCollector
+ from airflow.providers.common.compat.sdk import NoOpCollector
+
+ from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS,
AIRFLOW_V_3_2_PLUS
+
+ noop = NoOpCollector()
+
+ if AIRFLOW_V_3_0_PLUS:
+ from unittest import mock
+
+ patch_target = "airflow.lineage.hook.get_hook_lineage_collector"
+ if AIRFLOW_V_3_2_PLUS:
+ patch_target = "airflow.sdk.lineage.get_hook_lineage_collector"
+
+ with mock.patch(patch_target, return_value=noop):
+ from airflow.providers.common.compat.lineage.hook import
get_hook_lineage_collector
- # Patch the "inner" function that the compat version will call
- with mock.patch(
- "airflow.lineage.hook.get_hook_lineage_collector",
- return_value=NoOpCollector(),
- ):
+ yield get_hook_lineage_collector()
+ else:
+ from airflow.lineage import hook
from airflow.providers.common.compat.lineage.hook import
get_hook_lineage_collector
+ hook._hook_lineage_collector = noop
yield get_hook_lineage_collector()
+ hook._hook_lineage_collector = None
@pytest.fixture(params=["collector", "noop_collector"])
diff --git a/providers/openlineage/pyproject.toml
b/providers/openlineage/pyproject.toml
index 06809372ca0..6342ff6a20e 100644
--- a/providers/openlineage/pyproject.toml
+++ b/providers/openlineage/pyproject.toml
@@ -60,7 +60,7 @@ requires-python = ">=3.10"
dependencies = [
"apache-airflow>=2.11.0",
"apache-airflow-providers-common-sql>=1.20.0",
- "apache-airflow-providers-common-compat>=1.13.0",
+ "apache-airflow-providers-common-compat>=1.13.0", # use next version
"attrs>=22.2",
"openlineage-integration-common>=1.41.0",
"openlineage-python>=1.41.0",
diff --git
a/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py
b/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py
index 374d8b2f06b..375b0bbf2f1 100644
---
a/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py
+++
b/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py
@@ -21,7 +21,7 @@ from airflow.providers.openlineage import conf
# Conditional imports - only load expensive dependencies when plugin is enabled
if not conf.is_disabled():
- from airflow.lineage.hook import HookLineageReader
+ from airflow.providers.common.compat.sdk import HookLineageReader
from airflow.providers.openlineage.plugins.listener import
get_openlineage_listener
from airflow.providers.openlineage.plugins.macros import (
lineage_job_name,
diff --git
a/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py
b/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py
index 3989144009e..9e2b1782b81 100644
--- a/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py
+++ b/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py
@@ -50,19 +50,25 @@ if TYPE_CHECKING:
@pytest.fixture
def hook_lineage_collector():
- from airflow.lineage import hook
- from airflow.providers.common.compat.lineage.hook import
get_hook_lineage_collector
+ from airflow.providers.common.compat.sdk import HookLineageCollector
- hlc = hook.HookLineageCollector()
+ from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS,
AIRFLOW_V_3_2_PLUS
+
+ hlc = HookLineageCollector()
+ patch_target = "airflow.lineage.hook.get_hook_lineage_collector"
+ if AIRFLOW_V_3_2_PLUS:
+ patch_target = "airflow.sdk.lineage.get_hook_lineage_collector"
if AIRFLOW_V_3_0_PLUS:
from unittest import mock
- with mock.patch(
- "airflow.lineage.hook.get_hook_lineage_collector",
- return_value=hlc,
- ):
+ with mock.patch(patch_target, return_value=hlc):
+ from airflow.providers.common.compat.lineage.hook import
get_hook_lineage_collector
+
yield get_hook_lineage_collector()
else:
+ from airflow.lineage import hook
+ from airflow.providers.common.compat.lineage.hook import
get_hook_lineage_collector
+
hook._hook_lineage_collector = hlc
yield get_hook_lineage_collector()
diff --git a/task-sdk/src/airflow/sdk/io/path.py
b/task-sdk/src/airflow/sdk/io/path.py
index 611dfe2c8df..3f87e6a1d95 100644
--- a/task-sdk/src/airflow/sdk/io/path.py
+++ b/task-sdk/src/airflow/sdk/io/path.py
@@ -44,7 +44,7 @@ class _TrackingFileWrapper:
self._obj = obj
def __getattr__(self, name):
- from airflow.lineage.hook import get_hook_lineage_collector
+ from airflow.sdk.lineage import get_hook_lineage_collector
if not callable(attr := getattr(self._obj, name)):
return attr
@@ -312,7 +312,7 @@ class ObjectStoragePath(CloudPath):
kwargs: Additional keyword arguments to be passed to the underlying
implementation.
"""
- from airflow.lineage.hook import get_hook_lineage_collector
+ from airflow.sdk.lineage import get_hook_lineage_collector
if isinstance(dst, str):
dst = ObjectStoragePath(dst)
@@ -380,7 +380,7 @@ class ObjectStoragePath(CloudPath):
kwargs: Additional keyword arguments to be passed to the underlying
implementation.
"""
- from airflow.lineage.hook import get_hook_lineage_collector
+ from airflow.sdk.lineage import get_hook_lineage_collector
if isinstance(path, str):
path = ObjectStoragePath(path)
diff --git a/airflow-core/src/airflow/lineage/hook.py
b/task-sdk/src/airflow/sdk/lineage.py
similarity index 97%
rename from airflow-core/src/airflow/lineage/hook.py
rename to task-sdk/src/airflow/sdk/lineage.py
index 7c22a367006..acdf284a596 100644
--- a/airflow-core/src/airflow/lineage/hook.py
+++ b/task-sdk/src/airflow/sdk/lineage.py
@@ -24,10 +24,11 @@ from functools import cache
from typing import TYPE_CHECKING, Any, TypeAlias
import attr
+import structlog
-from airflow.providers_manager import ProvidersManager
+from airflow.sdk.definitions._internal.logging_mixin import LoggingMixin
from airflow.sdk.definitions.asset import Asset
-from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.sdk.providers_manager_runtime import ProvidersManagerTaskRuntime
if TYPE_CHECKING:
from pydantic.types import JsonValue
@@ -38,6 +39,9 @@ if TYPE_CHECKING:
LineageContext: TypeAlias = BaseHook | ObjectStoragePath
+log = structlog.getLogger(__name__)
+
+
# Maximum number of assets input or output that can be collected in a single
hook execution.
# Input assets and output assets are collected separately.
MAX_COLLECTED_ASSETS = 100
@@ -106,7 +110,7 @@ class HookLineageCollector(LoggingMixin):
self._outputs: dict[str, tuple[Asset, LineageContext]] = {}
self._input_counts: dict[str, int] = defaultdict(int)
self._output_counts: dict[str, int] = defaultdict(int)
- self._asset_factories = ProvidersManager().asset_factories
+ self._asset_factories = ProvidersManagerTaskRuntime().asset_factories
self._extra_counts: dict[str, int] = defaultdict(int)
self._extra: dict[str, tuple[str, Any, LineageContext]] = {}
@@ -335,7 +339,7 @@ class HookLineageReader(LoggingMixin):
@cache
def get_hook_lineage_collector() -> HookLineageCollector:
"""Get singleton lineage collector."""
- from airflow import plugins_manager
+ from airflow.sdk import plugins_manager
if plugins_manager.get_hook_lineage_readers_plugins():
return HookLineageCollector()
diff --git a/task-sdk/src/airflow/sdk/plugins_manager.py
b/task-sdk/src/airflow/sdk/plugins_manager.py
index 2cbdf1a44a4..4f4abbec68b 100644
--- a/task-sdk/src/airflow/sdk/plugins_manager.py
+++ b/task-sdk/src/airflow/sdk/plugins_manager.py
@@ -39,6 +39,7 @@ from airflow.sdk.providers_manager_runtime import
ProvidersManagerTaskRuntime
if TYPE_CHECKING:
from airflow.sdk._shared.listeners.listener import ListenerManager
+ from airflow.sdk.lineage import HookLineageReader
log = logging.getLogger(__name__)
@@ -131,3 +132,14 @@ def integrate_listener_plugins(listener_manager:
ListenerManager) -> None:
"""Add listeners from plugins."""
plugins, _ = _get_plugins()
_integrate_listener_plugins(listener_manager, plugins=plugins)
+
+
+@cache
+def get_hook_lineage_readers_plugins() -> list[type[HookLineageReader]]:
+ """Collect and get hook lineage reader classes registered by plugins."""
+ log.debug("Initialize hook lineage readers plugins")
+ result: list[type[HookLineageReader]] = []
+
+ for plugin in _get_plugins()[0]:
+ result.extend(plugin.hook_lineage_readers)
+ return result
diff --git a/task-sdk/tests/task_sdk/docs/test_public_api.py
b/task-sdk/tests/task_sdk/docs/test_public_api.py
index f53887ec5c9..e8d7dd68e9e 100644
--- a/task-sdk/tests/task_sdk/docs/test_public_api.py
+++ b/task-sdk/tests/task_sdk/docs/test_public_api.py
@@ -63,6 +63,7 @@ def test_airflow_sdk_no_unexpected_exports():
"listener",
"crypto",
"providers_manager_runtime",
+ "lineage",
}
unexpected = actual - public - ignore
assert not unexpected, f"Unexpected exports in airflow.sdk:
{sorted(unexpected)}"
diff --git a/airflow-core/tests/unit/lineage/test_hook.py
b/task-sdk/tests/task_sdk/test_lineage.py
similarity index 98%
rename from airflow-core/tests/unit/lineage/test_hook.py
rename to task-sdk/tests/task_sdk/test_lineage.py
index f5403159247..3bf706cabb4 100644
--- a/airflow-core/tests/unit/lineage/test_hook.py
+++ b/task-sdk/tests/task_sdk/test_lineage.py
@@ -21,9 +21,8 @@ from unittest.mock import MagicMock, patch
import pytest
-from airflow import plugins_manager
-from airflow.lineage import hook
-from airflow.lineage.hook import (
+from airflow.sdk import Asset, BaseHook, plugins_manager
+from airflow.sdk.lineage import (
AssetLineageInfo,
HookLineage,
HookLineageCollector,
@@ -31,8 +30,6 @@ from airflow.lineage.hook import (
NoOpCollector,
get_hook_lineage_collector,
)
-from airflow.sdk import BaseHook
-from airflow.sdk.definitions.asset import Asset
from tests_common.test_utils.mock_plugins import mock_plugin_manager
@@ -137,7 +134,7 @@ class TestHookLineageCollector:
],
)
- @patch("airflow.lineage.hook.Asset")
+ @patch("airflow.sdk.lineage.Asset")
def test_add_input_asset(self, mock_asset, collector):
asset = MagicMock(spec=Asset, extra={})
mock_asset.return_value = asset
@@ -196,7 +193,7 @@ class TestHookLineageCollector:
uri="myscheme://value_1/value_2", name="asset-value_1",
group="test", extra={"key": "value"}
)
- @patch("airflow.lineage.hook.ProvidersManager")
+ @patch("airflow.sdk.lineage.ProvidersManagerTaskRuntime")
def test_create_asset_no_factory(self, mock_providers_manager, collector):
test_scheme = "myscheme"
mock_providers_manager.return_value.asset_factories = {}
@@ -215,7 +212,7 @@ class TestHookLineageCollector:
is None
)
- @patch("airflow.lineage.hook.ProvidersManager")
+ @patch("airflow.sdk.lineage.ProvidersManagerTaskRuntime")
def test_create_asset_factory_exception(self, mock_providers_manager,
collector):
def create_asset(extra=None, **kwargs):
raise RuntimeError("Factory error")
@@ -873,7 +870,7 @@ class FakePlugin(plugins_manager.AirflowPlugin):
)
def test_get_hook_lineage_collector(has_readers, expected_class):
# reset cached instance
- hook.get_hook_lineage_collector.cache_clear()
+ get_hook_lineage_collector.cache_clear()
plugins = [FakePlugin()] if has_readers else []
with mock_plugin_manager(plugins=plugins):
assert isinstance(get_hook_lineage_collector(), expected_class)