This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new c2a733aa95 Add decorators for external and venv python branching operators (#35043) c2a733aa95 is described below commit c2a733aa95d45c8b6af4e5b05621aca28515946e Author: Jens Scheffler <95105677+jens-scheffler-bo...@users.noreply.github.com> AuthorDate: Sat Oct 21 00:33:01 2023 +0200 Add decorators for external and venv python branching operators (#35043) --- airflow/decorators/__init__.py | 6 ++ airflow/decorators/__init__.pyi | 97 ++++++++++++++++++ airflow/decorators/branch_external_python.py | 56 ++++++++++ airflow/decorators/branch_virtualenv.py | 56 ++++++++++ airflow/example_dags/example_branch_operator.py | 114 +++++++++++++++++++-- .../example_branch_operator_decorator.py | 104 +++++++++++++++++-- airflow/operators/python.py | 8 ++ docs/apache-airflow/core-concepts/dags.rst | 2 + docs/apache-airflow/howto/operator/python.rst | 106 ++++++++++++++++++- tests/decorators/test_branch_external_python.py | 90 ++++++++++++++++ tests/decorators/test_branch_virtualenv.py | 94 +++++++++++++++++ tests/jobs/test_backfill_job.py | 20 +++- 12 files changed, 733 insertions(+), 20 deletions(-) diff --git a/airflow/decorators/__init__.py b/airflow/decorators/__init__.py index 2b2fccf8fa..31bcfb263c 100644 --- a/airflow/decorators/__init__.py +++ b/airflow/decorators/__init__.py @@ -19,7 +19,9 @@ from __future__ import annotations from typing import Any, Callable from airflow.decorators.base import TaskDecorator +from airflow.decorators.branch_external_python import branch_external_python_task from airflow.decorators.branch_python import branch_task +from airflow.decorators.branch_virtualenv import branch_virtualenv_task from airflow.decorators.external_python import external_python_task from airflow.decorators.python import python_task from airflow.decorators.python_virtualenv import virtualenv_task @@ -41,6 +43,8 @@ __all__ = [ "virtualenv_task", "external_python_task", "branch_task", + 
"branch_virtualenv_task", + "branch_external_python_task", "short_circuit_task", "sensor_task", "setup", @@ -55,6 +59,8 @@ class TaskDecoratorCollection: virtualenv = staticmethod(virtualenv_task) external_python = staticmethod(external_python_task) branch = staticmethod(branch_task) + branch_virtualenv = staticmethod(branch_virtualenv_task) + branch_external_python = staticmethod(branch_external_python_task) short_circuit = staticmethod(short_circuit_task) sensor = staticmethod(sensor_task) diff --git a/airflow/decorators/__init__.pyi b/airflow/decorators/__init__.pyi index e41ae930e1..0c3e94bf5c 100644 --- a/airflow/decorators/__init__.pyi +++ b/airflow/decorators/__init__.pyi @@ -26,7 +26,9 @@ from typing import Any, Callable, Collection, Container, Iterable, Mapping, over from kubernetes.client import models as k8s from airflow.decorators.base import FParams, FReturn, Task, TaskDecorator +from airflow.decorators.branch_external_python import branch_external_python_task from airflow.decorators.branch_python import branch_task +from airflow.decorators.branch_virtualenv import branch_virtualenv_task from airflow.decorators.external_python import external_python_task from airflow.decorators.python import python_task from airflow.decorators.python_virtualenv import virtualenv_task @@ -47,6 +49,8 @@ __all__ = [ "virtualenv_task", "external_python_task", "branch_task", + "branch_virtualenv_task", + "branch_external_python_task", "short_circuit_task", "sensor_task", "setup", @@ -194,6 +198,99 @@ class TaskDecoratorCollection: @overload def branch(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ... @overload + def branch_virtualenv( + self, + *, + multiple_outputs: bool | None = None, + # 'python_callable', 'op_args' and 'op_kwargs' since they are filled by + # _PythonVirtualenvDecoratedOperator. 
+ requirements: None | Iterable[str] | str = None, + python_version: None | str | int | float = None, + use_dill: bool = False, + system_site_packages: bool = True, + templates_dict: Mapping[str, Any] | None = None, + pip_install_options: list[str] | None = None, + skip_on_exit_code: int | Container[int] | None = None, + index_urls: None | Collection[str] | str = None, + venv_cache_path: None | str = None, + show_return_value_in_logs: bool = True, + **kwargs, + ) -> TaskDecorator: + """Create a decorator to wrap the decorated callable into a BranchPythonVirtualenvOperator. + + For more information on how to use this decorator, see :ref:`concepts:branching`. + Accepts arbitrary for operator kwarg. Can be reused in a single DAG. + + :param multiple_outputs: If set, function return value will be unrolled to multiple XCom values. + Dict will unroll to XCom values with keys as XCom keys. Defaults to False. + :param requirements: Either a list of requirement strings, or a (templated) + "requirements file" as specified by pip. + :param python_version: The Python version to run the virtual environment with. Note that + both 2 and 2.7 are acceptable forms. + :param use_dill: Whether to use dill to serialize + the args and result (pickle is default). This allow more complex types + but requires you to include dill in your requirements. + :param system_site_packages: Whether to include + system_site_packages in your virtual environment. + See virtualenv documentation for more information. + :param pip_install_options: a list of pip install options when installing requirements + See 'pip install -h' for available options + :param skip_on_exit_code: If python_callable exits with this exit code, leave the task + in ``skipped`` state (default: None). If set to ``None``, any non-zero + exit code will be treated as a failure. + :param index_urls: an optional list of index urls to load Python packages from. + If not provided the system pip conf will be used to source packages from. 
+ :param venv_cache_path: Optional path to the virtual environment parent folder in which the + virtual environment will be cached, creates a sub-folder venv-{hash} whereas hash will be replaced + with a checksum of requirements. If not provided the virtual environment will be created and + deleted in a temp folder for every execution. + :param show_return_value_in_logs: a bool value whether to show return_value + logs. Defaults to True, which allows return value log output. + It can be set to False to prevent log output of return value when you return huge data + such as transmission a large amount of XCom to TaskAPI. + """ + @overload + def branch_virtualenv(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ... + @overload + def branch_external_python( + self, + *, + python: str, + multiple_outputs: bool | None = None, + # 'python_callable', 'op_args' and 'op_kwargs' since they are filled by + # _PythonVirtualenvDecoratedOperator. + use_dill: bool = False, + templates_dict: Mapping[str, Any] | None = None, + show_return_value_in_logs: bool = True, + **kwargs, + ) -> TaskDecorator: + """Create a decorator to wrap the decorated callable into a BranchExternalPythonOperator. + + For more information on how to use this decorator, see :ref:`concepts:branching`. + Accepts arbitrary for operator kwarg. Can be reused in a single DAG. + + :param python: Full path string (file-system specific) that points to a Python binary inside + a virtual environment that should be used (in ``VENV/bin`` folder). Should be absolute path + (so usually start with "/" or "X:/" depending on the filesystem/os used). + :param multiple_outputs: If set, function return value will be unrolled to multiple XCom values. + Dict will unroll to XCom values with keys as XCom keys. Defaults to False. + :param use_dill: Whether to use dill to serialize + the args and result (pickle is default). 
This allow more complex types + but requires you to include dill in your requirements. + :param templates_dict: a dictionary where the values are templates that + will get templated by the Airflow engine sometime between + ``__init__`` and ``execute`` takes place and are made available + in your callable's context after the template has been applied. + :param show_return_value_in_logs: a bool value whether to show return_value + logs. Defaults to True, which allows return value log output. + It can be set to False to prevent log output of return value when you return huge data + such as transmission a large amount of XCom to TaskAPI. + """ + @overload + def branch_external_python( + self, python_callable: Callable[FParams, FReturn] + ) -> Task[FParams, FReturn]: ... + @overload def short_circuit( self, *, diff --git a/airflow/decorators/branch_external_python.py b/airflow/decorators/branch_external_python.py new file mode 100644 index 0000000000..8e945541c5 --- /dev/null +++ b/airflow/decorators/branch_external_python.py @@ -0,0 +1,56 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from typing import TYPE_CHECKING, Callable + +from airflow.decorators.base import task_decorator_factory +from airflow.decorators.python import _PythonDecoratedOperator +from airflow.operators.python import BranchExternalPythonOperator + +if TYPE_CHECKING: + from airflow.decorators.base import TaskDecorator + + +class _BranchExternalPythonDecoratedOperator(_PythonDecoratedOperator, BranchExternalPythonOperator): + """Wraps a Python callable and captures args/kwargs when called for execution.""" + + custom_operator_name: str = "@task.branch_external_python" + + +def branch_external_python_task( + python_callable: Callable | None = None, multiple_outputs: bool | None = None, **kwargs +) -> TaskDecorator: + """ + Wrap a python function into a BranchExternalPythonOperator. + + For more information on how to use this operator, take a look at the guide: + :ref:`concepts:branching` + + Accepts kwargs for operator kwarg. Can be reused in a single DAG. + + :param python_callable: Function to decorate + :param multiple_outputs: if set, function return value will be + unrolled to multiple XCom values. Dict will unroll to xcom values with keys as XCom keys. + Defaults to False. + """ + return task_decorator_factory( + python_callable=python_callable, + multiple_outputs=multiple_outputs, + decorated_operator_class=_BranchExternalPythonDecoratedOperator, + **kwargs, + ) diff --git a/airflow/decorators/branch_virtualenv.py b/airflow/decorators/branch_virtualenv.py new file mode 100644 index 0000000000..3e4c3fcaf1 --- /dev/null +++ b/airflow/decorators/branch_virtualenv.py @@ -0,0 +1,56 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING, Callable + +from airflow.decorators.base import task_decorator_factory +from airflow.decorators.python import _PythonDecoratedOperator +from airflow.operators.python import BranchPythonVirtualenvOperator + +if TYPE_CHECKING: + from airflow.decorators.base import TaskDecorator + + +class _BranchPythonVirtualenvDecoratedOperator(_PythonDecoratedOperator, BranchPythonVirtualenvOperator): + """Wraps a Python callable and captures args/kwargs when called for execution.""" + + custom_operator_name: str = "@task.branch_virtualenv" + + +def branch_virtualenv_task( + python_callable: Callable | None = None, multiple_outputs: bool | None = None, **kwargs +) -> TaskDecorator: + """ + Wrap a python function into a BranchPythonVirtualenvOperator. + + For more information on how to use this operator, take a look at the guide: + :ref:`concepts:branching` + + Accepts kwargs for operator kwarg. Can be reused in a single DAG. + + :param python_callable: Function to decorate + :param multiple_outputs: if set, function return value will be + unrolled to multiple XCom values. Dict will unroll to xcom values with keys as XCom keys. + Defaults to False. 
+ """ + return task_decorator_factory( + python_callable=python_callable, + multiple_outputs=multiple_outputs, + decorated_operator_class=_BranchPythonVirtualenvDecoratedOperator, + **kwargs, + ) diff --git a/airflow/example_dags/example_branch_operator.py b/airflow/example_dags/example_branch_operator.py index 92fdf3b250..594c6a4cb1 100644 --- a/airflow/example_dags/example_branch_operator.py +++ b/airflow/example_dags/example_branch_operator.py @@ -15,36 +15,56 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""Example DAG demonstrating the usage of the BranchPythonOperator.""" +"""Example DAG demonstrating the usage of the Classic branching Python operators. + +It is showcasing the basic BranchPythonOperator and its sisters BranchExternalPythonOperator +and BranchPythonVirtualenvOperator.""" from __future__ import annotations import random +import sys +import tempfile +from pathlib import Path import pendulum from airflow.models.dag import DAG from airflow.operators.empty import EmptyOperator -from airflow.operators.python import BranchPythonOperator +from airflow.operators.python import ( + BranchExternalPythonOperator, + BranchPythonOperator, + BranchPythonVirtualenvOperator, + ExternalPythonOperator, + PythonOperator, + PythonVirtualenvOperator, +) from airflow.utils.edgemodifier import Label from airflow.utils.trigger_rule import TriggerRule +PATH_TO_PYTHON_BINARY = sys.executable + with DAG( dag_id="example_branch_operator", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, schedule="@daily", tags=["example", "example2"], + orientation="TB", ) as dag: run_this_first = EmptyOperator( task_id="run_this_first", ) - options = ["branch_a", "branch_b", "branch_c", "branch_d"] + options = ["a", "b", "c", "d"] + + # Example branching on standard Python tasks + # [START howto_operator_branch_python] branching = BranchPythonOperator( task_id="branching", - 
python_callable=lambda: random.choice(options), + python_callable=lambda: f"branch_{random.choice(options)}", ) + # [END howto_operator_branch_python] run_this_first >> branching join = EmptyOperator( @@ -53,8 +73,9 @@ with DAG( ) for option in options: - t = EmptyOperator( - task_id=option, + t = PythonOperator( + task_id=f"branch_{option}", + python_callable=lambda: print("Hello World"), ) empty_follow = EmptyOperator( @@ -63,3 +84,84 @@ with DAG( # Label is optional here, but it can help identify more complex branches branching >> Label(option) >> t >> empty_follow >> join + + # Example the same with external Python calls + + # [START howto_operator_branch_ext_py] + def branch_with_external_python(choices): + import random + + return f"ext_py_{random.choice(choices)}" + + branching_ext_py = BranchExternalPythonOperator( + task_id="branching_ext_python", + python=PATH_TO_PYTHON_BINARY, + python_callable=branch_with_external_python, + op_args=[options], + ) + # [END howto_operator_branch_ext_py] + join >> branching_ext_py + + join_ext_py = EmptyOperator( + task_id="join_ext_python", + trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, + ) + + def hello_world_with_external_python(): + print("Hello World from external Python") + + for option in options: + t = ExternalPythonOperator( + task_id=f"ext_py_{option}", + python=PATH_TO_PYTHON_BINARY, + python_callable=hello_world_with_external_python, + ) + + # Label is optional here, but it can help identify more complex branches + branching_ext_py >> Label(option) >> t >> join_ext_py + + # Example the same with Python virtual environments + + # [START howto_operator_branch_virtualenv] + # Note: Passing a caching dir allows to keep the virtual environment over multiple runs + # Run the example a second time and see that it re-uses it and is faster. 
+ VENV_CACHE_PATH = Path(tempfile.gettempdir()) + + def branch_with_venv(choices): + import random + + import numpy as np + + print(f"Some numpy stuff: {np.arange(6)}") + return f"venv_{random.choice(choices)}" + + branching_venv = BranchPythonVirtualenvOperator( + task_id="branching_venv", + requirements=["numpy~=1.24.4"], + venv_cache_path=VENV_CACHE_PATH, + python_callable=branch_with_venv, + op_args=[options], + ) + # [END howto_operator_branch_virtualenv] + join_ext_py >> branching_venv + + join_venv = EmptyOperator( + task_id="join_venv", + trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, + ) + + def hello_world_with_venv(): + import numpy as np + + print(f"Hello World with some numpy stuff: {np.arange(6)}") + + for option in options: + t = PythonVirtualenvOperator( + task_id=f"venv_{option}", + requirements=["numpy~=1.24.4"], + venv_cache_path=VENV_CACHE_PATH, + python_callable=hello_world_with_venv, + ) + + # Label is optional here, but it can help identify more complex branches + branching_venv >> Label(option) >> t >> join_venv diff --git a/airflow/example_dags/example_branch_operator_decorator.py b/airflow/example_dags/example_branch_operator_decorator.py index b250c12073..5d42ff6b27 100644 --- a/airflow/example_dags/example_branch_operator_decorator.py +++ b/airflow/example_dags/example_branch_operator_decorator.py @@ -15,10 +15,17 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""Example DAG demonstrating the usage of the ``@task.branch`` TaskFlow API decorator.""" +"""Example DAG demonstrating the usage of the branching TaskFlow API decorators. + +It shows how to use standard Python ``@task.branch`` as well as the external Python +version ``@task.branch_external_python`` which calls an external Python interpreter and +the ``@task.branch_virtualenv`` which builds a temporary Python virtual environment. 
+""" from __future__ import annotations import random +import sys +import tempfile import pendulum @@ -28,31 +35,110 @@ from airflow.operators.empty import EmptyOperator from airflow.utils.edgemodifier import Label from airflow.utils.trigger_rule import TriggerRule +PATH_TO_PYTHON_BINARY = sys.executable + with DAG( dag_id="example_branch_python_operator_decorator", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, schedule="@daily", tags=["example", "example2"], + orientation="TB", ) as dag: run_this_first = EmptyOperator(task_id="run_this_first") - options = ["branch_a", "branch_b", "branch_c", "branch_d"] + options = ["a", "b", "c", "d"] + + # Example branching on standard Python tasks - @task.branch(task_id="branching") - def random_choice(choices: list[str]) -> str: - return random.choice(choices) + # [START howto_operator_branch_python] + @task.branch() + def branching(choices: list[str]) -> str: + return f"branch_{random.choice(choices)}" - random_choice_instance = random_choice(choices=options) + # [END howto_operator_branch_python] + + random_choice_instance = branching(choices=options) run_this_first >> random_choice_instance join = EmptyOperator(task_id="join", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) for option in options: - t = EmptyOperator(task_id=option) - empty_follow = EmptyOperator(task_id="follow_" + option) + @task(task_id=f"branch_{option}") + def some_task(): + print("doing something in Python") + + t = some_task() + empty = EmptyOperator(task_id=f"follow_{option}") + + # Label is optional here, but it can help identify more complex branches + random_choice_instance >> Label(option) >> t >> empty >> join + + # Example the same with external Python calls + + # [START howto_operator_branch_ext_py] + @task.branch_external_python(python=PATH_TO_PYTHON_BINARY) + def branching_ext_python(choices) -> str: + import random + + return f"ext_py_{random.choice(choices)}" + + # [END howto_operator_branch_ext_py] + + 
random_choice_ext_py = branching_ext_python(choices=options) + + join >> random_choice_ext_py + + join_ext_py = EmptyOperator(task_id="join_ext_py", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) + + for option in options: + + @task.external_python(task_id=f"ext_py_{option}", python=PATH_TO_PYTHON_BINARY) + def some_ext_py_task(): + print("doing something in external Python") + + t = some_ext_py_task() + + # Label is optional here, but it can help identify more complex branches + random_choice_ext_py >> Label(option) >> t >> join_ext_py + + # Example the same with Python virtual environments + + # [START howto_operator_branch_virtualenv] + # Note: Passing a caching dir allows to keep the virtual environment over multiple runs + # Run the example a second time and see that it re-uses it and is faster. + VENV_CACHE_PATH = tempfile.gettempdir() + + @task.branch_virtualenv(requirements=["numpy~=1.24.4"], venv_cache_path=VENV_CACHE_PATH) + def branching_virtualenv(choices) -> str: + import random + + import numpy as np + + print(f"Some numpy stuff: {np.arange(6)}") + return f"venv_{random.choice(choices)}" + + # [END howto_operator_branch_virtualenv] + + random_choice_venv = branching_virtualenv(choices=options) + + join_ext_py >> random_choice_venv + + join_venv = EmptyOperator(task_id="join_venv", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) + + for option in options: + + @task.virtualenv( + task_id=f"venv_{option}", requirements=["numpy~=1.24.4"], venv_cache_path=VENV_CACHE_PATH + ) + def some_venv_task(): + import numpy as np + + print(f"Some numpy stuff: {np.arange(6)}") + + t = some_venv_task() # Label is optional here, but it can help identify more complex branches - random_choice_instance >> Label(option) >> t >> empty_follow >> join + random_choice_venv >> Label(option) >> t >> join_venv diff --git a/airflow/operators/python.py b/airflow/operators/python.py index 118d1d4bdb..0eadb5441a 100644 --- a/airflow/operators/python.py +++ 
b/airflow/operators/python.py @@ -729,6 +729,10 @@ class BranchPythonVirtualenvOperator(PythonVirtualenvOperator, BranchMixIn): these paths can't move forward. The ``skipped`` states are propagated downstream to allow for the DAG state to fill up and the DAG run's state to be inferred. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:BranchPythonVirtualenvOperator` """ def execute(self, context: Context) -> Any: @@ -910,6 +914,10 @@ class BranchExternalPythonOperator(ExternalPythonOperator, BranchMixIn): Extends ExternalPythonOperator, so expects to get Python: virtual environment that should be used (in ``VENV/bin`` folder). Should be absolute path, so it can run on separate virtual environment similarly to ExternalPythonOperator. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:BranchExternalPythonOperator` """ def execute(self, context: Context) -> Any: diff --git a/docs/apache-airflow/core-concepts/dags.rst b/docs/apache-airflow/core-concepts/dags.rst index 8ca2689320..86f4aaad2e 100644 --- a/docs/apache-airflow/core-concepts/dags.rst +++ b/docs/apache-airflow/core-concepts/dags.rst @@ -371,6 +371,8 @@ As with the callable for ``@task.branch``, this method can return the ID of a do else: return None +Similar to the ``@task.branch`` decorator for regular Python code, there are also branch decorators which use a virtual environment, called ``@task.branch_virtualenv``, or an external Python interpreter, called ``@task.branch_external_python``. + .. 
_concepts:latest-only: diff --git a/docs/apache-airflow/howto/operator/python.rst b/docs/apache-airflow/howto/operator/python.rst index 6adbbe8b9e..6cfb5a335d 100644 --- a/docs/apache-airflow/howto/operator/python.rst +++ b/docs/apache-airflow/howto/operator/python.rst @@ -103,7 +103,15 @@ Otherwise you won't have access to the most context variables of Airflow in ``op If you want the context related to datetime objects like ``data_interval_start`` you can add ``pendulum`` and ``lazy_object_proxy``. -If additional parameters for package installation are needed pass them in ``requirements.txt`` as in the example below: +.. warning:: + The Python function body defined to be executed is cut out of the DAG into a temporary file without surrounding code. + As in the examples, you need to add all imports again and you cannot rely on variables from the global Python context. + + If you want to pass variables into the classic :class:`~airflow.operators.python.PythonVirtualenvOperator` use + ``op_args`` and ``op_kwargs``. + +If additional parameters for package installation are needed, pass them in via the ``pip_install_options`` parameter or use a +``requirements.txt`` as in the example below: .. code-block:: @@ -196,6 +204,102 @@ Otherwise you won't have access to the most context variables of Airflow in ``op If you want the context related to datetime objects like ``data_interval_start`` you can add ``pendulum`` and ``lazy_object_proxy`` to your virtual environment. +.. warning:: + The Python function body defined to be executed is cut out of the DAG into a temporary file without surrounding code. + As in the examples, you need to add all imports again and you cannot rely on variables from the global Python context. + + If you want to pass variables into the classic :class:`~airflow.operators.python.ExternalPythonOperator` use + ``op_args`` and ``op_kwargs``. + +.. 
_howto/operator:PythonBranchOperator: + +BranchPythonOperator +==================== + +Use the ``@task.branch`` decorator to execute Python :ref:`branching <concepts:branching>` tasks. + +.. warning:: + The ``@task.branch`` decorator is recommended over the classic :class:`~airflow.operators.python.BranchPythonOperator` + to execute Python code. + +TaskFlow example of using the operator: + +.. exampleinclude:: /../../airflow/example_dags/example_branch_operator_decorator.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_branch_python] + :end-before: [END howto_operator_branch_python] + +Classic example of using the operator: + +.. exampleinclude:: /../../airflow/example_dags/example_branch_operator.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_branch_python] + :end-before: [END howto_operator_branch_python] + +Argument passing and templating options are the same as with :ref:`howto/operator:PythonOperator`. + +.. _howto/operator:BranchPythonVirtualenvOperator: + +BranchPythonVirtualenvOperator +============================== + +Use the ``@task.branch_virtualenv`` decorator to execute Python :ref:`branching <concepts:branching>` tasks; it is a hybrid of +the branch decorator with execution in a virtual environment. + +.. warning:: + The ``@task.branch_virtualenv`` decorator is recommended over the classic + :class:`~airflow.operators.python.BranchPythonVirtualenvOperator` to execute Python code. + +TaskFlow example of using the operator: + +.. exampleinclude:: /../../airflow/example_dags/example_branch_operator_decorator.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_branch_virtualenv] + :end-before: [END howto_operator_branch_virtualenv] + +Classic example of using the operator: + +.. 
exampleinclude:: /../../airflow/example_dags/example_branch_operator.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_branch_virtualenv] + :end-before: [END howto_operator_branch_virtualenv] + +Argument passing and templating options are the same as with :ref:`howto/operator:PythonVirtualenvOperator`. + +.. _howto/operator:BranchExternalPythonOperator: + +BranchExternalPythonOperator +============================ + +Use the ``@task.branch_external_python`` decorator to execute Python :ref:`branching <concepts:branching>` tasks; it is a hybrid of +the branch decorator with execution in an external Python environment. + +.. warning:: + The ``@task.branch_external_python`` decorator is recommended over the classic + :class:`~airflow.operators.python.BranchExternalPythonOperator` to execute Python code. + +TaskFlow example of using the operator: + +.. exampleinclude:: /../../airflow/example_dags/example_branch_operator_decorator.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_branch_ext_py] + :end-before: [END howto_operator_branch_ext_py] + +Classic example of using the operator: + +.. exampleinclude:: /../../airflow/example_dags/example_branch_operator.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_branch_ext_py] + :end-before: [END howto_operator_branch_ext_py] + +Argument passing and templating options are the same as with :ref:`howto/operator:ExternalPythonOperator`. + .. _howto/operator:ShortCircuitOperator: ShortCircuitOperator diff --git a/tests/decorators/test_branch_external_python.py b/tests/decorators/test_branch_external_python.py new file mode 100644 index 0000000000..01e1f9b968 --- /dev/null +++ b/tests/decorators/test_branch_external_python.py @@ -0,0 +1,90 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import sys + +import pytest + +from airflow.decorators import task +from airflow.utils.state import State + + +class Test_BranchPythonDecoratedOperator: + @pytest.mark.parametrize("branch_task_name", ["task_1", "task_2"]) + def test_branch_one(self, dag_maker, branch_task_name): + @task + def dummy_f(): + pass + + @task + def task_1(): + pass + + @task + def task_2(): + pass + + if ( + branch_task_name == "task_1" + ): # Note we manually need to carry the literal value into the venc code :-( + + @task.branch_external_python(task_id="branching", python=sys.executable) + def branch_operator(): + return "task_1" + + else: + + @task.branch_external_python(task_id="branching", python=sys.executable) + def branch_operator(): + return "task_2" + + with dag_maker(): + branchoperator = branch_operator() + df = dummy_f() + task_1 = task_1() + task_2 = task_2() + + df.set_downstream(branchoperator) + branchoperator.set_downstream(task_1) + branchoperator.set_downstream(task_2) + + dr = dag_maker.create_dagrun() + df.operator.run(start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True) + branchoperator.operator.run( + start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True + ) + 
task_1.operator.run(start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True) + task_2.operator.run(start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True) + tis = dr.get_task_instances() + + for ti in tis: + if ti.task_id == "dummy_f": + assert ti.state == State.SUCCESS + if ti.task_id == "branching": + assert ti.state == State.SUCCESS + + if ti.task_id == "task_1" and branch_task_name == "task_1": + assert ti.state == State.SUCCESS + elif ti.task_id == "task_1": + assert ti.state == State.SKIPPED + + if ti.task_id == "task_2" and branch_task_name == "task_2": + assert ti.state == State.SUCCESS + elif ti.task_id == "task_2": + assert ti.state == State.SKIPPED diff --git a/tests/decorators/test_branch_virtualenv.py b/tests/decorators/test_branch_virtualenv.py new file mode 100644 index 0000000000..861ba154af --- /dev/null +++ b/tests/decorators/test_branch_virtualenv.py @@ -0,0 +1,94 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +import pytest + +from airflow.decorators import task +from airflow.utils.state import State + + +class Test_BranchPythonDecoratedOperator: + @pytest.mark.parametrize("branch_task_name", ["task_1", "task_2"]) + def test_branch_one(self, dag_maker, branch_task_name): + @task + def dummy_f(): + pass + + @task + def task_1(): + pass + + @task + def task_2(): + pass + + if ( + branch_task_name == "task_1" + ): # Note we manually need to carry the literal value into the venc code :-( + + @task.branch_virtualenv(task_id="branching", requirements=["funcsigs"]) + def branch_operator(): + import funcsigs + + print(f"We successfully imported funcsigs version {funcsigs.__version__}") + return "task_1" + + else: + + @task.branch_virtualenv(task_id="branching", requirements=["funcsigs"]) + def branch_operator(): + import funcsigs + + print(f"We successfully imported funcsigs version {funcsigs.__version__}") + return "task_2" + + with dag_maker(): + branchoperator = branch_operator() + df = dummy_f() + task_1 = task_1() + task_2 = task_2() + + df.set_downstream(branchoperator) + branchoperator.set_downstream(task_1) + branchoperator.set_downstream(task_2) + + dr = dag_maker.create_dagrun() + df.operator.run(start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True) + branchoperator.operator.run( + start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True + ) + task_1.operator.run(start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True) + task_2.operator.run(start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True) + tis = dr.get_task_instances() + + for ti in tis: + if ti.task_id == "dummy_f": + assert ti.state == State.SUCCESS + if ti.task_id == "branching": + assert ti.state == State.SUCCESS + + if ti.task_id == "task_1" and branch_task_name == "task_1": + assert ti.state == State.SUCCESS + elif ti.task_id == "task_1": + assert ti.state == State.SKIPPED 
+ + if ti.task_id == "task_2" and branch_task_name == "task_2": + assert ti.state == State.SUCCESS + elif ti.task_id == "task_2": + assert ti.state == State.SKIPPED diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py index 8d592dfe65..195a959ee9 100644 --- a/tests/jobs/test_backfill_job.py +++ b/tests/jobs/test_backfill_job.py @@ -244,11 +244,23 @@ class TestBackfillJob: "branch_b", "branch_c", "branch_d", - "follow_branch_a", - "follow_branch_b", - "follow_branch_c", - "follow_branch_d", + "follow_a", + "follow_b", + "follow_c", + "follow_d", "join", + "branching_ext_python", + "ext_py_a", + "ext_py_b", + "ext_py_c", + "ext_py_d", + "join_ext_python", + "branching_venv", + "venv_a", + "venv_b", + "venv_c", + "venv_d", + "join_venv", ), ], [