This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e49adc  Refactor Taskflow decorator for extensibility (#14709)
4e49adc is described below

commit 4e49adc02c3c55f54172c2e09095205a2cdd35c1
Author: Daniel Imberman <daniel.imber...@gmail.com>
AuthorDate: Fri Mar 12 13:43:32 2021 -0800

    Refactor Taskflow decorator for extensibility (#14709)
    
    * Refactor Taskflow decorator for extensibility
    
    The Taskflow API is a very clean way to turn python functions into
    Airflow tasks, but it is currently limited to only running in the
    virtualenv of the parent worker. We eventually want to offer the ability
    to use decorators that tie to docker images, kubernetes pods, etc.
    
    This commit separates out the python specific code and creates a
    decorators directory, which will allow us to both build in more "core"
    decorators as well as decorators in providers.
    
    * remove unnecessary file
    
    * fix backcompat
    
    * add comment
    
    * remove circular import
    
    * fix imports
    
    * @ashb fixes
    
    * @ashb fixes
    
    * handle backcompat
    
    * Update airflow/decorators/__init__.py
    
    Co-authored-by: Ash Berlin-Taylor <ash_git...@firemirror.com>
    
    * Update airflow/decorators/__init__.py
    
    Co-authored-by: Ash Berlin-Taylor <ash_git...@firemirror.com>
    
    * simplify API
    
    * fix docs
    
    * Add docs
    
    * Update airflow/decorators/python.py
    
    Co-authored-by: Kaxil Naik <kaxiln...@gmail.com>
    
    * arbitrary change to restart CI
    
    Co-authored-by: Ash Berlin-Taylor <ash_git...@firemirror.com>
    Co-authored-by: Kaxil Naik <kaxiln...@gmail.com>
---
 .pre-commit-config.yaml                            |   1 +
 airflow/decorators/__init__.py                     |  59 +++
 airflow/decorators/base.py                         | 194 +++++++++
 airflow/decorators/python.py                       | 105 +++++
 airflow/models/dag.py                              |   2 +-
 airflow/operators/python.py                        | 230 ++--------
 .../decorators.py => tests/decorators/__init__.py  |   3 -
 tests/decorators/test_python.py                    | 485 +++++++++++++++++++++
 tests/operators/test_python.py                     | 400 +----------------
 9 files changed, 890 insertions(+), 589 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 409cf7d..e586c8c 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -338,6 +338,7 @@ repos:
         pass_filenames: true
         exclude: >
           (?x)
+          ^airflow/decorators/.*$|
           ^airflow/gcp/.*$|
           ^airflow/hooks/.*$|
           ^airflow/operators/.*$|
diff --git a/airflow/decorators/__init__.py b/airflow/decorators/__init__.py
new file mode 100644
index 0000000..8295355
--- /dev/null
+++ b/airflow/decorators/__init__.py
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Callable, Optional
+
+from airflow.decorators.python import python_task
+from airflow.models.dag import dag  # noqa # pylint: disable=unused-import
+
+
+class _TaskDecorator:
+    def __call__(
+        self, python_callable: Optional[Callable] = None, multiple_outputs: 
Optional[bool] = None, **kwargs
+    ):
+        """
+        Python operator decorator. Wraps a function into an Airflow operator.
+        Accepts kwargs for operator kwarg. This decorator can be reused in a 
single DAG.
+
+        :param python_callable: Function to decorate
+        :type python_callable: Optional[Callable]
+        :param multiple_outputs: if set, function return value will be
+            unrolled to multiple XCom values. List/Tuples will unroll to xcom 
values
+            with index as key. Dict will unroll to xcom values with keys as 
XCom keys.
+            Defaults to False.
+        :type multiple_outputs: bool
+        """
+        return self.python(python_callable=python_callable, 
multiple_outputs=multiple_outputs, **kwargs)
+
+    @staticmethod
+    def python(python_callable: Optional[Callable] = None, multiple_outputs: 
Optional[bool] = None, **kwargs):
+        """
+        Python operator decorator. Wraps a function into an Airflow operator.
+        Accepts kwargs for operator kwarg. This decorator can be reused in a 
single DAG.
+
+        :param python_callable: Function to decorate
+        :type python_callable: Optional[Callable]
+        :param multiple_outputs: if set, function return value will be
+            unrolled to multiple XCom values. List/Tuples will unroll to xcom 
values
+            with index as key. Dict will unroll to xcom values with keys as 
XCom keys.
+            Defaults to False.
+        :type multiple_outputs: bool
+        """
+        return python_task(python_callable=python_callable, 
multiple_outputs=multiple_outputs, **kwargs)
+
+
+task = _TaskDecorator()
diff --git a/airflow/decorators/base.py b/airflow/decorators/base.py
new file mode 100644
index 0000000..523452e
--- /dev/null
+++ b/airflow/decorators/base.py
@@ -0,0 +1,194 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import functools
+import inspect
+import re
+from inspect import signature
+from typing import Any, Callable, Dict, Optional, Tuple, TypeVar, cast
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.models.dag import DAG, DagContext
+from airflow.models.xcom_arg import XComArg
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.task_group import TaskGroup, TaskGroupContext
+
+
+class BaseDecoratedOperator(BaseOperator):
+    """
+    Wraps a Python callable and captures args/kwargs when called for execution.
+
+    :param python_callable: A reference to an object that is callable
+    :type python_callable: python callable
+    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
+        in your function (templated)
+    :type op_kwargs: dict
+    :param op_args: a list of positional arguments that will get unpacked when
+        calling your callable (templated)
+    :type op_args: list
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with 
keys as keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    """
+
+    template_fields = ('op_args', 'op_kwargs')
+    template_fields_renderers = {"op_args": "py", "op_kwargs": "py"}
+
+    # since we won't mutate the arguments, we should just do the shallow copy
+    # there are some cases we can't deepcopy the objects (e.g protobuf).
+    shallow_copy_attrs = ('python_callable',)
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        python_callable: Callable,
+        task_id: str,
+        op_args: Tuple[Any],
+        op_kwargs: Dict[str, Any],
+        multiple_outputs: bool = False,
+        **kwargs,
+    ) -> None:
+        kwargs['task_id'] = self._get_unique_task_id(task_id, 
kwargs.get('dag'), kwargs.get('task_group'))
+        super().__init__(**kwargs)
+        self.python_callable = python_callable
+
+        # Check that arguments can be binded
+        signature(python_callable).bind(*op_args, **op_kwargs)
+        self.multiple_outputs = multiple_outputs
+        self.op_args = op_args
+        self.op_kwargs = op_kwargs
+
+    @staticmethod
+    def _get_unique_task_id(
+        task_id: str, dag: Optional[DAG] = None, task_group: 
Optional[TaskGroup] = None
+    ) -> str:
+        """
+        Generate unique task id given a DAG (or if run in a DAG context)
+        Ids are generated by appending a unique number to the end of
+        the original task id.
+
+        Example:
+          task_id
+          task_id__1
+          task_id__2
+          ...
+          task_id__20
+        """
+        dag = dag or DagContext.get_current_dag()
+        if not dag:
+            return task_id
+
+        # We need to check if we are in the context of TaskGroup as the 
task_id may
+        # already be altered
+        task_group = task_group or TaskGroupContext.get_current_task_group(dag)
+        tg_task_id = task_group.child_id(task_id) if task_group else task_id
+
+        if tg_task_id not in dag.task_ids:
+            return task_id
+        core = re.split(r'__\d+$', task_id)[0]
+        suffixes = sorted(
+            [
+                int(re.split(r'^.+__', task_id)[1])
+                for task_id in dag.task_ids
+                if re.match(rf'^{core}__\d+$', task_id)
+            ]
+        )
+        if not suffixes:
+            return f'{core}__1'
+        return f'{core}__{suffixes[-1] + 1}'
+
+    @staticmethod
+    def validate_python_callable(python_callable):
+        """
+        Validate that python callable can be wrapped by operator.
+        Raises exception if invalid.
+
+        :param python_callable: Python object to be validated
+        :raises: TypeError, AirflowException
+        """
+        if not callable(python_callable):
+            raise TypeError('`python_callable` param must be callable')
+        if 'self' in signature(python_callable).parameters.keys():
+            raise AirflowException('@task does not support methods')
+
+    def execute(self, context: Dict):
+        raise NotImplementedError()
+
+
+T = TypeVar("T", bound=Callable)  # pylint: disable=invalid-name
+
+
+def task_decorator_factory(
+    python_callable: Optional[Callable] = None,
+    multiple_outputs: Optional[bool] = None,
+    decorated_operator_class: BaseDecoratedOperator = None,
+    **kwargs,
+) -> Callable[[T], T]:
+    """
+    A factory that generates a wrapper that wraps a function into an Airflow 
operator.
+    Accepts kwargs for operator kwarg. Can be reused in a single DAG.
+
+    :param python_callable: Function to decorate
+    :type python_callable: Optional[Callable]
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. List/Tuples will unroll to xcom 
values
+        with index as key. Dict will unroll to xcom values with keys as XCom 
keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    :param decorated_operator_class: The operator that executes the logic 
needed to run the python function in
+        the correct environment
+    :type decorated_operator_class: BaseDecoratedOperator
+
+    """
+    # try to infer from  type annotation
+    if python_callable and multiple_outputs is None:
+        sig = signature(python_callable).return_annotation
+        ttype = getattr(sig, "__origin__", None)
+
+        multiple_outputs = sig != inspect.Signature.empty and ttype in (dict, 
Dict)
+
+    def wrapper(f: T):
+        """
+        Python wrapper to generate PythonDecoratedOperator out of simple 
python functions.
+        Used for Airflow Decorated interface
+        """
+        BaseDecoratedOperator.validate_python_callable(f)
+        kwargs.setdefault('task_id', f.__name__)
+
+        @functools.wraps(f)
+        def factory(*args, **f_kwargs):
+            op = decorated_operator_class(
+                python_callable=f,
+                op_args=args,
+                op_kwargs=f_kwargs,
+                multiple_outputs=multiple_outputs,
+                **kwargs,
+            )
+            if f.__doc__:
+                op.doc_md = f.__doc__
+            return XComArg(op)
+
+        return cast(T, factory)
+
+    if callable(python_callable):
+        return wrapper(python_callable)
+    elif python_callable is not None:
+        raise AirflowException('No args allowed while using @task, use kwargs 
instead')
+    return wrapper
diff --git a/airflow/decorators/python.py b/airflow/decorators/python.py
new file mode 100644
index 0000000..07a507a
--- /dev/null
+++ b/airflow/decorators/python.py
@@ -0,0 +1,105 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Callable, Dict, Optional, TypeVar
+
+from airflow.decorators.base import BaseDecoratedOperator, 
task_decorator_factory
+from airflow.exceptions import AirflowException
+from airflow.utils.decorators import apply_defaults
+
+PYTHON_OPERATOR_UI_COLOR = '#ffefeb'
+
+
+class _PythonDecoratedOperator(BaseDecoratedOperator):
+    """
+    Wraps a Python callable and captures args/kwargs when called for execution.
+
+    :param python_callable: A reference to an object that is callable
+    :type python_callable: python callable
+    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
+        in your function (templated)
+    :type op_kwargs: dict
+    :param op_args: a list of positional arguments that will get unpacked when
+        calling your callable (templated)
+    :type op_args: list
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with 
keys as keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    """
+
+    template_fields = ('op_args', 'op_kwargs')
+    template_fields_renderers = {"op_args": "py", "op_kwargs": "py"}
+
+    ui_color = PYTHON_OPERATOR_UI_COLOR
+
+    # since we won't mutate the arguments, we should just do the shallow copy
+    # there are some cases we can't deepcopy the objects (e.g protobuf).
+    shallow_copy_attrs = ('python_callable',)
+
+    @apply_defaults
+    def __init__(
+        self,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+
+    def execute(self, context: Dict):
+        return_value = self.python_callable(*self.op_args, **self.op_kwargs)
+        self.log.debug("Done. Returned value was: %s", return_value)
+        if not self.multiple_outputs:
+            return return_value
+        if isinstance(return_value, dict):
+            for key in return_value.keys():
+                if not isinstance(key, str):
+                    raise AirflowException(
+                        'Returned dictionary keys must be strings when using '
+                        f'multiple_outputs, found {key} ({type(key)}) instead'
+                    )
+            for key, value in return_value.items():
+                self.xcom_push(context, key, value)
+        else:
+            raise AirflowException(
+                f'Returned output was type {type(return_value)} expected 
dictionary ' 'for multiple_outputs'
+            )
+        return return_value
+
+
+T = TypeVar("T", bound=Callable)  # pylint: disable=invalid-name
+
+
+def python_task(
+    python_callable: Optional[Callable] = None, multiple_outputs: 
Optional[bool] = None, **kwargs
+):
+    """
+    Python operator decorator. Wraps a function into an Airflow operator.
+    Accepts kwargs for operator kwarg. Can be reused in a single DAG.
+
+    :param python_callable: Function to decorate
+    :type python_callable: Optional[Callable]
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. List/Tuples will unroll to xcom 
values
+        with index as key. Dict will unroll to xcom values with keys as XCom 
keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    """
+    return task_decorator_factory(
+        python_callable=python_callable,
+        multiple_outputs=multiple_outputs,
+        decorated_operator_class=_PythonDecoratedOperator,
+        **kwargs,
+    )
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 560ccd5..449c7e7b 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1574,7 +1574,7 @@ class DAG(LoggingMixin):
 
     @property
     def task(self):
-        from airflow.operators.python import task
+        from airflow.decorators import task
 
         return functools.partial(task, dag=self)
 
diff --git a/airflow/operators/python.py b/airflow/operators/python.py
index ff1c9a8..6613ecc 100644
--- a/airflow/operators/python.py
+++ b/airflow/operators/python.py
@@ -15,32 +15,68 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-import functools
 import inspect
 import os
 import pickle
-import re
 import sys
 import types
 import warnings
-from inspect import signature
 from tempfile import TemporaryDirectory
 from textwrap import dedent
-from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, 
TypeVar, Union, cast
+from typing import Any, Callable, Dict, Iterable, List, Optional, Union
 
 import dill
 
+# To maintain backwards compatibility, we import the task object into this file
+# This prevents breakages in dags that use `from airflow.operators.python 
import task`
+from airflow.decorators.python import (  # noqa # pylint: disable=unused-import
+    PYTHON_OPERATOR_UI_COLOR,
+    python_task,
+)
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
-from airflow.models.dag import DAG, DagContext
 from airflow.models.skipmixin import SkipMixin
 from airflow.models.taskinstance import _CURRENT_CONTEXT
-from airflow.models.xcom_arg import XComArg
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.operator_helpers import determine_kwargs
 from airflow.utils.process_utils import execute_in_subprocess
 from airflow.utils.python_virtualenv import prepare_virtualenv, 
write_python_script
-from airflow.utils.task_group import TaskGroup, TaskGroupContext
+
+
+def task(python_callable: Optional[Callable] = None, multiple_outputs: 
Optional[bool] = None, **kwargs):
+    """
+    Deprecated function that calls @task.python and allows users to turn a 
python function into
+    an Airflow task. Please use the following instead:
+
+    from airflow.decorators import task
+
+    @task
+    def my_task()
+
+    :param python_callable: A reference to an object that is callable
+    :type python_callable: python callable
+    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
+        in your function (templated)
+    :type op_kwargs: dict
+    :param op_args: a list of positional arguments that will get unpacked when
+        calling your callable (templated)
+    :type op_args: list
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with 
keys as keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    :return:
+    """
+    warnings.warn(
+        """airflow.operators.python.task is deprecated. Please use the 
following instead
+
+        from airflow.decorators import task
+        @task
+        def my_task()""",
+        DeprecationWarning,
+        stacklevel=2,
+    )
+    return python_task(python_callable=python_callable, 
multiple_outputs=multiple_outputs, **kwargs)
 
 
 class PythonOperator(BaseOperator):
@@ -71,7 +107,7 @@ class PythonOperator(BaseOperator):
 
     template_fields = ('templates_dict', 'op_args', 'op_kwargs')
     template_fields_renderers = {"templates_dict": "json", "op_args": "py", 
"op_kwargs": "py"}
-    ui_color = '#ffefeb'
+    ui_color = PYTHON_OPERATOR_UI_COLOR
 
     # since we won't mutate the arguments, we should just do the shallow copy
     # there are some cases we can't deepcopy the objects(e.g protobuf).
@@ -128,184 +164,6 @@ class PythonOperator(BaseOperator):
         return self.python_callable(*self.op_args, **self.op_kwargs)
 
 
-class _PythonDecoratedOperator(BaseOperator):
-    """
-    Wraps a Python callable and captures args/kwargs when called for execution.
-
-    :param python_callable: A reference to an object that is callable
-    :type python_callable: python callable
-    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
-        in your function (templated)
-    :type op_kwargs: dict
-    :param op_args: a list of positional arguments that will get unpacked when
-        calling your callable (templated)
-    :type op_args: list
-    :param multiple_outputs: if set, function return value will be
-        unrolled to multiple XCom values. Dict will unroll to xcom values with 
keys as keys.
-        Defaults to False.
-    :type multiple_outputs: bool
-    """
-
-    template_fields = ('op_args', 'op_kwargs')
-    template_fields_renderers = {"op_args": "py", "op_kwargs": "py"}
-
-    ui_color = PythonOperator.ui_color
-
-    # since we won't mutate the arguments, we should just do the shallow copy
-    # there are some cases we can't deepcopy the objects (e.g protobuf).
-    shallow_copy_attrs = ('python_callable',)
-
-    @apply_defaults
-    def __init__(
-        self,
-        *,
-        python_callable: Callable,
-        task_id: str,
-        op_args: Tuple[Any],
-        op_kwargs: Dict[str, Any],
-        multiple_outputs: bool = False,
-        **kwargs,
-    ) -> None:
-        kwargs['task_id'] = self._get_unique_task_id(task_id, 
kwargs.get('dag'), kwargs.get('task_group'))
-        super().__init__(**kwargs)
-        self.python_callable = python_callable
-
-        # Check that arguments can be binded
-        signature(python_callable).bind(*op_args, **op_kwargs)
-        self.multiple_outputs = multiple_outputs
-        self.op_args = op_args
-        self.op_kwargs = op_kwargs
-
-    @staticmethod
-    def _get_unique_task_id(
-        task_id: str, dag: Optional[DAG] = None, task_group: 
Optional[TaskGroup] = None
-    ) -> str:
-        """
-        Generate unique task id given a DAG (or if run in a DAG context)
-        Ids are generated by appending a unique number to the end of
-        the original task id.
-
-        Example:
-          task_id
-          task_id__1
-          task_id__2
-          ...
-          task_id__20
-        """
-        dag = dag or DagContext.get_current_dag()
-        if not dag:
-            return task_id
-
-        # We need to check if we are in the context of TaskGroup as the 
task_id may
-        # already be altered
-        task_group = task_group or TaskGroupContext.get_current_task_group(dag)
-        tg_task_id = task_group.child_id(task_id) if task_group else task_id
-
-        if tg_task_id not in dag.task_ids:
-            return task_id
-        core = re.split(r'__\d+$', task_id)[0]
-        suffixes = sorted(
-            [
-                int(re.split(r'^.+__', task_id)[1])
-                for task_id in dag.task_ids
-                if re.match(rf'^{core}__\d+$', task_id)
-            ]
-        )
-        if not suffixes:
-            return f'{core}__1'
-        return f'{core}__{suffixes[-1] + 1}'
-
-    @staticmethod
-    def validate_python_callable(python_callable):
-        """
-        Validate that python callable can be wrapped by operator.
-        Raises exception if invalid.
-
-        :param python_callable: Python object to be validated
-        :raises: TypeError, AirflowException
-        """
-        if not callable(python_callable):
-            raise TypeError('`python_callable` param must be callable')
-        if 'self' in signature(python_callable).parameters.keys():
-            raise AirflowException('@task does not support methods')
-
-    def execute(self, context: Dict):
-        return_value = self.python_callable(*self.op_args, **self.op_kwargs)
-        self.log.debug("Done. Returned value was: %s", return_value)
-        if not self.multiple_outputs:
-            return return_value
-        if isinstance(return_value, dict):
-            for key in return_value.keys():
-                if not isinstance(key, str):
-                    raise AirflowException(
-                        'Returned dictionary keys must be strings when using '
-                        f'multiple_outputs, found {key} ({type(key)}) instead'
-                    )
-            for key, value in return_value.items():
-                self.xcom_push(context, key, value)
-        else:
-            raise AirflowException(
-                f'Returned output was type {type(return_value)} expected 
dictionary ' 'for multiple_outputs'
-            )
-        return return_value
-
-
-T = TypeVar("T", bound=Callable)  # pylint: disable=invalid-name
-
-
-def task(
-    python_callable: Optional[Callable] = None, multiple_outputs: 
Optional[bool] = None, **kwargs
-) -> Callable[[T], T]:
-    """
-    Python operator decorator. Wraps a function into an Airflow operator.
-    Accepts kwargs for operator kwarg. Can be reused in a single DAG.
-
-    :param python_callable: Function to decorate
-    :type python_callable: Optional[Callable]
-    :param multiple_outputs: if set, function return value will be
-        unrolled to multiple XCom values. List/Tuples will unroll to xcom 
values
-        with index as key. Dict will unroll to xcom values with keys as XCom 
keys.
-        Defaults to False.
-    :type multiple_outputs: bool
-
-    """
-    # try to infer from  type annotation
-    if python_callable and multiple_outputs is None:
-        sig = signature(python_callable).return_annotation
-        ttype = getattr(sig, "__origin__", None)
-
-        multiple_outputs = sig != inspect.Signature.empty and ttype in (dict, 
Dict)
-
-    def wrapper(f: T):
-        """
-        Python wrapper to generate PythonDecoratedOperator out of simple 
python functions.
-        Used for Airflow Decorated interface
-        """
-        _PythonDecoratedOperator.validate_python_callable(f)
-        kwargs.setdefault('task_id', f.__name__)
-
-        @functools.wraps(f)
-        def factory(*args, **f_kwargs):
-            op = _PythonDecoratedOperator(
-                python_callable=f,
-                op_args=args,
-                op_kwargs=f_kwargs,
-                multiple_outputs=multiple_outputs,
-                **kwargs,
-            )
-            if f.__doc__:
-                op.doc_md = f.__doc__
-            return XComArg(op)
-
-        return cast(T, factory)
-
-    if callable(python_callable):
-        return wrapper(python_callable)
-    elif python_callable is not None:
-        raise AirflowException('No args allowed while using @task, use kwargs 
instead')
-    return wrapper
-
-
 class BranchPythonOperator(PythonOperator, SkipMixin):
     """
     Allows a workflow to "branch" or follow a path following the execution
diff --git a/airflow/decorators.py b/tests/decorators/__init__.py
similarity index 83%
rename from airflow/decorators.py
rename to tests/decorators/__init__.py
index 26c7dfc..13a8339 100644
--- a/airflow/decorators.py
+++ b/tests/decorators/__init__.py
@@ -14,6 +14,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
-from airflow.models.dag import dag  # noqa # pylint: disable=unused-import
-from airflow.operators.python import task  # noqa # pylint: 
disable=unused-import
diff --git a/tests/decorators/test_python.py b/tests/decorators/test_python.py
new file mode 100644
index 0000000..a829863
--- /dev/null
+++ b/tests/decorators/test_python.py
@@ -0,0 +1,485 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest.mock
+from collections import namedtuple
+from datetime import date, timedelta
+from typing import Dict, Tuple
+
+import pytest
+
+from airflow.decorators import task as task_decorator
+from airflow.exceptions import AirflowException
+from airflow.models import DAG, DagRun, TaskInstance as TI
+from airflow.models.xcom_arg import XComArg
+from airflow.utils import timezone
+from airflow.utils.session import create_session
+from airflow.utils.state import State
+from airflow.utils.task_group import TaskGroup
+from airflow.utils.types import DagRunType
+
+DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+END_DATE = timezone.datetime(2016, 1, 2)
+INTERVAL = timedelta(hours=12)
+FROZEN_NOW = timezone.datetime(2016, 1, 2, 12, 1, 1)
+
+TI_CONTEXT_ENV_VARS = [
+    'AIRFLOW_CTX_DAG_ID',
+    'AIRFLOW_CTX_TASK_ID',
+    'AIRFLOW_CTX_EXECUTION_DATE',
+    'AIRFLOW_CTX_DAG_RUN_ID',
+]
+
+
+class Call:
+    def __init__(self, *args, **kwargs):
+        self.args = args
+        self.kwargs = kwargs
+
+
+def build_recording_function(calls_collection):
+    """
+    A Mock instance cannot be used as a PythonOperator callable function because some tests fail with a
+    TypeError: Object of type Mock is not JSON serializable
+    Instead, this helper builds a function that records custom Call objects for further testing
+    (replacing the Mock.assert_called_with assertion method)
+    """
+
+    def recording_function(*args, **kwargs):
+        calls_collection.append(Call(*args, **kwargs))
+
+    return recording_function
+
+
+class TestPythonBase(unittest.TestCase):
+    """Base test class for the @task decorator tests (TestAirflowTaskDecorator)"""
+
+    @classmethod
+    def setUpClass(cls):
+        super().setUpClass()
+
+        with create_session() as session:
+            session.query(DagRun).delete()
+            session.query(TI).delete()
+
+    def setUp(self):
+        super().setUp()
+        self.dag = DAG('test_dag', default_args={'owner': 'airflow', 
'start_date': DEFAULT_DATE})
+        self.addCleanup(self.dag.clear)
+        self.clear_run()
+        self.addCleanup(self.clear_run)
+
+    def tearDown(self):
+        super().tearDown()
+
+        with create_session() as session:
+            session.query(DagRun).delete()
+            session.query(TI).delete()
+
+    def clear_run(self):
+        self.run = False
+
+    def _assert_calls_equal(self, first, second):
+        assert isinstance(first, Call)
+        assert isinstance(second, Call)
+        assert first.args == second.args
+        # eliminate context (conf, dag_run, task_instance, etc.)
+        test_args = ["an_int", "a_date", "a_templated_string"]
+        first.kwargs = {key: value for (key, value) in first.kwargs.items() if 
key in test_args}
+        second.kwargs = {key: value for (key, value) in second.kwargs.items() 
if key in test_args}
+        assert first.kwargs == second.kwargs
+
+
+class TestAirflowTaskDecorator(TestPythonBase):
+    def test_python_operator_python_callable_is_callable(self):
+        """Tests that @task will only instantiate if
+        the python_callable argument is callable."""
+        not_callable = {}
+        with pytest.raises(AirflowException):
+            task_decorator(not_callable, dag=self.dag)
+
+    def test_infer_multiple_outputs_using_typing(self):
+        @task_decorator
+        def identity_dict(x: int, y: int) -> Dict[str, int]:
+            return {"x": x, "y": y}
+
+        assert identity_dict(5, 5).operator.multiple_outputs is True  # 
pylint: disable=maybe-no-member
+
+        @task_decorator
+        def identity_tuple(x: int, y: int) -> Tuple[int, int]:
+            return x, y
+
+        assert identity_tuple(5, 5).operator.multiple_outputs is False  # 
pylint: disable=maybe-no-member
+
+        @task_decorator
+        def identity_int(x: int) -> int:
+            return x
+
+        assert identity_int(5).operator.multiple_outputs is False  # pylint: 
disable=maybe-no-member
+
+        @task_decorator
+        def identity_notyping(x: int):
+            return x
+
+        assert identity_notyping(5).operator.multiple_outputs is False  # 
pylint: disable=maybe-no-member
+
+    def test_manual_multiple_outputs_false_with_typings(self):
+        @task_decorator(multiple_outputs=False)
+        def identity2(x: int, y: int) -> Dict[int, int]:
+            return (x, y)
+
+        with self.dag:
+            res = identity2(8, 4)
+
+        dr = self.dag.create_dagrun(
+            run_id=DagRunType.MANUAL.value,
+            start_date=timezone.utcnow(),
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+
+        res.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)  # 
pylint: disable=maybe-no-member
+
+        ti = dr.get_task_instances()[0]
+
+        assert res.operator.multiple_outputs is False  # pylint: 
disable=maybe-no-member
+        assert ti.xcom_pull() == [8, 4]  # pylint: disable=maybe-no-member
+        assert ti.xcom_pull(key="return_value_0") is None
+        assert ti.xcom_pull(key="return_value_1") is None
+
+    def test_multiple_outputs_ignore_typing(self):
+        @task_decorator
+        def identity_tuple(x: int, y: int) -> Tuple[int, int]:
+            return x, y
+
+        with self.dag:
+            ident = identity_tuple(35, 36)
+
+        dr = self.dag.create_dagrun(
+            run_id=DagRunType.MANUAL.value,
+            start_date=timezone.utcnow(),
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+
+        ident.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)  # 
pylint: disable=maybe-no-member
+
+        ti = dr.get_task_instances()[0]
+
+        assert not ident.operator.multiple_outputs  # pylint: 
disable=maybe-no-member
+        assert ti.xcom_pull() == [35, 36]
+        assert ti.xcom_pull(key="return_value_0") is None
+        assert ti.xcom_pull(key="return_value_1") is None
+
+    def test_fails_bad_signature(self):
+        """Tests that @task will fail if signature is not binding."""
+
+        @task_decorator
+        def add_number(num: int) -> int:
+            return num + 2
+
+        with pytest.raises(TypeError):
+            add_number(2, 3)  # pylint: disable=too-many-function-args
+        with pytest.raises(TypeError):
+            add_number()  # pylint: disable=no-value-for-parameter
+        add_number('test')  # pylint: disable=no-value-for-parameter
+
+    def test_fail_method(self):
+        """Tests that @task will fail with an AirflowException when used to decorate a method."""
+
+        with pytest.raises(AirflowException):
+
+            class Test:
+                num = 2
+
+                @task_decorator
+                def add_number(self, num: int) -> int:
+                    return self.num + num
+
+            Test().add_number(2)
+
+    def test_fail_multiple_outputs_key_type(self):
+        @task_decorator(multiple_outputs=True)
+        def add_number(num: int):
+            return {2: num}
+
+        with self.dag:
+            ret = add_number(2)
+        self.dag.create_dagrun(
+            run_id=DagRunType.MANUAL,
+            execution_date=DEFAULT_DATE,
+            start_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+
+        with pytest.raises(AirflowException):
+            # pylint: disable=maybe-no-member
+            ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+    def test_fail_multiple_outputs_no_dict(self):
+        @task_decorator(multiple_outputs=True)
+        def add_number(num: int):
+            return num
+
+        with self.dag:
+            ret = add_number(2)
+        self.dag.create_dagrun(
+            run_id=DagRunType.MANUAL,
+            execution_date=DEFAULT_DATE,
+            start_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+
+        with pytest.raises(AirflowException):
+            # pylint: disable=maybe-no-member
+            ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+    def test_python_callable_arguments_are_templatized(self):
+        """Test @task op_args are templatized"""
+        recorded_calls = []
+
+        # Create a named tuple and ensure it is still preserved
+        # after the rendering is done
+        Named = namedtuple('Named', ['var1', 'var2'])
+        named_tuple = Named('{{ ds }}', 'unchanged')
+
+        task = task_decorator(
+            # a Mock instance cannot be used as a callable function or test 
fails with a
+            # TypeError: Object of type Mock is not JSON serializable
+            build_recording_function(recorded_calls),
+            dag=self.dag,
+        )
+        ret = task(4, date(2019, 1, 1), "dag {{dag.dag_id}} ran on {{ds}}.", 
named_tuple)
+
+        self.dag.create_dagrun(
+            run_id=DagRunType.MANUAL,
+            execution_date=DEFAULT_DATE,
+            start_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+        ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)  # 
pylint: disable=maybe-no-member
+
+        ds_templated = DEFAULT_DATE.date().isoformat()
+        assert len(recorded_calls) == 1
+        self._assert_calls_equal(
+            recorded_calls[0],
+            Call(
+                4,
+                date(2019, 1, 1),
+                f"dag {self.dag.dag_id} ran on {ds_templated}.",
+                Named(ds_templated, 'unchanged'),
+            ),
+        )
+
+    def test_python_callable_keyword_arguments_are_templatized(self):
+        """Test @task op_kwargs are templatized"""
+        recorded_calls = []
+
+        task = task_decorator(
+            # a Mock instance cannot be used as a callable function or test 
fails with a
+            # TypeError: Object of type Mock is not JSON serializable
+            build_recording_function(recorded_calls),
+            dag=self.dag,
+        )
+        ret = task(an_int=4, a_date=date(2019, 1, 1), a_templated_string="dag 
{{dag.dag_id}} ran on {{ds}}.")
+        self.dag.create_dagrun(
+            run_id=DagRunType.MANUAL,
+            execution_date=DEFAULT_DATE,
+            start_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+        ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)  # 
pylint: disable=maybe-no-member
+
+        assert len(recorded_calls) == 1
+        self._assert_calls_equal(
+            recorded_calls[0],
+            Call(
+                an_int=4,
+                a_date=date(2019, 1, 1),
+                a_templated_string="dag {} ran on {}.".format(
+                    self.dag.dag_id, DEFAULT_DATE.date().isoformat()
+                ),
+            ),
+        )
+
+    def test_manual_task_id(self):
+        """Test manually setting task_id"""
+
+        @task_decorator(task_id='some_name')
+        def do_run():
+            return 4
+
+        with self.dag:
+            do_run()
+            assert ['some_name'] == self.dag.task_ids
+
+    def test_multiple_calls(self):
+        """Test calling task multiple times in a DAG"""
+
+        @task_decorator
+        def do_run():
+            return 4
+
+        with self.dag:
+            do_run()
+            assert ['do_run'] == self.dag.task_ids
+            do_run_1 = do_run()
+            do_run_2 = do_run()
+            assert ['do_run', 'do_run__1', 'do_run__2'] == self.dag.task_ids
+
+        assert do_run_1.operator.task_id == 'do_run__1'  # pylint: 
disable=maybe-no-member
+        assert do_run_2.operator.task_id == 'do_run__2'  # pylint: 
disable=maybe-no-member
+
+    def test_multiple_calls_in_task_group(self):
+        """Test calling task multiple times in a TaskGroup"""
+
+        @task_decorator
+        def do_run():
+            return 4
+
+        group_id = "KnightsOfNii"
+        with self.dag:
+            with TaskGroup(group_id=group_id):
+                do_run()
+                assert [f"{group_id}.do_run"] == self.dag.task_ids
+                do_run()
+                assert [f"{group_id}.do_run", f"{group_id}.do_run__1"] == 
self.dag.task_ids
+
+        assert len(self.dag.task_ids) == 2
+
+    def test_call_20(self):
+        """Test calling decorated function 21 times in a DAG"""
+
+        @task_decorator
+        def __do_run():
+            return 4
+
+        with self.dag:
+            __do_run()
+            for _ in range(20):
+                __do_run()
+
+        assert self.dag.task_ids[-1] == '__do_run__20'
+
+    def test_multiple_outputs(self):
+        """Tests pushing multiple outputs as a dictionary"""
+
+        @task_decorator(multiple_outputs=True)
+        def return_dict(number: int):
+            return {'number': number + 1, '43': 43}
+
+        test_number = 10
+        with self.dag:
+            ret = return_dict(test_number)
+
+        dr = self.dag.create_dagrun(
+            run_id=DagRunType.MANUAL,
+            start_date=timezone.utcnow(),
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+
+        ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)  # 
pylint: disable=maybe-no-member
+
+        ti = dr.get_task_instances()[0]
+        assert ti.xcom_pull(key='number') == test_number + 1
+        assert ti.xcom_pull(key='43') == 43
+        assert ti.xcom_pull() == {'number': test_number + 1, '43': 43}
+
+    def test_default_args(self):
+        """Test that default_args are captured when calling the function 
correctly"""
+
+        @task_decorator
+        def do_run():
+            return 4
+
+        with self.dag:
+            ret = do_run()
+        assert ret.operator.owner == 'airflow'  # pylint: 
disable=maybe-no-member
+
+    def test_xcom_arg(self):
+        """Tests that returned key in XComArg is returned correctly"""
+
+        @task_decorator
+        def add_2(number: int):
+            return number + 2
+
+        @task_decorator
+        def add_num(number: int, num2: int = 2):
+            return number + num2
+
+        test_number = 10
+
+        with self.dag:
+            bigger_number = add_2(test_number)
+            ret = add_num(bigger_number, XComArg(bigger_number.operator))  # 
pylint: disable=maybe-no-member
+
+        dr = self.dag.create_dagrun(
+            run_id=DagRunType.MANUAL,
+            start_date=timezone.utcnow(),
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+
+        bigger_number.operator.run(  # pylint: disable=maybe-no-member
+            start_date=DEFAULT_DATE, end_date=DEFAULT_DATE
+        )
+        ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)  # 
pylint: disable=maybe-no-member
+        ti_add_num = [ti for ti in dr.get_task_instances() if ti.task_id == 
'add_num'][0]
+        assert ti_add_num.xcom_pull(key=ret.key) == (test_number + 2) * 2  # 
pylint: disable=maybe-no-member
+
+    def test_dag_task(self):
+        """Tests dag.task property to generate task"""
+
+        @self.dag.task
+        def add_2(number: int):
+            return number + 2
+
+        test_number = 10
+        res = add_2(test_number)
+        add_2(res)
+
+        assert 'add_2' in self.dag.task_ids
+
+    def test_dag_task_multiple_outputs(self):
+        """Tests dag.task property to generate task with multiple outputs"""
+
+        @self.dag.task(multiple_outputs=True)
+        def add_2(number: int):
+            return {'1': number + 2, '2': 42}
+
+        test_number = 10
+        add_2(test_number)
+        add_2(test_number)
+
+        assert 'add_2' in self.dag.task_ids
+
+    def test_task_documentation(self):
+        """Tests that task_decorator loads doc_md from function doc"""
+
+        @task_decorator
+        def add_2(number: int):
+            """
+            Adds 2 to number.
+            """
+            return number + 2
+
+        test_number = 10
+        with self.dag:
+            ret = add_2(test_number)
+
+        assert ret.operator.doc_md.strip(), "Adds 2 to number."  # pylint: 
disable=maybe-no-member
diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py
index 4c4a93a..5bd78c6 100644
--- a/tests/operators/test_python.py
+++ b/tests/operators/test_python.py
@@ -22,7 +22,7 @@ import unittest.mock
 from collections import namedtuple
 from datetime import date, datetime, timedelta
 from subprocess import CalledProcessError
-from typing import Dict, List, Tuple
+from typing import List
 
 import pytest
 
@@ -30,7 +30,6 @@ from airflow.exceptions import AirflowException
 from airflow.models import DAG, DagRun, TaskInstance as TI
 from airflow.models.baseoperator import BaseOperator
 from airflow.models.taskinstance import clear_task_instances, 
set_current_context
-from airflow.models.xcom_arg import XComArg
 from airflow.operators.dummy import DummyOperator
 from airflow.operators.python import (
     BranchPythonOperator,
@@ -38,13 +37,11 @@ from airflow.operators.python import (
     PythonVirtualenvOperator,
     ShortCircuitOperator,
     get_current_context,
-    task as task_decorator,
 )
 from airflow.utils import timezone
 from airflow.utils.dates import days_ago
 from airflow.utils.session import create_session
 from airflow.utils.state import State
-from airflow.utils.task_group import TaskGroup
 from airflow.utils.types import DagRunType
 from tests.test_utils.db import clear_db_runs
 
@@ -318,401 +315,6 @@ class TestPythonOperator(TestPythonBase):
         python_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
 
-class TestAirflowTaskDecorator(TestPythonBase):
-    def test_python_operator_python_callable_is_callable(self):
-        """Tests that @task will only instantiate if
-        the python_callable argument is callable."""
-        not_callable = {}
-        with pytest.raises(AirflowException):
-            task_decorator(not_callable, dag=self.dag)
-
-    def test_infer_multiple_outputs_using_typing(self):
-        @task_decorator
-        def identity_dict(x: int, y: int) -> Dict[str, int]:
-            return {"x": x, "y": y}
-
-        assert identity_dict(5, 5).operator.multiple_outputs is True  # 
pylint: disable=maybe-no-member
-
-        @task_decorator
-        def identity_tuple(x: int, y: int) -> Tuple[int, int]:
-            return x, y
-
-        assert identity_tuple(5, 5).operator.multiple_outputs is False  # 
pylint: disable=maybe-no-member
-
-        @task_decorator
-        def identity_int(x: int) -> int:
-            return x
-
-        assert identity_int(5).operator.multiple_outputs is False  # pylint: 
disable=maybe-no-member
-
-        @task_decorator
-        def identity_notyping(x: int):
-            return x
-
-        assert identity_notyping(5).operator.multiple_outputs is False  # 
pylint: disable=maybe-no-member
-
-    def test_manual_multiple_outputs_false_with_typings(self):
-        @task_decorator(multiple_outputs=False)
-        def identity2(x: int, y: int) -> Dict[int, int]:
-            return (x, y)
-
-        with self.dag:
-            res = identity2(8, 4)
-
-        dr = self.dag.create_dagrun(
-            run_id=DagRunType.MANUAL.value,
-            start_date=timezone.utcnow(),
-            execution_date=DEFAULT_DATE,
-            state=State.RUNNING,
-        )
-
-        res.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)  # 
pylint: disable=maybe-no-member
-
-        ti = dr.get_task_instances()[0]
-
-        assert res.operator.multiple_outputs is False  # pylint: 
disable=maybe-no-member
-        assert ti.xcom_pull() == [8, 4]  # pylint: disable=maybe-no-member
-        assert ti.xcom_pull(key="return_value_0") is None
-        assert ti.xcom_pull(key="return_value_1") is None
-
-    def test_multiple_outputs_ignore_typing(self):
-        @task_decorator
-        def identity_tuple(x: int, y: int) -> Tuple[int, int]:
-            return x, y
-
-        with self.dag:
-            ident = identity_tuple(35, 36)
-
-        dr = self.dag.create_dagrun(
-            run_id=DagRunType.MANUAL.value,
-            start_date=timezone.utcnow(),
-            execution_date=DEFAULT_DATE,
-            state=State.RUNNING,
-        )
-
-        ident.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)  # 
pylint: disable=maybe-no-member
-
-        ti = dr.get_task_instances()[0]
-
-        assert not ident.operator.multiple_outputs  # pylint: 
disable=maybe-no-member
-        assert ti.xcom_pull() == [35, 36]
-        assert ti.xcom_pull(key="return_value_0") is None
-        assert ti.xcom_pull(key="return_value_1") is None
-
-    def test_fails_bad_signature(self):
-        """Tests that @task will fail if signature is not binding."""
-
-        @task_decorator
-        def add_number(num: int) -> int:
-            return num + 2
-
-        with pytest.raises(TypeError):
-            add_number(2, 3)  # pylint: disable=too-many-function-args
-        with pytest.raises(TypeError):
-            add_number()  # pylint: disable=no-value-for-parameter
-        add_number('test')  # pylint: disable=no-value-for-parameter
-
-    def test_fail_method(self):
-        """Tests that @task will fail if signature is not binding."""
-
-        with pytest.raises(AirflowException):
-
-            class Test:
-                num = 2
-
-                @task_decorator
-                def add_number(self, num: int) -> int:
-                    return self.num + num
-
-            Test().add_number(2)
-
-    def test_fail_multiple_outputs_key_type(self):
-        @task_decorator(multiple_outputs=True)
-        def add_number(num: int):
-            return {2: num}
-
-        with self.dag:
-            ret = add_number(2)
-        self.dag.create_dagrun(
-            run_id=DagRunType.MANUAL,
-            execution_date=DEFAULT_DATE,
-            start_date=DEFAULT_DATE,
-            state=State.RUNNING,
-        )
-
-        with pytest.raises(AirflowException):
-            # pylint: disable=maybe-no-member
-            ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
-
-    def test_fail_multiple_outputs_no_dict(self):
-        @task_decorator(multiple_outputs=True)
-        def add_number(num: int):
-            return num
-
-        with self.dag:
-            ret = add_number(2)
-        self.dag.create_dagrun(
-            run_id=DagRunType.MANUAL,
-            execution_date=DEFAULT_DATE,
-            start_date=DEFAULT_DATE,
-            state=State.RUNNING,
-        )
-
-        with pytest.raises(AirflowException):
-            # pylint: disable=maybe-no-member
-            ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
-
-    def test_python_callable_arguments_are_templatized(self):
-        """Test @task op_args are templatized"""
-        recorded_calls = []
-
-        # Create a named tuple and ensure it is still preserved
-        # after the rendering is done
-        Named = namedtuple('Named', ['var1', 'var2'])
-        named_tuple = Named('{{ ds }}', 'unchanged')
-
-        task = task_decorator(
-            # a Mock instance cannot be used as a callable function or test 
fails with a
-            # TypeError: Object of type Mock is not JSON serializable
-            build_recording_function(recorded_calls),
-            dag=self.dag,
-        )
-        ret = task(4, date(2019, 1, 1), "dag {{dag.dag_id}} ran on {{ds}}.", 
named_tuple)
-
-        self.dag.create_dagrun(
-            run_id=DagRunType.MANUAL,
-            execution_date=DEFAULT_DATE,
-            start_date=DEFAULT_DATE,
-            state=State.RUNNING,
-        )
-        ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)  # 
pylint: disable=maybe-no-member
-
-        ds_templated = DEFAULT_DATE.date().isoformat()
-        assert len(recorded_calls) == 1
-        self._assert_calls_equal(
-            recorded_calls[0],
-            Call(
-                4,
-                date(2019, 1, 1),
-                f"dag {self.dag.dag_id} ran on {ds_templated}.",
-                Named(ds_templated, 'unchanged'),
-            ),
-        )
-
-    def test_python_callable_keyword_arguments_are_templatized(self):
-        """Test PythonOperator op_kwargs are templatized"""
-        recorded_calls = []
-
-        task = task_decorator(
-            # a Mock instance cannot be used as a callable function or test 
fails with a
-            # TypeError: Object of type Mock is not JSON serializable
-            build_recording_function(recorded_calls),
-            dag=self.dag,
-        )
-        ret = task(an_int=4, a_date=date(2019, 1, 1), a_templated_string="dag 
{{dag.dag_id}} ran on {{ds}}.")
-        self.dag.create_dagrun(
-            run_id=DagRunType.MANUAL,
-            execution_date=DEFAULT_DATE,
-            start_date=DEFAULT_DATE,
-            state=State.RUNNING,
-        )
-        ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)  # 
pylint: disable=maybe-no-member
-
-        assert len(recorded_calls) == 1
-        self._assert_calls_equal(
-            recorded_calls[0],
-            Call(
-                an_int=4,
-                a_date=date(2019, 1, 1),
-                a_templated_string="dag {} ran on {}.".format(
-                    self.dag.dag_id, DEFAULT_DATE.date().isoformat()
-                ),
-            ),
-        )
-
-    def test_manual_task_id(self):
-        """Test manually setting task_id"""
-
-        @task_decorator(task_id='some_name')
-        def do_run():
-            return 4
-
-        with self.dag:
-            do_run()
-            assert ['some_name'] == self.dag.task_ids
-
-    def test_multiple_calls(self):
-        """Test calling task multiple times in a DAG"""
-
-        @task_decorator
-        def do_run():
-            return 4
-
-        with self.dag:
-            do_run()
-            assert ['do_run'] == self.dag.task_ids
-            do_run_1 = do_run()
-            do_run_2 = do_run()
-            assert ['do_run', 'do_run__1', 'do_run__2'] == self.dag.task_ids
-
-        assert do_run_1.operator.task_id == 'do_run__1'  # pylint: 
disable=maybe-no-member
-        assert do_run_2.operator.task_id == 'do_run__2'  # pylint: 
disable=maybe-no-member
-
-    def test_multiple_calls_in_task_group(self):
-        """Test calling task multiple times in a TaskGroup"""
-
-        @task_decorator
-        def do_run():
-            return 4
-
-        group_id = "KnightsOfNii"
-        with self.dag:
-            with TaskGroup(group_id=group_id):
-                do_run()
-                assert [f"{group_id}.do_run"] == self.dag.task_ids
-                do_run()
-                assert [f"{group_id}.do_run", f"{group_id}.do_run__1"] == 
self.dag.task_ids
-
-        assert len(self.dag.task_ids) == 2
-
-    def test_call_20(self):
-        """Test calling decorated function 21 times in a DAG"""
-
-        @task_decorator
-        def __do_run():
-            return 4
-
-        with self.dag:
-            __do_run()
-            for _ in range(20):
-                __do_run()
-
-        assert self.dag.task_ids[-1] == '__do_run__20'
-
-    def test_multiple_outputs(self):
-        """Tests pushing multiple outputs as a dictionary"""
-
-        @task_decorator(multiple_outputs=True)
-        def return_dict(number: int):
-            return {'number': number + 1, '43': 43}
-
-        test_number = 10
-        with self.dag:
-            ret = return_dict(test_number)
-
-        dr = self.dag.create_dagrun(
-            run_id=DagRunType.MANUAL,
-            start_date=timezone.utcnow(),
-            execution_date=DEFAULT_DATE,
-            state=State.RUNNING,
-        )
-
-        ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)  # 
pylint: disable=maybe-no-member
-
-        ti = dr.get_task_instances()[0]
-        assert ti.xcom_pull(key='number') == test_number + 1
-        assert ti.xcom_pull(key='43') == 43
-        assert ti.xcom_pull() == {'number': test_number + 1, '43': 43}
-
-    def test_default_args(self):
-        """Test that default_args are captured when calling the function 
correctly"""
-
-        @task_decorator
-        def do_run():
-            return 4
-
-        with self.dag:
-            ret = do_run()
-        assert ret.operator.owner == 'airflow'  # pylint: 
disable=maybe-no-member
-
-    def test_xcom_arg(self):
-        """Tests that returned key in XComArg is returned correctly"""
-
-        @task_decorator
-        def add_2(number: int):
-            return number + 2
-
-        @task_decorator
-        def add_num(number: int, num2: int = 2):
-            return number + num2
-
-        test_number = 10
-
-        with self.dag:
-            bigger_number = add_2(test_number)
-            ret = add_num(bigger_number, XComArg(bigger_number.operator))  # 
pylint: disable=maybe-no-member
-
-        dr = self.dag.create_dagrun(
-            run_id=DagRunType.MANUAL,
-            start_date=timezone.utcnow(),
-            execution_date=DEFAULT_DATE,
-            state=State.RUNNING,
-        )
-
-        bigger_number.operator.run(  # pylint: disable=maybe-no-member
-            start_date=DEFAULT_DATE, end_date=DEFAULT_DATE
-        )
-        ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)  # 
pylint: disable=maybe-no-member
-        ti_add_num = [ti for ti in dr.get_task_instances() if ti.task_id == 
'add_num'][0]
-        assert ti_add_num.xcom_pull(key=ret.key) == (test_number + 2) * 2  # 
pylint: disable=maybe-no-member
-
-    def test_dag_task(self):
-        """Tests dag.task property to generate task"""
-
-        @self.dag.task
-        def add_2(number: int):
-            return number + 2
-
-        test_number = 10
-        res = add_2(test_number)
-        add_2(res)
-
-        assert 'add_2' in self.dag.task_ids
-
-    def test_dag_task_multiple_outputs(self):
-        """Tests dag.task property to generate task with multiple outputs"""
-
-        @self.dag.task(multiple_outputs=True)
-        def add_2(number: int):
-            return {'1': number + 2, '2': 42}
-
-        test_number = 10
-        add_2(test_number)
-        add_2(test_number)
-
-        assert 'add_2' in self.dag.task_ids
-
-    def test_airflow_task(self):
-        """Tests airflow.task decorator to generate task"""
-        from airflow.decorators import task
-
-        @task
-        def add_2(number: int):
-            return number + 2
-
-        test_number = 10
-        with self.dag:
-            add_2(test_number)
-
-        assert 'add_2' in self.dag.task_ids
-
-    def test_task_documentation(self):
-        """Tests that task_decorator loads doc_md from function doc"""
-
-        @task_decorator
-        def add_2(number: int):
-            """
-            Adds 2 to number.
-            """
-            return number + 2
-
-        test_number = 10
-        with self.dag:
-            ret = add_2(test_number)
-
-        assert ret.operator.doc_md.strip(), "Adds 2 to number."  # pylint: 
disable=maybe-no-member
-
-
 class TestBranchOperator(unittest.TestCase):
     @classmethod
     def setUpClass(cls):

Reply via email to