This is an automated email from the ASF dual-hosted git repository. jedcunningham pushed a commit to branch v2-2-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit d2ae684a09c85fc557b24e2fb4421df7da79a9b0 Author: Josh Fell <[email protected]> AuthorDate: Mon Jan 10 11:14:22 2022 -0500 Enhance `multiple_outputs` inference of dict typing (#19608) (cherry picked from commit 4198550bba474e7942705a4c6df2ad916fb76561) --- .pre-commit-config.yaml | 1 + airflow/decorators/base.py | 27 +++++++++++++++++---------- airflow/decorators/python.py | 16 ++++++---------- airflow/decorators/python_virtualenv.py | 10 ++++------ tests/decorators/test_python.py | 23 ++++++++++++++++++----- 5 files changed, 46 insertions(+), 31 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 0003071..08d60ab 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -195,6 +195,7 @@ repos: - "4" files: ^chart/values\.schema\.json$|^chart/values_schema\.schema\.json$ pass_filenames: true + # TODO: Bump to Python 3.7 when support for Python 3.6 is dropped in Airflow 2.3. - repo: https://github.com/asottile/pyupgrade rev: v2.29.0 hooks: diff --git a/airflow/decorators/base.py b/airflow/decorators/base.py index 229a114..cd76839 100644 --- a/airflow/decorators/base.py +++ b/airflow/decorators/base.py @@ -18,6 +18,7 @@ import functools import inspect import re +import sys from inspect import signature from typing import Any, Callable, Dict, Optional, Tuple, TypeVar, cast @@ -91,9 +92,8 @@ class DecoratedOperator(BaseOperator): :param op_args: a list of positional arguments that will get unpacked when calling your callable (templated) :type op_args: list - :param multiple_outputs: if set, function return value will be - unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys. - Defaults to False. + :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to + multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False. :type multiple_outputs: bool :param kwargs_to_upstream: For certain operators, we might need to upstream certain arguments that would otherwise be absorbed by the DecoratedOperator (for example python_callable for the @@ -189,10 +189,8 @@ def task_decorator_factory( :param python_callable: Function to decorate :type python_callable: Optional[Callable] - :param multiple_outputs: if set, function return value will be - unrolled to multiple XCom values. List/Tuples will unroll to xcom values - with index as key. Dict will unroll to xcom values with keys as XCom keys. - Defaults to False. + :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to + multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False. :type multiple_outputs: bool :param decorated_operator_class: The operator that executes the logic needed to run the python function in the correct environment @@ -201,10 +199,19 @@ def task_decorator_factory( """ # try to infer from type annotation if python_callable and multiple_outputs is None: - sig = signature(python_callable).return_annotation - ttype = getattr(sig, "__origin__", None) + return_type = signature(python_callable).return_annotation + + # If the return type annotation is already the builtins ``dict`` type, use it for the inference. + if return_type == dict: + ttype = return_type + # Checking if Python 3.6, ``__origin__`` attribute does not exist until 3.7; need to use ``__extra__`` + # TODO: Remove check when support for Python 3.6 is dropped in Airflow 2.3. + elif sys.version_info < (3, 7): + ttype = getattr(return_type, "__extra__", None) + else: + ttype = getattr(return_type, "__origin__", None) - multiple_outputs = sig != inspect.Signature.empty and ttype in (dict, Dict) + multiple_outputs = return_type != inspect.Signature.empty and ttype in (dict, Dict) def wrapper(f: T): """ diff --git a/airflow/decorators/python.py b/airflow/decorators/python.py index 7dc6c1b..2411761 100644 --- a/airflow/decorators/python.py +++ b/airflow/decorators/python.py @@ -33,9 +33,8 @@ class _PythonDecoratedOperator(DecoratedOperator, PythonOperator): :param op_args: a list of positional arguments that will get unpacked when calling your callable (templated) :type op_args: list - :param multiple_outputs: if set, function return value will be - unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys. - Defaults to False. + :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to + multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False. :type multiple_outputs: bool """ @@ -85,9 +84,8 @@ class PythonDecoratorMixin: :param python_callable: Function to decorate :type python_callable: Optional[Callable] - :param multiple_outputs: if set, function return value will be - unrolled to multiple XCom values. List/Tuples will unroll to xcom values - with index as key. Dict will unroll to xcom values with keys as XCom keys. + :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to + multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False. :type multiple_outputs: bool """ @@ -109,10 +107,8 @@ def python_task( :param python_callable: Function to decorate :type python_callable: Optional[Callable] - :param multiple_outputs: if set, function return value will be - unrolled to multiple XCom values. List/Tuples will unroll to xcom values - with index as key. Dict will unroll to xcom values with keys as XCom keys. - Defaults to False. + :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to + multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False. :type multiple_outputs: bool """ return task_decorator_factory( diff --git a/airflow/decorators/python_virtualenv.py b/airflow/decorators/python_virtualenv.py index 8024e5a..d412344 100644 --- a/airflow/decorators/python_virtualenv.py +++ b/airflow/decorators/python_virtualenv.py @@ -36,9 +36,8 @@ class _PythonVirtualenvDecoratedOperator(DecoratedOperator, PythonVirtualenvOper :param op_args: a list of positional arguments that will get unpacked when calling your callable (templated) :type op_args: list - :param multiple_outputs: if set, function return value will be - unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys. - Defaults to False. + :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to + multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False. :type multiple_outputs: bool """ @@ -88,9 +87,8 @@ class PythonVirtualenvDecoratorMixin: :param python_callable: Function to decorate :type python_callable: Optional[Callable] - :param multiple_outputs: if set, function return value will be - unrolled to multiple XCom values. List/Tuples will unroll to xcom values - with index as key. Dict will unroll to xcom values with keys as XCom keys. + :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to + multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False. :type multiple_outputs: bool """ diff --git a/tests/decorators/test_python.py b/tests/decorators/test_python.py index 8782999..798d877 100644 --- a/tests/decorators/test_python.py +++ b/tests/decorators/test_python.py @@ -15,12 +15,14 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import sys import unittest.mock from collections import namedtuple from datetime import date, timedelta from typing import Dict, Tuple import pytest +from parameterized import parameterized from airflow.decorators import task as task_decorator from airflow.exceptions import AirflowException @@ -112,13 +114,24 @@ class TestAirflowTaskDecorator(TestPythonBase): with pytest.raises(AirflowException): task_decorator(not_callable, dag=self.dag) - def test_infer_multiple_outputs_using_typing(self): - @task_decorator - def identity_dict(x: int, y: int) -> Dict[str, int]: - return {"x": x, "y": y} + @parameterized.expand([["dict"], ["dict[str, int]"], ["Dict"], ["Dict[str, int]"]]) + def test_infer_multiple_outputs_using_dict_typing(self, test_return_annotation): + if sys.version_info < (3, 9) and test_return_annotation == "dict[str, int]": + self.skipTest("dict[...] not a supported typing prior to Python 3.9") + + @task_decorator + def identity_dict(x: int, y: int) -> eval(test_return_annotation): + return {"x": x, "y": y} + + assert identity_dict(5, 5).operator.multiple_outputs is True + + @task_decorator + def identity_dict_stringified(x: int, y: int) -> test_return_annotation: + return {"x": x, "y": y} - assert identity_dict(5, 5).operator.multiple_outputs is True + assert identity_dict_stringified(5, 5).operator.multiple_outputs is True + def test_infer_multiple_outputs_using_other_typing(self): @task_decorator def identity_tuple(x: int, y: int) -> Tuple[int, int]: return x, y
