This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-8-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 4454fc870cda6924cdac100c3ca8e81d53716d40 Author: Andrey Anshin <andrey.ans...@taragol.is> AuthorDate: Tue Nov 21 21:10:40 2023 +0400 Move `BaseOperatorLink` into the separate module (#35032) * Move `BaseOperatorLink` into the separate module * Add `airflow.models.baseoperatorlink` as part of Public Interface of Airflow * Ban `airflow.models.baseoperator.BaseOperatorLink` usage in codebase * Return back check-base-operator-usage pre-commit hooks --- .pre-commit-config.yaml | 23 +++++++-- STATIC_CODE_CHECKS.rst | 3 +- airflow/models/__init__.py | 5 +- airflow/models/abstractoperator.py | 3 +- airflow/models/baseoperator.py | 49 +++++++++---------- airflow/models/baseoperatorlink.py | 57 ++++++++++++++++++++++ airflow/models/mappedoperator.py | 3 +- airflow/operators/trigger_dagrun.py | 3 +- airflow/sensors/external_task.py | 2 +- airflow/serialization/serialized_objects.py | 2 +- docs/apache-airflow/howto/define-extra-link.rst | 9 ++-- docs/apache-airflow/public-airflow-interface.rst | 7 +++ docs/conf.py | 1 + pyproject.toml | 1 + .../endpoints/test_extra_link_endpoint.py | 2 +- .../endpoints/test_plugin_endpoint.py | 2 +- tests/api_connexion/schemas/test_plugin_schema.py | 2 +- tests/serialization/test_dag_serialization.py | 3 +- tests/test_utils/mock_operators.py | 3 +- tests/www/views/test_views_extra_links.py | 3 +- 20 files changed, 135 insertions(+), 48 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 606f56947e..f7e886afd1 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -576,9 +576,24 @@ repos: files: ^airflow/models/dag\.py$|^airflow/(?:decorators|utils)/task_group\.py$ - id: check-base-operator-usage language: pygrep - name: Check BaseOperator[Link] core imports - description: Make sure BaseOperator[Link] is imported from airflow.models.baseoperator in core - entry: "from airflow\\.models import.* BaseOperator" + name: Check BaseOperator core imports + description: Make sure BaseOperator is imported from airflow.models.baseoperator in core + entry: "from airflow\\.models import.* BaseOperator\\b" + files: \.py$ + pass_filenames: true + exclude: > + (?x) + ^airflow/decorators/.*$| + ^airflow/hooks/.*$| + ^airflow/operators/.*$| + ^airflow/providers/.*$| + ^airflow/sensors/.*$| + ^dev/provider_packages/.*$ + - id: check-base-operator-usage + language: pygrep + name: Check BaseOperatorLink core imports + description: Make sure BaseOperatorLink is imported from airflow.models.baseoperatorlink in core + entry: "from airflow\\.models import.* BaseOperatorLink" files: \.py$ pass_filenames: true exclude: > @@ -593,7 +608,7 @@ repos: language: pygrep name: Check BaseOperator[Link] other imports description: Make sure BaseOperator[Link] is imported from airflow.models outside of core - entry: "from airflow\\.models\\.baseoperator import.* BaseOperator" + entry: "from airflow\\.models\\.baseoperator(link)? import.* BaseOperator" pass_filenames: true files: > (?x) diff --git a/STATIC_CODE_CHECKS.rst b/STATIC_CODE_CHECKS.rst index da606c9b40..56533885de 100644 --- a/STATIC_CODE_CHECKS.rst +++ b/STATIC_CODE_CHECKS.rst @@ -154,7 +154,8 @@ require Breeze Docker image to be built locally. +-----------------------------------------------------------+--------------------------------------------------------------+---------+ | check-base-operator-partial-arguments | Check BaseOperator and partial() arguments | | +-----------------------------------------------------------+--------------------------------------------------------------+---------+ -| check-base-operator-usage | * Check BaseOperator[Link] core imports | | +| check-base-operator-usage | * Check BaseOperator core imports | | +| | * Check BaseOperatorLink core imports | | | | * Check BaseOperator[Link] other imports | | +-----------------------------------------------------------+--------------------------------------------------------------+---------+ | check-boring-cyborg-configuration | Checks for Boring Cyborg configuration consistency | | diff --git a/airflow/models/__init__.py b/airflow/models/__init__.py index cb08730f53..a98147f416 100644 --- a/airflow/models/__init__.py +++ b/airflow/models/__init__.py @@ -84,7 +84,7 @@ __lazy_imports = { "ID_LEN": "airflow.models.base", "Base": "airflow.models.base", "BaseOperator": "airflow.models.baseoperator", - "BaseOperatorLink": "airflow.models.baseoperator", + "BaseOperatorLink": "airflow.models.baseoperatorlink", "Connection": "airflow.models.connection", "DagBag": "airflow.models.dagbag", "DagModel": "airflow.models.dag", @@ -115,7 +115,8 @@ if TYPE_CHECKING: # I was unable to get mypy to respect a airflow/models/__init__.pyi, so # having to resort back to this hacky method from airflow.models.base import ID_LEN, Base - from airflow.models.baseoperator import BaseOperator, BaseOperatorLink + from airflow.models.baseoperator import BaseOperator + from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.models.connection import Connection from airflow.models.dag import DAG, DagModel, DagTag from airflow.models.dagbag import DagBag diff --git a/airflow/models/abstractoperator.py b/airflow/models/abstractoperator.py index aa79555557..df0e6cb349 100644 --- a/airflow/models/abstractoperator.py +++ b/airflow/models/abstractoperator.py @@ -48,7 +48,8 @@ if TYPE_CHECKING: import jinja2 # Slow import. from sqlalchemy.orm import Session - from airflow.models.baseoperator import BaseOperator, BaseOperatorLink + from airflow.models.baseoperator import BaseOperator + from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.models.dag import DAG from airflow.models.mappedoperator import MappedOperator from airflow.models.operator import Operator diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index 61fb6e07de..2ba7ec8ad1 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -26,7 +26,6 @@ import functools import logging import sys import warnings -from abc import ABCMeta, abstractmethod from datetime import datetime, timedelta from inspect import signature from types import FunctionType @@ -34,7 +33,6 @@ from typing import ( TYPE_CHECKING, Any, Callable, - ClassVar, Collection, Iterable, Sequence, @@ -101,12 +99,10 @@ if TYPE_CHECKING: import jinja2 # Slow import. from sqlalchemy.orm import Session - from airflow.models.abstractoperator import ( - TaskStateChangeCallback, - ) + from airflow.models.abstractoperator import TaskStateChangeCallback + from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.models.dag import DAG from airflow.models.operator import Operator - from airflow.models.taskinstancekey import TaskInstanceKey from airflow.models.xcom_arg import XComArg from airflow.ti_deps.deps.base_ti_dep import BaseTIDep from airflow.triggers.base import BaseTrigger @@ -1896,31 +1892,30 @@ def chain_linear(*elements: DependencyMixin | Sequence[DependencyMixin]): raise ValueError("No dependencies were set. Did you forget to expand with `*`?") -@attr.s(auto_attribs=True) -class BaseOperatorLink(metaclass=ABCMeta): - """Abstract base class that defines how we get an operator link.""" - - operators: ClassVar[list[type[BaseOperator]]] = [] +def __getattr__(name): """ - This property will be used by Airflow Plugins to find the Operators to which you want - to assign this Operator Link + PEP-562: Lazy loaded attributes on python modules. - :return: List of Operator classes used by task for which you want to create extra link + :meta private: """ + path = __deprecated_imports.get(name) + if not path: + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") - @property - @abstractmethod - def name(self) -> str: - """Name of the link. This will be the button name on the task UI.""" + from airflow.utils.module_loading import import_string - @abstractmethod - def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) -> str: - """Link to external system. + warnings.warn( + f"Import `{__name__}.{name}` is deprecated. Please use `{path}.{name}`.", + RemovedInAirflow3Warning, + stacklevel=2, + ) + val = import_string(f"{path}.{name}") - Note: The old signature of this function was ``(self, operator, dttm: datetime)``. That is still - supported at runtime but is deprecated. + # Store for next time + globals()[name] = val + return val - :param operator: The Airflow operator object this link is associated to. - :param ti_key: TaskInstance ID to return link for. - :return: link to external system - """ + +__deprecated_imports = { + "BaseOperatorLink": "airflow.models.baseoperatorlink", +} diff --git a/airflow/models/baseoperatorlink.py b/airflow/models/baseoperatorlink.py new file mode 100644 index 0000000000..a1d1e02dd4 --- /dev/null +++ b/airflow/models/baseoperatorlink.py @@ -0,0 +1,57 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from abc import ABCMeta, abstractmethod +from typing import TYPE_CHECKING, ClassVar + +import attr + +if TYPE_CHECKING: + from airflow.models.baseoperator import BaseOperator + from airflow.models.taskinstancekey import TaskInstanceKey + + +@attr.s(auto_attribs=True) +class BaseOperatorLink(metaclass=ABCMeta): + """Abstract base class that defines how we get an operator link.""" + + operators: ClassVar[list[type[BaseOperator]]] = [] + """ + This property will be used by Airflow Plugins to find the Operators to which you want + to assign this Operator Link + + :return: List of Operator classes used by task for which you want to create extra link + """ + + @property + @abstractmethod + def name(self) -> str: + """Name of the link. This will be the button name on the task UI.""" + + @abstractmethod + def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) -> str: + """Link to external system. + + Note: The old signature of this function was ``(self, operator, dttm: datetime)``. That is still + supported at runtime but is deprecated. + + :param operator: The Airflow operator object this link is associated to. + :param ti_key: TaskInstance ID to return link for. + :return: link to external system + """ diff --git a/airflow/models/mappedoperator.py b/airflow/models/mappedoperator.py index ba49924b88..8174db145a 100644 --- a/airflow/models/mappedoperator.py +++ b/airflow/models/mappedoperator.py @@ -66,7 +66,8 @@ if TYPE_CHECKING: from airflow.models.abstractoperator import ( TaskStateChangeCallback, ) - from airflow.models.baseoperator import BaseOperator, BaseOperatorLink + from airflow.models.baseoperator import BaseOperator + from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.models.dag import DAG from airflow.models.expandinput import ( ExpandInput, diff --git a/airflow/operators/trigger_dagrun.py b/airflow/operators/trigger_dagrun.py index e0d9de9f43..0bca11a801 100644 --- a/airflow/operators/trigger_dagrun.py +++ b/airflow/operators/trigger_dagrun.py @@ -28,7 +28,8 @@ from sqlalchemy.orm.exc import NoResultFound from airflow.api.common.trigger_dag import trigger_dag from airflow.configuration import conf from airflow.exceptions import AirflowException, DagNotFound, DagRunAlreadyExists -from airflow.models.baseoperator import BaseOperator, BaseOperatorLink +from airflow.models.baseoperator import BaseOperator +from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.models.dag import DagModel from airflow.models.dagbag import DagBag from airflow.models.dagrun import DagRun diff --git a/airflow/sensors/external_task.py b/airflow/sensors/external_task.py index 3090b8a50f..79734d25a3 100644 --- a/airflow/sensors/external_task.py +++ b/airflow/sensors/external_task.py @@ -27,7 +27,7 @@ from sqlalchemy import func from airflow.configuration import conf from airflow.exceptions import AirflowException, AirflowSkipException, RemovedInAirflow3Warning -from airflow.models.baseoperator import BaseOperatorLink +from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.models.dag import DagModel from airflow.models.dagbag import DagBag from airflow.models.dagrun import DagRun diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index d2a72cb1ca..889177f4c1 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -70,7 +70,7 @@ if TYPE_CHECKING: from pydantic import BaseModel - from airflow.models.baseoperator import BaseOperatorLink + from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.models.expandinput import ExpandInput from airflow.models.operator import Operator from airflow.models.taskmixin import DAGNode diff --git a/docs/apache-airflow/howto/define-extra-link.rst b/docs/apache-airflow/howto/define-extra-link.rst index bfa5679d34..c934d52fa4 100644 --- a/docs/apache-airflow/howto/define-extra-link.rst +++ b/docs/apache-airflow/howto/define-extra-link.rst @@ -27,7 +27,8 @@ The following code shows how to add extra links to an operator via Plugins: .. code-block:: python - from airflow.models.baseoperator import BaseOperator, BaseOperatorLink + from airflow.models.baseoperator import BaseOperator + from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.models.taskinstancekey import TaskInstanceKey from airflow.plugins_manager import AirflowPlugin @@ -80,7 +81,8 @@ tasks using :class:`~airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSToS3Ope .. code-block:: python - from airflow.models.baseoperator import BaseOperator, BaseOperatorLink + from airflow.models.baseoperator import BaseOperator + from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.models.taskinstancekey import TaskInstanceKey from airflow.plugins_manager import AirflowPlugin from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator @@ -118,7 +120,8 @@ Console, but if we wanted to change that link we could: .. code-block:: python - from airflow.models.baseoperator import BaseOperatorLink, BaseOperator + from airflow.models.baseoperator import BaseOperator + from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.models.taskinstancekey import TaskInstanceKey from airflow.models.xcom import XCom from airflow.plugins_manager import AirflowPlugin diff --git a/docs/apache-airflow/public-airflow-interface.rst b/docs/apache-airflow/public-airflow-interface.rst index 93e68fa797..f16cc5aab0 100644 --- a/docs/apache-airflow/public-airflow-interface.rst +++ b/docs/apache-airflow/public-airflow-interface.rst @@ -312,6 +312,13 @@ Extra Links Extra links are dynamic links that could be added to Airflow independently from custom Operators. Normally they can be defined by the Operators, but plugins allow you to override the links on a global level. +.. toctree:: + :includehidden: + :glob: + :maxdepth: 1 + + _api/airflow/models/baseoperatorlink/index + You can read more about the Extra Links in :doc:`/howto/define-extra-link`. Using Public Interface to integrate with external services and applications diff --git a/docs/conf.py b/docs/conf.py index 3c7cfa26b0..e832e1e012 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -248,6 +248,7 @@ if PACKAGE_NAME == "apache-airflow": models_included: set[str] = { "baseoperator.py", + "baseoperatorlink.py", "connection.py", "dag.py", "dagrun.py", diff --git a/pyproject.toml b/pyproject.toml index 9722a0d7ac..3fbc95e88b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -149,6 +149,7 @@ combine-as-imports = true [tool.ruff.flake8-tidy-imports.banned-api] "airflow.AirflowException".msg = "Use airflow.exceptions.AirflowException instead." "airflow.Dataset".msg = "Use airflow.datasets.Dataset instead." +"airflow.models.baseoperator.BaseOperatorLink".msg = "Use airflow.models.baseoperatorlink.BaseOperatorLink" [tool.ruff.flake8-tidy-imports] # Ban certain modules from being imported at module level, instead requiring diff --git a/tests/api_connexion/endpoints/test_extra_link_endpoint.py b/tests/api_connexion/endpoints/test_extra_link_endpoint.py index a52c034268..3e803a4bf4 100644 --- a/tests/api_connexion/endpoints/test_extra_link_endpoint.py +++ b/tests/api_connexion/endpoints/test_extra_link_endpoint.py @@ -22,7 +22,7 @@ from urllib.parse import quote_plus import pytest from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP -from airflow.models.baseoperator import BaseOperatorLink +from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.models.dag import DAG from airflow.models.dagbag import DagBag from airflow.models.xcom import XCom diff --git a/tests/api_connexion/endpoints/test_plugin_endpoint.py b/tests/api_connexion/endpoints/test_plugin_endpoint.py index 4d9b8be847..57c3387ae0 100644 --- a/tests/api_connexion/endpoints/test_plugin_endpoint.py +++ b/tests/api_connexion/endpoints/test_plugin_endpoint.py @@ -23,7 +23,7 @@ from flask import Blueprint from flask_appbuilder import BaseView from airflow.hooks.base import BaseHook -from airflow.models.baseoperator import BaseOperatorLink +from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.plugins_manager import AirflowPlugin from airflow.security import permissions from airflow.ti_deps.deps.base_ti_dep import BaseTIDep diff --git a/tests/api_connexion/schemas/test_plugin_schema.py b/tests/api_connexion/schemas/test_plugin_schema.py index 1472fd2db7..0511072585 100644 --- a/tests/api_connexion/schemas/test_plugin_schema.py +++ b/tests/api_connexion/schemas/test_plugin_schema.py @@ -25,7 +25,7 @@ from airflow.api_connexion.schemas.plugin_schema import ( plugin_schema, ) from airflow.hooks.base import BaseHook -from airflow.models.baseoperator import BaseOperatorLink +from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.plugins_manager import AirflowPlugin diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index a9c63c80fe..30407eb945 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -43,7 +43,8 @@ from airflow.decorators import teardown from airflow.decorators.base import DecoratedOperator from airflow.exceptions import AirflowException, SerializationError from airflow.hooks.base import BaseHook -from airflow.models.baseoperator import BaseOperator, BaseOperatorLink +from airflow.models.baseoperator import BaseOperator +from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.models.connection import Connection from airflow.models.dag import DAG from airflow.models.dagbag import DagBag diff --git a/tests/test_utils/mock_operators.py b/tests/test_utils/mock_operators.py index 4aa59120a8..e9a05c9edb 100644 --- a/tests/test_utils/mock_operators.py +++ b/tests/test_utils/mock_operators.py @@ -21,7 +21,8 @@ from typing import TYPE_CHECKING, Any, Sequence import attr -from airflow.models.baseoperator import BaseOperator, BaseOperatorLink +from airflow.models.baseoperator import BaseOperator +from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.models.xcom import XCom if TYPE_CHECKING: diff --git a/tests/www/views/test_views_extra_links.py b/tests/www/views/test_views_extra_links.py index eb3b17517a..a959fa9fb8 100644 --- a/tests/www/views/test_views_extra_links.py +++ b/tests/www/views/test_views_extra_links.py @@ -23,7 +23,8 @@ from unittest import mock import pytest -from airflow.models.baseoperator import BaseOperator, BaseOperatorLink +from airflow.models.baseoperator import BaseOperator +from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.models.dag import DAG from airflow.utils import timezone from airflow.utils.state import DagRunState