ashb commented on a change in pull request #8962:
URL: https://github.com/apache/airflow/pull/8962#discussion_r439363494
##########
File path: tests/operators/test_python.py
##########
@@ -312,6 +315,327 @@ def func(**context):
python_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+class TestAirflowTaskDecorator(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+
+ with create_session() as session:
+ session.query(DagRun).delete()
+ session.query(TI).delete()
+
+ def setUp(self):
+ super().setUp()
+ self.owner = 'airflow_tes'
+ self.dag = DAG(
+ 'test_dag',
+ default_args={
+ 'owner': self.owner,
+ 'start_date': DEFAULT_DATE})
+ self.addCleanup(self.dag.clear)
+
+ def tearDown(self):
+ super().tearDown()
+
+ with create_session() as session:
+ session.query(DagRun).delete()
+ session.query(TI).delete()
+
+ def _assert_calls_equal(self, first, second):
+ assert isinstance(first, Call)
+ assert isinstance(second, Call)
+ assert first.args == second.args
+ # eliminate context (conf, dag_run, task_instance, etc.)
+ test_args = ["an_int", "a_date", "a_templated_string"]
+ first.kwargs = {
+ key: value
+ for (key, value) in first.kwargs.items()
+ if key in test_args
+ }
+ second.kwargs = {
+ key: value
+ for (key, value) in second.kwargs.items()
+ if key in test_args
+ }
+ assert first.kwargs == second.kwargs
+
+ def test_python_operator_python_callable_is_callable(self):
+ """Tests that @task will only instantiate if
+ the python_callable argument is callable."""
+ not_callable = {}
+ with pytest.raises(AirflowException):
+ task_decorator(not_callable, dag=self.dag)
+
+ def test_fails_bad_signature(self):
+ """Tests that @task will fail if signature is not binding."""
+ @task_decorator
+ def add_number(num: int) -> int:
+ return num + 2
+ with pytest.raises(TypeError):
+ add_number(2, 3) # pylint: disable=too-many-function-args
+ with pytest.raises(TypeError):
+ add_number() # pylint: disable=no-value-for-parameter
+ add_number('test') # pylint: disable=no-value-for-parameter
+
+ def test_fail_method(self):
+ """Tests that @task will fail if signature is not binding."""
+
+ with pytest.raises(AirflowException):
+ class Test:
+ num = 2
+
+ @task_decorator
+ def add_number(self, num: int) -> int:
+ return self.num + num
+ Test().add_number(2)
+
+ def test_fail_multiple_outputs_key_type(self):
+ @task_decorator(multiple_outputs=True)
+ def add_number(num: int):
+ return {2: num}
+ with self.dag:
+ ret = add_number(2)
+ self.dag.create_dagrun(
+ run_id=DagRunType.MANUAL.value,
+ execution_date=DEFAULT_DATE,
+ start_date=DEFAULT_DATE,
+ state=State.RUNNING
+ )
+
+ with pytest.raises(AirflowException):
+ # pylint: disable=maybe-no-member
+ ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+ def test_fail_multiple_outputs_no_dict(self):
+ @task_decorator(multiple_outputs=True)
+ def add_number(num: int):
+ return num
+
+ with self.dag:
+ ret = add_number(2)
+ self.dag.create_dagrun(
+ run_id=DagRunType.MANUAL.value,
+ execution_date=DEFAULT_DATE,
+ start_date=DEFAULT_DATE,
+ state=State.RUNNING
+ )
+
+ with pytest.raises(AirflowException):
+ # pylint: disable=maybe-no-member
+ ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+ def test_python_callable_arguments_are_templatized(self):
+ """Test @task op_args are templatized"""
+ recorded_calls = []
+
+ # Create a named tuple and ensure it is still preserved
+ # after the rendering is done
+ Named = namedtuple('Named', ['var1', 'var2'])
+ named_tuple = Named('{{ ds }}', 'unchanged')
+
+ task = task_decorator(
+ # a Mock instance cannot be used as a callable function or test
fails with a
+ # TypeError: Object of type Mock is not JSON serializable
+ build_recording_function(recorded_calls),
+ dag=self.dag)
+ ret = task(4, date(2019, 1, 1), "dag {{dag.dag_id}} ran on {{ds}}.",
named_tuple)
+
+ self.dag.create_dagrun(
+ run_id=DagRunType.MANUAL.value,
+ execution_date=DEFAULT_DATE,
+ start_date=DEFAULT_DATE,
+ state=State.RUNNING
+ )
+ ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) #
pylint: disable=maybe-no-member
+
+ ds_templated = DEFAULT_DATE.date().isoformat()
+ assert len(recorded_calls) == 1
+ self._assert_calls_equal(
+ recorded_calls[0],
+ Call(4,
+ date(2019, 1, 1),
+ "dag {} ran on {}.".format(self.dag.dag_id, ds_templated),
+ Named(ds_templated, 'unchanged'))
+ )
+
+ def test_python_callable_keyword_arguments_are_templatized(self):
+ """Test PythonOperator op_kwargs are templatized"""
+ recorded_calls = []
+
+ task = task_decorator(
+ # a Mock instance cannot be used as a callable function or test
fails with a
+ # TypeError: Object of type Mock is not JSON serializable
+ build_recording_function(recorded_calls),
+ dag=self.dag
+ )
+ ret = task(an_int=4, a_date=date(2019, 1, 1), a_templated_string="dag
{{dag.dag_id}} ran on {{ds}}.")
+ self.dag.create_dagrun(
+ run_id=DagRunType.MANUAL.value,
+ execution_date=DEFAULT_DATE,
+ start_date=DEFAULT_DATE,
+ state=State.RUNNING
+ )
+ ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) #
pylint: disable=maybe-no-member
+
+ assert len(recorded_calls) == 1
+ self._assert_calls_equal(
+ recorded_calls[0],
+ Call(an_int=4,
+ a_date=date(2019, 1, 1),
+ a_templated_string="dag {} ran on {}.".format(
+ self.dag.dag_id, DEFAULT_DATE.date().isoformat()))
+ )
+
+ def test_manual_task_id(self):
+ """Test manually seting task_id"""
+
+ @task_decorator(task_id='some_name')
+ def do_run():
+ return 4
+ with self.dag:
+ do_run()
+ assert ['some_name'] == self.dag.task_ids
+
+ def test_multiple_calls(self):
+ """Test calling task multiple times in a DAG"""
+
+ @task_decorator
+ def do_run():
+ return 4
+ with self.dag:
+ do_run()
+ assert ['do_run'] == self.dag.task_ids
+ do_run_1 = do_run()
+ do_run_2 = do_run()
+ assert ['do_run', 'do_run__1', 'do_run__2'] == self.dag.task_ids
+
+ assert do_run_1.operator.task_id == 'do_run__1' # pylint:
disable=maybe-no-member
+ assert do_run_2.operator.task_id == 'do_run__2' # pylint:
disable=maybe-no-member
+
+ def test_call_20(self):
+ """Test calling decorated function 10 times in a DAG"""
Review comment:
```suggestion
"""Test calling decorated function 21 times in a DAG"""
```
##########
File path: tests/operators/test_python.py
##########
@@ -311,6 +315,350 @@ def func(**context):
python_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+class TestAirflowTask(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+
+ with create_session() as session:
+ session.query(DagRun).delete()
+ session.query(TI).delete()
+
+ def setUp(self):
+ super().setUp()
+ self.dag = DAG(
+ 'test_dag',
+ default_args={
+ 'owner': 'airflow',
+ 'start_date': DEFAULT_DATE})
+ self.addCleanup(self.dag.clear)
+
+ def tearDown(self):
+ super().tearDown()
+
+ with create_session() as session:
+ session.query(DagRun).delete()
+ session.query(TI).delete()
+
+ def _assert_calls_equal(self, first, second):
+ assert isinstance(first, Call)
+ assert isinstance(second, Call)
+ assert first.args == second.args
+ # eliminate context (conf, dag_run, task_instance, etc.)
Review comment:
We've already got `TestPythonBase` as a base class -- inherit from that
instead of duplicating these functions again please.
##########
File path: docs/concepts.rst
##########
@@ -173,6 +214,60 @@ Each task is a node in our DAG, and there is a dependency
from task_1 to task_2:
We can say that task_1 is *upstream* of task_2, and conversely task_2 is
*downstream* of task_1.
When a DAG Run is created, task_1 will start running and task_2 waits for
task_1 to complete successfully before it may start.
+.. _concepts:task_decorator:
+
+Python task decorator
+---------------------
+*Added in Airflow 1.10.11*
+
+
+The Airflow ``task`` decorator converts any decorated Python function into an
Airflow Python operator.
+The decorated function can be called once to set the arguments and keyword
arguments for operator execution.
+
+
+.. code:: python
+
+ with DAG('my_dag', start_date=datetime(2020, 5, 15)) as dag:
+
+ @dag.task
+ def hello_world():
+ print('hello world!')
+
+
+ # Also...
+
+ from airflow.decorators import task
+
+ @task
+ def hello_name(name: str):
+ print(f'hello {name}!')
+
+ hello_name('Airflow users')
+
+Operator decorator captures returned values and sends them to the :ref:`XCom
backend <concepts:xcom>`. By default, returned
+value is saved as a single XCom value. You can set the ``multiple_outputs`` keyword
argument to ``True`` to unroll dictionaries,
+lists or tuples into separate XCom values. This can be used with regular
operators to create
+:ref:`functional DAGs <concepts:functional_dags>`.
+
+Calling a decorated function returns an ``XComArg`` instance. You can use it
to set templated fields on downstream
+operators.
+
+If you call a decorated function twice in a DAG, it will error as it doesn't
know what args to use.
+If you want to reuse decorated functions, use the copy method as follows:
Review comment:
I don't think this is the case anymore, right?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]