This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new cfb08bb3bef Remove auto lineage from Airflow (#48421)
cfb08bb3bef is described below
commit cfb08bb3befee1b1ed642ad8a50a3d0f3a53a100
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Mon Mar 31 17:35:17 2025 +0800
Remove auto lineage from Airflow (#48421)
This never really caught on, and we have other ways to do lineage now.
The airflow.lineage.hook module is still available.
Usage in Papermill is also kept since we want it to work on 2.x for now.
---
.../docs/administration-and-deployment/lineage.rst | 95 +---------
airflow-core/newsfragments/48388.significant.rst | 18 ++
airflow-core/src/airflow/lineage/__init__.py | 141 --------------
airflow-core/src/airflow/lineage/backend.py | 46 -----
airflow-core/src/airflow/models/baseoperator.py | 17 +-
airflow-core/tests/unit/lineage/test_lineage.py | 203 ---------------------
.../tests/unit/models/test_baseoperator.py | 64 ++-----
providers/openlineage/docs/guides/developer.rst | 97 ----------
.../providers/papermill/operators/papermill.py | 31 +---
.../system/papermill/example_papermill_verify.py | 7 +-
.../unit/papermill/operators/test_papermill.py | 4 -
task-sdk/src/airflow/sdk/bases/operator.py | 21 ---
.../src/airflow/sdk/definitions/mappedoperator.py | 3 -
13 files changed, 55 insertions(+), 692 deletions(-)
diff --git a/airflow-core/docs/administration-and-deployment/lineage.rst b/airflow-core/docs/administration-and-deployment/lineage.rst
index 0219d7b7444..c914e0aa5b2 100644
--- a/airflow-core/docs/administration-and-deployment/lineage.rst
+++ b/airflow-core/docs/administration-and-deployment/lineage.rst
@@ -22,76 +22,6 @@ Lineage
.. note:: Lineage support is very experimental and subject to change.
-Airflow can help track origins of data, what happens to it and where it moves over time. This can aid having
-audit trails and data governance, but also debugging of data flows.
-
-Airflow tracks data by means of inlets and outlets of the tasks. Let's work from an example and see how it
-works.
-
-.. code-block:: python
-
- import datetime
- import pendulum
-
- from airflow.lineage import AUTO
- from airflow.models import DAG
- from airflow.providers.common.compat.lineage.entities import File
- from airflow.providers.standard.operators.bash import BashOperator
- from airflow.providers.standard.operators.empty import EmptyOperator
-
- FILE_CATEGORIES = ["CAT1", "CAT2", "CAT3"]
-
- dag = DAG(
- dag_id="example_lineage",
- start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
- schedule="0 0 * * *",
- catchup=False,
- dagrun_timeout=datetime.timedelta(minutes=60),
- )
-
- f_final = File(url="/tmp/final")
- run_this_last = EmptyOperator(task_id="run_this_last", dag=dag, inlets=AUTO, outlets=f_final)
-
- f_in = File(url="/tmp/whole_directory/")
- outlets = []
- for file in FILE_CATEGORIES:
- f_out = File(url="/tmp/{}/{{{{ data_interval_start }}}}".format(file))
- outlets.append(f_out)
-
- run_this = BashOperator(task_id="run_me_first", bash_command="echo 1", dag=dag, inlets=f_in, outlets=outlets)
- run_this.set_downstream(run_this_last)
-
-Inlets can be a (list of) upstream task ids or statically defined as an attr annotated object
-as is, for example, the ``File`` object. Outlets can only be attr annotated object. Both are rendered
-at run time. However, the outlets of a task in case they are inlets to another task will not be re-rendered
-for the downstream task.
-
-.. note:: Operators can add inlets and outlets automatically if the operator supports it.
-
-In the example DAG task ``run_this`` (``task_id=run_me_first``) is a BashOperator that takes 3 inlets: ``CAT1``, ``CAT2``, ``CAT3``, that are
-generated from a list. Note that ``data_interval_start`` is a templated field and will be rendered when the task is running.
-
-.. note:: Behind the scenes Airflow prepares the lineage metadata as part of the ``pre_execute`` method of a task. When the task
- has finished execution ``post_execute`` is called and lineage metadata is pushed into XCOM. Thus if you are creating
- your own operators that override this method make sure to decorate your method with ``prepare_lineage`` and ``apply_lineage``
- respectively.
-
-Shorthand notation
-------------------
-
-Shorthand notation is available as well, this works almost equal to unix command line pipes, inputs and outputs.
-Note that operator precedence_ still applies. Also the ``|`` operator will only work when the left hand side either
-has outlets defined (e.g. by using ``add_outlets(..)`` or has out of the box support of lineage ``operator.supports_lineage == True``.
-
-.. code-block:: python
-
- f_in > run_this | (run_this_last > outlets)
-
-.. _precedence: https://docs.python.org/3/reference/expressions.html
-
-Hook Lineage
-------------
-
Airflow provides a powerful feature for tracking data lineage not only between tasks but also from hooks used within those tasks.
This functionality helps you understand how data flows throughout your Airflow pipelines.
@@ -101,7 +31,7 @@ The collector then uses this data to construct AIP-60 compliant Assets, a standa
.. code-block:: python
- from airflow.lineage.hook.lineage import get_hook_lineage_collector
+ from airflow.lineage.hook import get_hook_lineage_collector
class CustomHook(BaseHook):
@@ -131,26 +61,3 @@ which is registered in an Airflow plugin.
If no ``HookLineageReader`` is registered within Airflow, a default ``NoOpCollector`` is used instead.
This collector does not create AIP-60 compliant assets or collect lineage information.
-
-
-Lineage Backend
----------------
-
-It's possible to push the lineage metrics to a custom backend by providing an instance of a LineageBackend in the config:
-
-.. code-block:: ini
-
- [lineage]
- backend = my.lineage.CustomBackend
-
-The backend should inherit from ``airflow.lineage.LineageBackend``.
-
-.. code-block:: python
-
- from airflow.lineage.backend import LineageBackend
-
-
- class CustomBackend(LineageBackend):
- def send_lineage(self, operator, inlets=None, outlets=None, context=None):
- ...
- # Send the info to some external service
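
The hook-level collector that survives this removal is driven from inside the hook rather than through operator decorators. A minimal sketch, assuming the ``add_input_asset``/``add_output_asset`` methods that recent Airflow releases expose on the collector (the hook class and S3 URIs are illustrative, not part of this commit):

.. code-block:: python

    from airflow.hooks.base import BaseHook
    from airflow.lineage.hook import get_hook_lineage_collector


    class CustomHook(BaseHook):
        def run(self) -> None:
            # Report what this hook reads and writes; a registered
            # HookLineageReader consumes these, while the default
            # NoOpCollector silently drops them.
            collector = get_hook_lineage_collector()
            collector.add_input_asset(context=self, uri="s3://in-bucket/raw.csv")
            collector.add_output_asset(context=self, uri="s3://out-bucket/clean.csv")
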
diff --git a/airflow-core/newsfragments/48388.significant.rst b/airflow-core/newsfragments/48388.significant.rst
new file mode 100644
index 00000000000..cc60bb76862
--- /dev/null
+++ b/airflow-core/newsfragments/48388.significant.rst
@@ -0,0 +1,18 @@
+Task-level auto lineage collection is removed
+
+The ``prepare_lineage``, ``apply_lineage`` mechanism, along with the custom
+lineage backend type that supports it, has been removed. This has been an
+experimental feature that never caught on.
+
+The ``airflow.lineage.hook`` submodule is not affected.
+
+* Types of change
+
+ * [x] Dag changes
+ * [ ] Config changes
+ * [ ] API changes
+ * [ ] CLI changes
+ * [ ] Behaviour changes
+ * [ ] Plugin changes
+ * [ ] Dependency changes
+ * [x] Code interface changes
diff --git a/airflow-core/src/airflow/lineage/__init__.py b/airflow-core/src/airflow/lineage/__init__.py
index 2fedfbd57a9..217e5db9607 100644
--- a/airflow-core/src/airflow/lineage/__init__.py
+++ b/airflow-core/src/airflow/lineage/__init__.py
@@ -15,144 +15,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""Provides lineage support functions."""
-
-from __future__ import annotations
-
-import logging
-from functools import wraps
-from typing import TYPE_CHECKING, Any, Callable, TypeVar, cast
-
-from airflow.configuration import conf
-from airflow.lineage.backend import LineageBackend
-from airflow.utils.session import create_session
-
-if TYPE_CHECKING:
- from airflow.sdk.definitions.context import Context
-
-PIPELINE_OUTLETS = "pipeline_outlets"
-PIPELINE_INLETS = "pipeline_inlets"
-AUTO = "auto"
-
-log = logging.getLogger(__name__)
-
-
-def get_backend() -> LineageBackend | None:
- """Get the lineage backend if defined in the configs."""
- clazz = conf.getimport("lineage", "backend", fallback=None)
-
- if clazz:
- if not issubclass(clazz, LineageBackend):
- raise TypeError(
- f"Your custom Lineage class `{clazz.__name__}` "
- f"is not a subclass of `{LineageBackend.__name__}`."
- )
- else:
- return clazz()
-
- return None
-
-
-def _render_object(obj: Any, context: Context) -> dict:
- ti = context["ti"]
- if TYPE_CHECKING:
- assert ti.task
- return ti.task.render_template(obj, context)
-
-
-T = TypeVar("T", bound=Callable)
-
-
-def apply_lineage(func: T) -> T:
- """
- Conditionally send lineage to the backend.
-
- Saves the lineage to XCom and if configured to do so sends it
- to the backend.
- """
- _backend = get_backend()
-
- @wraps(func)
- def wrapper(self, context, *args, **kwargs):
- self.log.debug("Lineage called with inlets: %s, outlets: %s",
self.inlets, self.outlets)
-
- ret_val = func(self, context, *args, **kwargs)
-
- outlets = list(self.outlets)
- inlets = list(self.inlets)
-
- if outlets:
- self.xcom_push(context, key=PIPELINE_OUTLETS, value=outlets)
-
- if inlets:
- self.xcom_push(context, key=PIPELINE_INLETS, value=inlets)
-
- if _backend:
- _backend.send_lineage(operator=self, inlets=self.inlets, outlets=self.outlets, context=context)
-
- return ret_val
-
- return cast("T", wrapper)
-
-
-def prepare_lineage(func: T) -> T:
- """
- Prepare the lineage inlets and outlets.
-
- Inlets can be:
-
- * "auto" -> picks up any outlets from direct upstream tasks that have
outlets defined, as such that
- if A -> B -> C and B does not have outlets but A does, these are
provided as inlets.
- * "list of task_ids" -> picks up outlets from the upstream task_ids
- * "list of datasets" -> manually defined list of dataset
-
- """
-
- @wraps(func)
- def wrapper(self, context, *args, **kwargs):
- from airflow.models.abstractoperator import AbstractOperator
-
- self.log.debug("Preparing lineage inlets and outlets")
-
- if isinstance(self.inlets, (str, AbstractOperator)):
- self.inlets = [self.inlets]
-
- if self.inlets and isinstance(self.inlets, list):
- # get task_ids that are specified as parameter and make sure they are upstream
- task_ids = {o for o in self.inlets if isinstance(o, str)}.union(
- op.task_id for op in self.inlets if isinstance(op, AbstractOperator)
- ).intersection(self.get_flat_relative_ids(upstream=True))
-
- # pick up unique direct upstream task_ids if AUTO is specified
- if AUTO.upper() in self.inlets or AUTO.lower() in self.inlets:
- task_ids = task_ids.union(task_ids.symmetric_difference(self.upstream_task_ids))
-
- # Remove auto and task_ids
- self.inlets = [i for i in self.inlets if not isinstance(i, str)]
-
- # We manually create a session here since xcom_pull returns a
- # LazySelectSequence proxy. If we do not pass a session, a new one
- # will be created, but that session will not be properly closed.
- # After we are done iterating, we can safely close this session.
- with create_session() as session:
- _inlets = self.xcom_pull(
- context, task_ids=task_ids, dag_id=self.dag_id, key=PIPELINE_OUTLETS, session=session
- )
- self.inlets.extend(i for it in _inlets for i in it)
-
- elif self.inlets:
- raise AttributeError("inlets is not a list, operator, string or attr annotated object")
-
- if not isinstance(self.outlets, list):
- self.outlets = [self.outlets]
-
- # render inlets and outlets
- self.inlets = [_render_object(i, context) for i in self.inlets]
-
- self.outlets = [_render_object(i, context) for i in self.outlets]
-
- self.log.debug("inlets: %s, outlets: %s", self.inlets, self.outlets)
-
- return func(self, context, *args, **kwargs)
-
- return cast("T", wrapper)
diff --git a/airflow-core/src/airflow/lineage/backend.py b/airflow-core/src/airflow/lineage/backend.py
deleted file mode 100644
index 25d1bd4b07e..00000000000
--- a/airflow-core/src/airflow/lineage/backend.py
+++ /dev/null
@@ -1,46 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""Sends lineage metadata to a backend."""
-
-from __future__ import annotations
-
-from typing import TYPE_CHECKING
-
-if TYPE_CHECKING:
- from airflow.models.baseoperator import BaseOperator
-
-
-class LineageBackend:
- """Sends lineage metadata to a backend."""
-
- def send_lineage(
- self,
- operator: BaseOperator,
- inlets: list | None = None,
- outlets: list | None = None,
- context: dict | None = None,
- ):
- """
- Send lineage metadata to a backend.
-
- :param operator: the operator executing a transformation on the inlets and outlets
- :param inlets: the inlets to this operator
- :param outlets: the outlets from this operator
- :param context: the current context of the task instance
- """
- raise NotImplementedError()
diff --git a/airflow-core/src/airflow/models/baseoperator.py b/airflow-core/src/airflow/models/baseoperator.py
index f052030f8df..80234bd7cf9 100644
--- a/airflow-core/src/airflow/models/baseoperator.py
+++ b/airflow-core/src/airflow/models/baseoperator.py
@@ -35,10 +35,7 @@ import pendulum
from sqlalchemy import select
from sqlalchemy.orm.exc import NoResultFound
-from airflow.exceptions import (
- AirflowException,
-)
-from airflow.lineage import apply_lineage, prepare_lineage
+from airflow.exceptions import AirflowException
# Keeping this file at all is a temp thing as we migrate the repo to the task sdk as the base, but to keep
# main working and useful for others to develop against we use the TaskSDK here but keep this file around
@@ -372,7 +369,6 @@ class BaseOperator(TaskSDKBaseOperator, AbstractOperator):
extended/overridden by subclasses.
"""
- @prepare_lineage
def pre_execute(self, context: Any):
"""Execute right before self.execute() is called."""
if self._pre_execute_hook is None:
@@ -386,7 +382,16 @@ class BaseOperator(TaskSDKBaseOperator, AbstractOperator):
logger=self.log,
).run(context)
- @apply_lineage
+ def execute(self, context: Context) -> Any:
+ """
+ Derive when creating an operator.
+
+ Context is the same dictionary used as when rendering jinja templates.
+
+ Refer to get_template_context for more context.
+ """
+ raise NotImplementedError()
+
def post_execute(self, context: Any, result: Any = None):
"""
Execute right after self.execute() is called.
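
With the decorators gone, ``pre_execute`` and ``post_execute`` are plain hooks and ``execute`` is the only method a subclass must supply. A toy illustration (not part of this commit):

.. code-block:: python

    from typing import Any

    from airflow.models.baseoperator import BaseOperator


    class HelloOperator(BaseOperator):
        """Minimal subclass; the return value is pushed to XCom as usual."""

        def execute(self, context: Any) -> str:
            self.log.info("Running %s", self.task_id)
            return "hello"
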
diff --git a/airflow-core/tests/unit/lineage/test_lineage.py b/airflow-core/tests/unit/lineage/test_lineage.py
deleted file mode 100644
index a45ec783b3d..00000000000
--- a/airflow-core/tests/unit/lineage/test_lineage.py
+++ /dev/null
@@ -1,203 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-from unittest import mock
-
-import attr
-import pytest
-
-from airflow.lineage import AUTO, apply_lineage, get_backend, prepare_lineage
-from airflow.lineage.backend import LineageBackend
-from airflow.models import TaskInstance as TI
-from airflow.providers.common.compat.lineage.entities import File
-from airflow.providers.standard.operators.empty import EmptyOperator
-from airflow.sdk.definitions.context import Context
-from airflow.utils import timezone
-from airflow.utils.types import DagRunType
-
-from tests_common.test_utils.config import conf_vars
-
-pytestmark = pytest.mark.db_test
-
-
-DEFAULT_DATE = timezone.datetime(2016, 1, 1)
-
-
-# helper
[email protected]
-class A:
- pass
-
-
-class CustomLineageBackend(LineageBackend):
- def send_lineage(self, operator, inlets=None, outlets=None, context=None):
- pass
-
-
-class TestLineage:
- def test_lineage(self, dag_maker):
- f1s = "/tmp/does_not_exist_1-{}"
- f2s = "/tmp/does_not_exist_2-{}"
- f3s = "/tmp/does_not_exist_3"
- file1 = File(f1s.format("{{ ds }}"))
- file2 = File(f2s.format("{{ ds }}"))
- file3 = File(f3s)
-
- with dag_maker(dag_id="test_prepare_lineage", start_date=DEFAULT_DATE)
as dag:
- op1 = EmptyOperator(
- task_id="leave1",
- inlets=file1,
- outlets=[
- file2,
- ],
- )
- op2 = EmptyOperator(task_id="leave2")
- op3 = EmptyOperator(task_id="upstream_level_1", inlets=AUTO, outlets=file3)
- op4 = EmptyOperator(task_id="upstream_level_2")
- op5 = EmptyOperator(task_id="upstream_level_3", inlets=["leave1", "upstream_level_1"])
-
- op1.set_downstream(op3)
- op2.set_downstream(op3)
- op3.set_downstream(op4)
- op4.set_downstream(op5)
-
- dag.clear()
- dag_run = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
-
- ctx1 = Context({"ti": TI(task=op1, run_id=dag_run.run_id), "ds": DEFAULT_DATE})
- ctx2 = Context({"ti": TI(task=op2, run_id=dag_run.run_id), "ds": DEFAULT_DATE})
- ctx3 = Context({"ti": TI(task=op3, run_id=dag_run.run_id), "ds": DEFAULT_DATE})
- ctx5 = Context({"ti": TI(task=op5, run_id=dag_run.run_id), "ds": DEFAULT_DATE})
-
- # prepare with manual inlets and outlets
- op1.pre_execute(ctx1)
-
- assert len(op1.inlets) == 1
- assert op1.inlets[0].url == f1s.format(DEFAULT_DATE)
-
- assert len(op1.outlets) == 1
- assert op1.outlets[0].url == f2s.format(DEFAULT_DATE)
-
- # post process with no backend
- op1.post_execute(ctx1)
-
- op2.pre_execute(ctx2)
- assert len(op2.inlets) == 0
- op2.post_execute(ctx2)
-
- op3.pre_execute(ctx3)
- assert len(op3.inlets) == 1
- assert isinstance(op3.inlets[0], File)
- assert op3.inlets[0].url == f2s.format(DEFAULT_DATE)
- assert op3.outlets[0] == file3
- op3.post_execute(ctx3)
-
- # skip 4
-
- op5.pre_execute(ctx5)
- # Task IDs should be removed from the inlets, replaced with the outlets of those tasks
- assert sorted(op5.inlets) == [file2, file3]
- op5.post_execute(ctx5)
-
- def test_lineage_render(self, dag_maker):
- # tests inlets / outlets are rendered if they are added
- # after initialization
- with dag_maker(dag_id="test_lineage_render", start_date=DEFAULT_DATE):
- op1 = EmptyOperator(task_id="task1")
- dag_run = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
-
- f1s = "/tmp/does_not_exist_1-{}"
- file1 = File(f1s.format("{{ ds }}"))
-
- op1.inlets.append(file1)
- op1.outlets.append(file1)
-
- # logical_date is set in the context in order to avoid creating task instances
- ctx1 = Context({"ti": TI(task=op1, run_id=dag_run.run_id), "ds": DEFAULT_DATE})
-
- op1.pre_execute(ctx1)
- assert op1.inlets[0].url == f1s.format(DEFAULT_DATE)
- assert op1.outlets[0].url == f1s.format(DEFAULT_DATE)
-
- def test_attr_outlet(self, dag_maker):
- a = A()
-
- f3s = "/tmp/does_not_exist_3"
- file3 = File(f3s)
-
- with dag_maker(dag_id="test_prepare_lineage", start_date=DEFAULT_DATE):
- op1 = EmptyOperator(
- task_id="leave1",
- outlets=[a, file3],
- )
- op2 = EmptyOperator(task_id="leave2", inlets="auto")
-
- op1 >> op2
-
- dag_run = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
-
- ctx1 = Context({"ti": TI(task=op1, run_id=dag_run.run_id), "ds": DEFAULT_DATE})
- ctx2 = Context({"ti": TI(task=op2, run_id=dag_run.run_id), "ds": DEFAULT_DATE})
-
- # prepare with manual inlets and outlets
- op1.pre_execute(ctx1)
- op1.post_execute(ctx1)
-
- op2.pre_execute(ctx2)
- assert op2.inlets == [a, file3]
- op2.post_execute(ctx2)
-
- @mock.patch("airflow.lineage.get_backend")
- def test_lineage_is_sent_to_backend(self, mock_get_backend, dag_maker):
- class TestBackend(LineageBackend):
- def send_lineage(self, operator, inlets=None, outlets=None, context=None):
- assert len(inlets) == 1
- assert len(outlets) == 1
-
- func = mock.Mock()
- func.__name__ = "foo"
-
- mock_get_backend.return_value = TestBackend()
-
- with dag_maker(dag_id="test_lineage_is_sent_to_backend",
start_date=DEFAULT_DATE):
- op1 = EmptyOperator(task_id="task1")
- dag_run = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
-
- file1 = File("/tmp/some_file")
-
- op1.inlets.append(file1)
- op1.outlets.append(file1)
-
- (ti,) = dag_run.task_instances
- ctx1 = Context({"ti": ti, "ds": DEFAULT_DATE})
-
- prep = prepare_lineage(func)
- prep(op1, ctx1)
- post = apply_lineage(func)
- post(op1, ctx1)
-
- def test_empty_lineage_backend(self):
- backend = get_backend()
- assert backend is None
-
- @conf_vars({("lineage", "backend"): "unit.lineage.test_lineage.CustomLineageBackend"})
- def test_resolve_lineage_class(self):
- backend = get_backend()
- assert issubclass(backend.__class__, LineageBackend)
- assert isinstance(backend, CustomLineageBackend)
diff --git a/airflow-core/tests/unit/models/test_baseoperator.py b/airflow-core/tests/unit/models/test_baseoperator.py
index c8b01e22c1a..ea9d5162f5d 100644
--- a/airflow-core/tests/unit/models/test_baseoperator.py
+++ b/airflow-core/tests/unit/models/test_baseoperator.py
@@ -20,6 +20,7 @@ from __future__ import annotations
import copy
from collections import defaultdict
from datetime import datetime
+from unittest import mock
import pytest
@@ -32,7 +33,6 @@ from airflow.models.dag import DAG
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
from airflow.models.trigger import TriggerFailureReason
-from airflow.providers.common.compat.lineage.entities import File
from airflow.providers.common.sql.operators import sql
from airflow.utils.task_group import TaskGroup
from airflow.utils.trigger_rule import TriggerRule
@@ -113,53 +113,21 @@ class TestBaseOperator:
except Exception as e:
pytest.fail(f"Exception raised: {e}")
- def test_lineage_composition(self):
- """
- Test composition with lineage
- """
- inlet = File(url="in")
- outlet = File(url="out")
- dag = DAG("test-dag", schedule=None, start_date=DEFAULT_DATE)
- task1 = BaseOperator(task_id="op1", dag=dag)
- task2 = BaseOperator(task_id="op2", dag=dag)
-
- # mock
- task1.supports_lineage = True
-
- # note: operator precedence still applies
- inlet > task1 | (task2 > outlet)
-
- assert task1.get_inlet_defs() == [inlet]
- assert task2.get_inlet_defs() == [task1.task_id]
- assert task2.get_outlet_defs() == [outlet]
-
- fail = ClassWithCustomAttributes()
- with pytest.raises(TypeError):
- fail > task1
- with pytest.raises(TypeError):
- task1 > fail
- with pytest.raises(TypeError):
- fail | task1
- with pytest.raises(TypeError):
- task1 | fail
-
- task3 = BaseOperator(task_id="op3", dag=dag)
- extra = File(url="extra")
- [inlet, extra] > task3
-
- assert task3.get_inlet_defs() == [inlet, extra]
-
- task1.supports_lineage = False
- with pytest.raises(ValueError):
- task1 | task3
-
- assert task2.supports_lineage is False
- task2 | task3
- assert len(task3.get_inlet_defs()) == 3
-
- task4 = BaseOperator(task_id="op4", dag=dag)
- task4 > [inlet, outlet, extra]
- assert task4.get_outlet_defs() == [inlet, outlet, extra]
+ def test_pre_execute_hook(self):
+ hook = mock.MagicMock()
+
+ op = BaseOperator(task_id="test_task", pre_execute=hook)
+ op_copy = op.prepare_for_execution()
+ op_copy.pre_execute({})
+ assert hook.called
+
+ def test_post_execute_hook(self):
+ hook = mock.MagicMock()
+
+ op = BaseOperator(task_id="test_task", post_execute=hook)
+ op_copy = op.prepare_for_execution()
+ op_copy.post_execute({})
+ assert hook.called
def test_task_naive_datetime(self):
naive_datetime = DEFAULT_DATE.replace(tzinfo=None)
diff --git a/providers/openlineage/docs/guides/developer.rst b/providers/openlineage/docs/guides/developer.rst
index b0722e1a248..0485e430482 100644
--- a/providers/openlineage/docs/guides/developer.rst
+++ b/providers/openlineage/docs/guides/developer.rst
@@ -31,10 +31,6 @@ There might be some Operators that you can not modify (f.e. third party provider
To handle this situation, OpenLineage allows you to provide custom Extractor for any Operator.
See :ref:`custom_extractors:openlineage` for more details.
-If all of the above can not be implemented, as a fallback, there is a way to manually annotate lineage.
-Airflow allows Operators to track lineage by specifying the input and outputs of the Operators via inlets and outlets.
-See :ref:`inlets_outlets:openlineage` for more details.
-
.. _extraction_precedence:openlineage:
Extraction precedence
@@ -360,99 +356,6 @@ For more examples of OpenLineage Extractors, check out the source code of
`BashExtractor <https://github.com/apache/airflow/blob/main/providers/amazon/aws/src/airflow/providers/openlineage/extractors/bash.py>`_ or
`PythonExtractor <https://github.com/apache/airflow/blob/main/providers/amazon/aws/src/airflow/providers/openlineage/extractors/python.py>`_.
-.. _inlets_outlets:openlineage:
-
-Manually annotated lineage
-==========================
-
-This approach is rarely recommended, only in very specific cases, when it's impossible to extract some lineage information from the Operator itself.
-If you want to extract lineage from your own Operators, you may prefer directly implementing OpenLineage methods as described in :ref:`openlineage_methods:openlineage`.
-When dealing with Operators that you can not modify (f.e. third party providers), but still want the lineage to be extracted from them, see :ref:`custom_extractors:openlineage`.
-
-Airflow allows Operators to track lineage by specifying the input and outputs of the Operators via
-`inlets and outlets <https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/lineage.html#lineage>`_.
-OpenLineage will, by default, use inlets and outlets as input/output datasets if it cannot find any successful extraction from the OpenLineage methods or the Extractors.
-
-Airflow supports inlets and outlets to be either a Table, Column, File or User entity and so does OpenLineage.
-
-Example
-^^^^^^^
-
-An Operator inside the Airflow DAG can be annotated with inlets and outlets like in the below example:
-
-.. code-block:: python
-
- """Example DAG demonstrating the usage of the extraction via Inlets and
Outlets."""
-
- import pendulum
-
- from airflow import DAG
- from airflow.providers.common.compat.lineage.entities import Table, File, Column, User
- from airflow.providers.standard.operators.bash import BashOperator
-
-
- t1 = Table(
- cluster="c1",
- database="d1",
- name="t1",
- owners=[User(email="[email protected]", first_name="Joe", last_name="Doe")],
- )
- t2 = Table(
- cluster="c1",
- database="d1",
- name="t2",
- columns=[
- Column(name="col1", description="desc1", data_type="type1"),
- Column(name="col2", description="desc2", data_type="type2"),
- ],
- owners=[
- User(email="[email protected]", first_name="Mike", last_name="Smith"),
- User(email="[email protected]", first_name="Theo"),
- User(email="[email protected]", last_name="Smith"),
- User(email="[email protected]"),
- ],
- )
- t3 = Table(
- cluster="c1",
- database="d1",
- name="t3",
- columns=[
- Column(name="col3", description="desc3", data_type="type3"),
- Column(name="col4", description="desc4", data_type="type4"),
- ],
- )
- t4 = Table(cluster="c1", database="d1", name="t4")
- f1 = File(url="s3://bucket/dir/file1")
-
-
- with DAG(
- dag_id="example_operator",
- schedule="@once",
- start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
- ) as dag:
- task1 = BashOperator(
- task_id="task_1_with_inlet_outlet",
- bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
- inlets=[t1, t2],
- outlets=[t3],
- )
-
- task2 = BashOperator(
- task_id="task_2_with_inlet_outlet",
- bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
- inlets=[t3, f1],
- outlets=[t4],
- )
-
- task1 >> task2
-
- if __name__ == "__main__":
- dag.cli()
-
-Conversion from Airflow Table entity to OpenLineage Dataset is made in the following way:
-- ``CLUSTER`` of the table entity becomes the namespace of OpenLineage's Dataset
-- The name of the dataset is formed by ``{{DATABASE}}.{{NAME}}`` where ``DATABASE`` and ``NAME`` are attributes specified by Airflow's Table entity.
-
.. _custom_facets:openlineage:
Custom Facets
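
With the manually-annotated-lineage section removed, the paths this guide still documents are OpenLineage methods on the Operator itself or a custom Extractor. A hedged sketch of the former, assuming the ``OperatorLineage`` container from the provider and the ``Dataset`` type from the OpenLineage client (the table names are made up):

.. code-block:: python

    from typing import Any

    from airflow.models.baseoperator import BaseOperator
    from airflow.providers.openlineage.extractors import OperatorLineage
    from openlineage.client.event_v2 import Dataset


    class MyOperator(BaseOperator):
        def execute(self, context: Any) -> None:
            ...  # the actual work

        def get_openlineage_facets_on_start(self) -> OperatorLineage:
            # Picked up by the OpenLineage listener; no custom Extractor needed.
            return OperatorLineage(
                inputs=[Dataset(namespace="postgres://db:5432", name="public.src")],
                outputs=[Dataset(namespace="postgres://db:5432", name="public.dst")],
            )
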
diff --git a/providers/papermill/src/airflow/providers/papermill/operators/papermill.py b/providers/papermill/src/airflow/providers/papermill/operators/papermill.py
index 44f6db12e9c..4dffe10aa57 100644
--- a/providers/papermill/src/airflow/providers/papermill/operators/papermill.py
+++ b/providers/papermill/src/airflow/providers/papermill/operators/papermill.py
@@ -17,7 +17,6 @@
# under the License.
from __future__ import annotations
-import subprocess
from collections.abc import Collection, Sequence
from functools import cached_property
from typing import TYPE_CHECKING, ClassVar
@@ -27,6 +26,7 @@ import papermill as pm
from airflow.models import BaseOperator
from airflow.providers.common.compat.lineage.entities import File
+from airflow.providers.common.compat.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.providers.papermill.hooks.kernel import REMOTE_KERNEL_ENGINE, KernelHook
if TYPE_CHECKING:
@@ -60,6 +60,7 @@ class PapermillOperator(BaseOperator):
(ignores kernel name in the notebook document metadata)
"""
+ # TODO: Remove this when provider drops 2.x support.
supports_lineage = True
template_fields: Sequence[str] = (
@@ -108,8 +109,9 @@ class PapermillOperator(BaseOperator):
self.input_nb = NoteBook(url=self.input_nb, parameters=self.parameters) # type: ignore[call-arg]
if not isinstance(self.output_nb, NoteBook):
self.output_nb = NoteBook(url=self.output_nb) # type: ignore[call-arg]
- self.inlets.append(self.input_nb)
- self.outlets.append(self.output_nb)
+ if not AIRFLOW_V_3_0_PLUS:
+ self.inlets.append(self.input_nb)
+ self.outlets.append(self.output_nb)
remote_kernel_kwargs = {}
kernel_hook = self.hook
if kernel_hook:
@@ -139,28 +141,7 @@ class PapermillOperator(BaseOperator):
**remote_kernel_kwargs,
)
- # Convert the executed notebook to HTML using nbconvert
- if self.nbconvert:
- nbconvert_args = self.nbconvert_args or []
- if not isinstance(nbconvert_args, list):
- raise ValueError("nbconvert_args must be a list")
-
- # Build the nbconvert command
- command = [
- "jupyter",
- "nbconvert",
- "--to",
- "html",
- "--log-level",
- "WARN",
- self.output_nb.url,
- ] + nbconvert_args
- try:
- subprocess.run(command, check=True)
- self.log.info("Output HTML: %s",
self.output_nb.url.replace(".ipynb", ".html"))
- except subprocess.CalledProcessError as e:
- self.log.error("nbconvert failed with output:\n%s", e.stdout)
- raise
+ return self.output_nb
@cached_property
def hook(self) -> KernelHook | None:
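
Since ``execute`` now returns the output ``NoteBook``, downstream tasks take it from XCom via ``.output`` instead of relying on auto-propagated inlets; the system-test change below does exactly that. Illustrative wiring (task id and paths are made up):

.. code-block:: python

    from airflow.providers.papermill.operators.papermill import PapermillOperator

    run_this = PapermillOperator(
        task_id="run_example_notebook",
        input_nb="/tmp/in.ipynb",
        output_nb="/tmp/out-{{ logical_date }}.ipynb",
    )
    # run_this.output is an XComArg wrapping the NoteBook returned by execute()
    check_notebook(output=run_this.output, logical_date="{{ logical_date }}")
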
diff --git a/providers/papermill/tests/system/papermill/example_papermill_verify.py b/providers/papermill/tests/system/papermill/example_papermill_verify.py
index 347eab2184e..3e5648d9e0b 100644
--- a/providers/papermill/tests/system/papermill/example_papermill_verify.py
+++ b/providers/papermill/tests/system/papermill/example_papermill_verify.py
@@ -30,7 +30,6 @@ import scrapbook as sb
from airflow import DAG
from airflow.decorators import task
-from airflow.lineage import AUTO
from airflow.providers.papermill.operators.papermill import PapermillOperator
START_DATE = datetime(2021, 1, 1)
@@ -42,11 +41,11 @@ DAG_ID = "example_papermill_operator_verify"
# [START howto_verify_operator_papermill]
@task
-def check_notebook(inlets, logical_date):
+def check_notebook(output, logical_date):
"""
Verify the message in the notebook
"""
- notebook = sb.read_notebook(inlets[0].url)
+ notebook = sb.read_notebook(output.url)
message = notebook.scraps["message"]
print(f"Message in notebook {message} for {logical_date}")
@@ -70,7 +69,7 @@ with DAG(
parameters={"msgs": "Ran from Airflow at {{ logical_date }}!"},
)
- run_this >> check_notebook(inlets=AUTO, logical_date="{{ logical_date }}")
+ check_notebook(output=run_this.output, logical_date="{{ logical_date }}")
# [END howto_verify_operator_papermill]
from tests_common.test_utils.system_tests import get_test_run # noqa: E402
diff --git a/providers/papermill/tests/unit/papermill/operators/test_papermill.py b/providers/papermill/tests/unit/papermill/operators/test_papermill.py
index 94d0b980ef6..82e75be90f6 100644
--- a/providers/papermill/tests/unit/papermill/operators/test_papermill.py
+++ b/providers/papermill/tests/unit/papermill/operators/test_papermill.py
@@ -88,10 +88,6 @@ class TestPapermillOperator:
assert op.input_nb.url == TEST_INPUT_URL # type: ignore
assert op.output_nb.url == TEST_OUTPUT_URL # type: ignore
- # Test render Lineage inlets/outlets
- assert op.inlets[0] == op.input_nb
- assert op.outlets[0] == op.output_nb
-
@patch("airflow.providers.papermill.operators.papermill.pm")
def test_execute(self, mock_papermill):
in_nb = "/tmp/does_not_exist"
diff --git a/task-sdk/src/airflow/sdk/bases/operator.py b/task-sdk/src/airflow/sdk/bases/operator.py
index 87e0e79c61e..6450560b346 100644
--- a/task-sdk/src/airflow/sdk/bases/operator.py
+++ b/task-sdk/src/airflow/sdk/bases/operator.py
@@ -891,9 +891,6 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
"executor",
}
- # Defines if the operator supports lineage without manual definitions
- supports_lineage: bool = False
-
# If True, the Rendered Template fields will be overwritten in DB after execution
# This is useful for Taskflow decorators that modify the template fields during execution like
# @task.bash decorator.
@@ -1200,24 +1197,6 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
hash_components.append(repr(val))
return hash(tuple(hash_components))
- # including lineage information
- def __or__(self, other):
- """
- Return [This Operator] | [Operator].
-
- The inlets of other will be set to pick up the outlets from this operator.
- Other will be set as a downstream task of this operator.
- """
- if isinstance(other, BaseOperator):
- if not self.outlets and not self.supports_lineage:
- raise ValueError("No outlets defined for this operator")
- other.add_inlets([self.task_id])
- self.set_downstream(other)
- else:
- raise TypeError(f"Right hand side ({other}) is not an Operator")
-
- return self
-
# /Composing Operators ---------------------------------------------
def __gt__(self, other):
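
For reference, the deleted ``__or__`` body above reduced to two explicit calls, which can still be written out by hand (though without ``prepare_lineage`` the string task id is no longer resolved into that task's outlets):

.. code-block:: python

    # previously: task1 | task2
    task2.add_inlets([task1.task_id])
    task1.set_downstream(task2)
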
diff --git a/task-sdk/src/airflow/sdk/definitions/mappedoperator.py b/task-sdk/src/airflow/sdk/definitions/mappedoperator.py
index 9fed450ca75..0b35bdffb30 100644
--- a/task-sdk/src/airflow/sdk/definitions/mappedoperator.py
+++ b/task-sdk/src/airflow/sdk/definitions/mappedoperator.py
@@ -321,8 +321,6 @@ class MappedOperator(AbstractOperator):
This should be a name to call ``getattr()`` on.
"""
- supports_lineage: bool = False
-
HIDE_ATTRS_FROM_UI: ClassVar[frozenset[str]] = AbstractOperator.HIDE_ATTRS_FROM_UI | frozenset(
("parse_time_mapped_ti_count", "operator_class", "start_trigger_args", "start_from_trigger")
)
@@ -365,7 +363,6 @@ class MappedOperator(AbstractOperator):
"expand_input", # This is needed to be able to accept XComArg.
"task_group",
"upstream_task_ids",
- "supports_lineage",
"_is_setup",
"_is_teardown",
"_on_failure_fail_dagrun",