[GitHub] [airflow] feluelle commented on a change in pull request #8962: [AIRFLOW-8057] [AIP-31] Add @task decorator

2020-06-22 Thread GitBox


feluelle commented on a change in pull request #8962:
URL: https://github.com/apache/airflow/pull/8962#discussion_r443747672



##
File path: docs/concepts.rst
##
@@ -173,6 +213,62 @@ Each task is a node in our DAG, and there is a dependency from task_1 to task_2:
 We can say that task_1 is *upstream* of task_2, and conversely task_2 is *downstream* of task_1.
 When a DAG Run is created, task_1 will start running and task_2 waits for task_1 to complete successfully before it may start.
 
+.. _concepts:task_decorator:
+
+Python task decorator
+---------------------
+
+The Airflow ``task`` decorator converts any Python function to an Airflow operator.
+The decorated function can be called once to set the arguments and keyword arguments for operator execution.
+
+
+.. code-block:: python
+
+  with DAG('my_dag', start_date=datetime(2020, 5, 15)) as dag:
+      @dag.task
+      def hello_world():
+          print('hello world!')
+
+
+  # Also...
+  from airflow.decorators import task
+
+
+  @task
+  def hello_name(name: str):
+      print(f'hello {name}!')
+
+
+  hello_name('Airflow users')
+
+The task decorator captures returned values and sends them to the :ref:`XCom backend <concepts:xcom>`. By default, the returned
+value is saved as a single XCom value. You can set the ``multiple_outputs`` keyword argument to ``True`` to unroll dictionaries,
+lists or tuples into separate XCom values. This can be used with regular operators to create
+:ref:`functional DAGs <concepts:functional_dags>`.
+
+Calling a decorated function returns an ``XComArg`` instance. You can use it to set templated fields on downstream
+operators.
+
+You can call a decorated function more than once in a DAG. The decorated function will automatically generate
+a unique ``task_id`` for each generated operator.
+
+.. code-block:: python
+
+  with DAG('my_dag', start_date=datetime(2020, 5, 15)) as dag:
+
+@dag.task
+def update_user(user_id: int):
+  ...
+
+# Avoid generating this list dynamically to keep DAG topology stable between DAG runs
+for user_id in user_ids:
+  update_current(user_id)

Review comment:
   ```suggestion
 update_user(user_id)
   ```
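The ``multiple_outputs`` behaviour described in the quoted docs can be sketched outside Airflow. This is a minimal illustration, not Airflow's implementation; the dict-backed ``xcom_store`` and the ``('task_id', key)`` addressing are assumptions made for the sketch.

```python
# Minimal sketch (NOT Airflow's implementation) of how multiple_outputs
# unrolls a returned dict into separate XCom entries. The dict-backed
# store and ('task_id', key) addressing are assumptions for illustration.
def push_return_value(xcom_store, task_id, value, multiple_outputs=False):
    if multiple_outputs and isinstance(value, dict):
        # Each key of the returned dict becomes its own XCom entry
        for key, item in value.items():
            xcom_store[(task_id, key)] = item
    # The whole return value is always pushed under a default key as well
    xcom_store[(task_id, 'return_value')] = value
    return xcom_store
```

With ``multiple_outputs=True`` a downstream task can then pull a single key, e.g. ``('make_email', 'subject')``, instead of the whole return value.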





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] feluelle commented on a change in pull request #8962: [AIRFLOW-8057] [AIP-31] Add @task decorator

2020-05-28 Thread GitBox


feluelle commented on a change in pull request #8962:
URL: https://github.com/apache/airflow/pull/8962#discussion_r431671551



##
File path: docs/concepts.rst
##
@@ -116,6 +116,47 @@ DAGs can be used as context managers to automatically assign new operators to th
 
 op.dag is dag # True
 
+.. _concepts:functional_dags:
+
+Functional DAGs
+---------------
+*Added in Airflow 1.10.11*
+
+DAGs can be defined using functional abstractions. Outputs and inputs are sent between tasks using
+:ref:`XComs <concepts:xcom>` values. In addition, you can wrap functions as tasks using the
+:ref:`task decorator <concepts:task_decorator>`. Dependencies are automatically inferred from
+the message dependencies.
+
+Example DAG with functional abstraction
+
+.. code:: python
+
+  with DAG(
+  'send_server_ip', default_args=default_args, schedule_interval=None
+  ) as dag:
+
+# Using default connection as it's set to httpbin.org by default
+get_ip = SimpleHttpOperator(
+task_id='get_ip', endpoint='get', method='GET', xcom_push=True
+)
+
+@dag.task(multiple_outputs=True)
+def prepare_email(raw_json: str) -> str:
+  external_ip = json.loads(raw_json)['origin']
+  return {
+'subject': f'Server connected from {external_ip}',
+'body': f'Seems like today your server executing Airflow is connected from the external IP {external_ip}'

Review comment:
   There are more cases where you are using F-Strings: https://github.com/apache/airflow/pull/8962#discussion_r431670798
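The "dependencies are automatically inferred from the message dependencies" idea quoted above can be sketched in plain Python. This is a hedged illustration, not Airflow's ``XComArg`` machinery; the ``Task`` and ``TaskOutput`` classes below are invented for the sketch.

```python
# Hedged sketch (invented classes, not Airflow's XComArg) of inferring
# dependencies from message passing: when one task's output object is
# passed into another task's call, the receiver gains an upstream edge.
class TaskOutput:
    def __init__(self, task):
        self.task = task

class Task:
    def __init__(self, name):
        self.name = name
        self.upstream = set()

    def __call__(self, *args):
        # Any TaskOutput argument implies an upstream dependency
        for arg in args:
            if isinstance(arg, TaskOutput):
                self.upstream.add(arg.task.name)
        return TaskOutput(self)
```

In the quoted example, passing ``get_ip``'s output into ``prepare_email`` would make ``get_ip`` upstream of ``prepare_email`` without any explicit ``>>`` wiring.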









[GitHub] [airflow] feluelle commented on a change in pull request #8962: [AIRFLOW-8057] [AIP-31] Add @task decorator

2020-05-28 Thread GitBox


feluelle commented on a change in pull request #8962:
URL: https://github.com/apache/airflow/pull/8962#discussion_r431670798



##
File path: airflow/operators/python.py
##
@@ -145,6 +148,140 @@ def execute_callable(self):
 return self.python_callable(*self.op_args, **self.op_kwargs)
 
 
+class _PythonFunctionalOperator(BaseOperator):
+"""
+Wraps a Python callable and captures args/kwargs when called for execution.
+
+:param python_callable: A reference to an object that is callable
+:type python_callable: python callable
+:param multiple_outputs: if set, function return value will be
+unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+Defaults to False.
+:type multiple_outputs: bool
+"""
+
+template_fields = ('_op_args', '_op_kwargs')
+ui_color = '#ffefeb'
+
+# since we won't mutate the arguments, we should just do the shallow copy
+# there are some cases we can't deepcopy the objects(e.g protobuf).
+shallow_copy_attrs = ('python_callable',)
+
+@apply_defaults
+def __init__(
+self,
+python_callable: Callable,
+multiple_outputs: bool = False,
+**kwargs
+) -> None:
+self._validate_python_callable(python_callable)
+super().__init__(**kwargs)
+self.python_callable = python_callable
+self.multiple_outputs = multiple_outputs
+self._kwargs = kwargs
+self._op_args: List[Any] = []
+self._called = False
+self._op_kwargs: Dict[str, Any] = {}
+
+@staticmethod
+def _get_unique_task_id(task_id: str, dag: Optional[DAG]) -> str:
+dag = dag or DagContext.get_current_dag()
+if not dag or task_id not in dag.task_ids:
+return task_id
+core = re.split(r'__\d+$', task_id)[0]
+suffixes = sorted(
+[int(re.split(r'^.+__', task_id)[1])
+ for task_id in dag.task_ids
+ if re.match(rf'^{core}__\d+$', task_id)]
+)
+if not suffixes:
+return f'{core}__1'
+return f'{core}__{suffixes[-1] + 1}'

Review comment:
   You are using F-Strings here, too - not only in examples.
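The ``_get_unique_task_id`` logic quoted above can be exercised standalone. The sketch below mirrors the quoted regex approach as a free function, with an ``existing`` list standing in for ``dag.task_ids``; it is an illustration, not the PR's final code.

```python
import re
from typing import List

# Mirrors the quoted _get_unique_task_id logic as a free function;
# `existing` stands in for dag.task_ids.
def unique_task_id(task_id: str, existing: List[str]) -> str:
    if task_id not in existing:
        return task_id
    # Strip any trailing __<n> suffix to recover the core id
    core = re.split(r'__\d+$', task_id)[0]
    # Collect numeric suffixes already used for this core id
    suffixes = sorted(
        int(re.split(r'^.+__', tid)[1])
        for tid in existing
        if re.match(rf'^{core}__\d+$', tid)
    )
    if not suffixes:
        return f'{core}__1'
    return f'{core}__{suffixes[-1] + 1}'
```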













[GitHub] [airflow] feluelle commented on a change in pull request #8962: [AIRFLOW-8057] [AIP-31] Add @task decorator

2020-05-27 Thread GitBox


feluelle commented on a change in pull request #8962:
URL: https://github.com/apache/airflow/pull/8962#discussion_r431366969



##
File path: docs/concepts.rst
##
@@ -116,6 +116,47 @@ DAGs can be used as context managers to automatically assign new operators to th
 
 op.dag is dag # True
 
+.. _concepts:functional_dags:
+
+Functional DAGs
+---------------
+*Added in Airflow 1.10.11*
+
+DAGs can be defined using functional abstractions. Outputs and inputs are sent between tasks using
+:ref:`XComs <concepts:xcom>` values. In addition, you can wrap functions as tasks using the
+:ref:`task decorator <concepts:task_decorator>`. Dependencies are automatically inferred from
+the message dependencies.
+
+Example DAG with functional abstraction
+
+.. code:: python
+
+  with DAG(
+  'send_server_ip', default_args=default_args, schedule_interval=None
+  ) as dag:
+
+# Using default connection as it's set to httpbin.org by default
+get_ip = SimpleHttpOperator(
+task_id='get_ip', endpoint='get', method='GET', xcom_push=True
+)
+
+@dag.task(multiple_outputs=True)
+def prepare_email(raw_json: str) -> str:
+  external_ip = json.loads(raw_json)['origin']
+  return {
+'subject': f'Server connected from {external_ip}',
+'body': f'Seems like today your server executing Airflow is connected from the external IP {external_ip}'

Review comment:
   Or we could use [future-fstrings](https://github.com/asottile/future-fstrings). We already have it as a dep of a [dep](https://github.com/apache/airflow/search?q=future-fstrings&unscoped_q=future-fstrings) and add `# -*- coding: future_fstrings -*-`.









[GitHub] [airflow] feluelle commented on a change in pull request #8962: [AIRFLOW-8057] [AIP-31] Add @task decorator

2020-05-27 Thread GitBox


feluelle commented on a change in pull request #8962:
URL: https://github.com/apache/airflow/pull/8962#discussion_r431035377



##
File path: airflow/operators/python.py
##
@@ -145,6 +147,141 @@ def execute_callable(self):
 return self.python_callable(*self.op_args, **self.op_kwargs)
 
 
+class _PythonFunctionalOperator(BaseOperator):
+"""
+Wraps a Python callable and captures args/kwargs when called for execution.
+
+:param python_callable: A reference to an object that is callable
+:type python_callable: python callable
+:param multiple_outputs: if set, function return value will be
+unrolled to multiple XCom values. List/Tuples will unroll to xcom values
+with index as key. Dict will unroll to xcom values with keys as keys.
+Defaults to False.
+:type multiple_outputs: bool
+"""
+
+template_fields = ('_op_args', '_op_kwargs')
+ui_color = '#ffefeb'
+
+# since we won't mutate the arguments, we should just do the shallow copy
+# there are some cases we can't deepcopy the objects(e.g protobuf).
+shallow_copy_attrs = ('python_callable',)
+
+@apply_defaults
+def __init__(
+self,
+python_callable: Callable,
+multiple_outputs: bool = False,
+*args,
+**kwargs
+) -> None:
+# Check if we need to generate a new task_id
+task_id = kwargs.get('task_id', None)
+dag = kwargs.get('dag', None) or DagContext.get_current_dag()
+if task_id and dag and task_id in dag.task_ids:
+prefix = task_id.rsplit("__", 1)[0]
+task_id = sorted(
+filter(lambda x: x.startswith(prefix), dag.task_ids),
+reverse=True
+)[0]
+num = int(task_id[-1] if '__' in task_id else '0') + 1
+kwargs['task_id'] = f'{prefix}__{num}'
+
+if not kwargs.get('do_xcom_push', True) and not multiple_outputs:
+raise AirflowException('@task needs to have either do_xcom_push=True or '
+   'multiple_outputs=True.')
+if not callable(python_callable):
+raise AirflowException('`python_callable` param must be callable')
+self._fail_if_method(python_callable)
+super().__init__(*args, **kwargs)
+self.python_callable = python_callable
+self.multiple_outputs = multiple_outputs
+self._kwargs = kwargs

Review comment:
   Yes, I think that would be better. +1, for dropping it.
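Putting the pieces discussed in this thread together, the decorator mechanics can be sketched in isolation. This is a simplified stand-in, not the PR's ``_PythonFunctionalOperator``; the ``FunctionalOperator`` class and the bare ``task`` factory below are invented for illustration.

```python
import functools

# Simplified stand-in (NOT the PR's _PythonFunctionalOperator) showing how
# a ``task`` decorator turns a plain function into an operator-like object
# that captures its call arguments for later execution.
class FunctionalOperator:
    def __init__(self, python_callable, task_id, multiple_outputs=False):
        self.python_callable = python_callable
        self.task_id = task_id
        self.multiple_outputs = multiple_outputs
        self.op_args, self.op_kwargs = (), {}

    def execute(self):
        # Run the wrapped callable with the captured call arguments
        return self.python_callable(*self.op_args, **self.op_kwargs)

def task(fn=None, multiple_outputs=False):
    # Support both bare ``@task`` and parameterized ``@task(multiple_outputs=True)``
    if fn is None:
        return functools.partial(task, multiple_outputs=multiple_outputs)

    @functools.wraps(fn)
    def factory(*args, **kwargs):
        op = FunctionalOperator(fn, task_id=fn.__name__,
                                multiple_outputs=multiple_outputs)
        op.op_args, op.op_kwargs = args, kwargs
        return op
    return factory
```

Calling the decorated function does not run it; it builds an operator whose ``execute`` later invokes the callable, which is the capture-then-execute split the reviewed class implements.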

##
File path: docs/concepts.rst
##
@@ -116,6 +116,47 @@ DAGs can be used as context managers to automatically 
assign new operators to th
 
 op.dag is dag # True
 
+.. _concepts:functional_dags:
+
+Functional DAGs
+---
+*Added in Airflow 1.10.11*
+
+DAGs can be defined using functional abstractions. Outputs and inputs are sent 
between tasks using
+:ref:`XComs ` values. In addition, you can wrap functions as 
tasks using the
+:ref:`task decorator `. Dependencies are 
automatically inferred from
+the message dependencies.
+
+Example DAG with functional abstraction
+
+.. code:: python
+
+  with DAG(
+  'send_server_ip', default_args=default_args, schedule_interval=None
+  ) as dag:
+
+# Using default connection as it's set to httpbin.org by default
+get_ip = SimpleHttpOperator(
+task_id='get_ip', endpoint='get', method='GET', xcom_push=True
+)
+
+@dag.task(multiple_outputs=True)
+def prepare_email(raw_json: str) -> str:
+  external_ip = json.loads(raw_json)['origin']
+  return {
+'subject':f'Server connected from {external_ip}',
+'body': f'Seems like today your server executing Airflow is connected 
from the external IP {external_ip}'

Review comment:
   So if we want this in 1.10.11 we need to be very careful :/

##
File path: airflow/operators/python.py
##
@@ -145,6 +147,141 @@ def execute_callable(self):
 return self.python_callable(*self.op_args, **self.op_kwargs)
 
 
+class _PythonFunctionalOperator(BaseOperator):
+"""
+Wraps a Python callable and captures args/kwargs when called for execution.
+
+:param python_callable: A reference to an object that is callable
+:type python_callable: python callable
+:param multiple_outputs: if set, function return value will be
+unrolled to multiple XCom values. List/Tuples will unroll to xcom values
+with index as key. Dict will unroll to xcom values with keys as keys.
+Defaults to False.
+:type multiple_outputs: bool
+"""
+
+template_fields = ('_op_args', '_op_kwargs')
+ui_color = '#ffefeb'
+
+# since we won't mutate the arguments, we should just do the shallow copy
+# there are some cases we can't deepcopy the objects(e.g protobuf).
+shallow_copy_attrs = ('python_callable',)
+
+@apply_defaults
+def __init__(
+self,
+python_callable: Callable,
+multiple_outputs: bool = False,