[GitHub] [airflow] turbaszek commented on a change in pull request #8962: [AIRFLOW-8057] [AIP-31] Add @task decorator

2020-06-19 Thread GitBox


turbaszek commented on a change in pull request #8962:
URL: https://github.com/apache/airflow/pull/8962#discussion_r442706891



##
File path: docs/concepts.rst
##
@@ -116,6 +116,46 @@ DAGs can be used as context managers to automatically assign new operators to th
 
 op.dag is dag # True
 
+.. _concepts:functional_dags:
+
+Functional DAGs

Review comment:
   Should we make an issue for that, discuss and then adjust the docs in a 
follow-up PR?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] turbaszek commented on a change in pull request #8962: [AIRFLOW-8057] [AIP-31] Add @task decorator

2020-06-19 Thread GitBox


turbaszek commented on a change in pull request #8962:
URL: https://github.com/apache/airflow/pull/8962#discussion_r442706891



##
File path: docs/concepts.rst
##
@@ -116,6 +116,46 @@ DAGs can be used as context managers to automatically assign new operators to th
 
 op.dag is dag # True
 
+.. _concepts:functional_dags:
+
+Functional DAGs

Review comment:
   Definitely! We should make an issue for that, discuss and then adjust 
the docs in a follow-up PR









[GitHub] [airflow] turbaszek commented on a change in pull request #8962: [AIRFLOW-8057] [AIP-31] Add @task decorator

2020-06-18 Thread GitBox


turbaszek commented on a change in pull request #8962:
URL: https://github.com/apache/airflow/pull/8962#discussion_r442038382



##
File path: docs/concepts.rst
##
@@ -116,6 +116,46 @@ DAGs can be used as context managers to automatically assign new operators to th
 
 op.dag is dag # True
 
+.. _concepts:functional_dags:
+
+Functional DAGs

Review comment:
   @casassg do we have any backwards-compatibility issues other than the metaclass? 









[GitHub] [airflow] turbaszek commented on a change in pull request #8962: [AIRFLOW-8057] [AIP-31] Add @task decorator

2020-06-17 Thread GitBox


turbaszek commented on a change in pull request #8962:
URL: https://github.com/apache/airflow/pull/8962#discussion_r441328022



##
File path: docs/concepts.rst
##
@@ -116,6 +116,46 @@ DAGs can be used as context managers to automatically assign new operators to th
 
 op.dag is dag # True
 
+.. _concepts:functional_dags:
+
+Functional DAGs

Review comment:
   Does this need a name at all? Can't we just say that this is a way for users to define DAGs? Personally I would encourage users to use this approach instead of using XComs directly. WDYT?









[GitHub] [airflow] turbaszek commented on a change in pull request #8962: [AIRFLOW-8057] [AIP-31] Add @task decorator

2020-06-11 Thread GitBox


turbaszek commented on a change in pull request #8962:
URL: https://github.com/apache/airflow/pull/8962#discussion_r438640458



##
File path: airflow/operators/python.py
##
@@ -145,6 +149,131 @@ def execute_callable(self):
 return self.python_callable(*self.op_args, **self.op_kwargs)
 
 
+class _PythonFunctionalOperator(BaseOperator):
+"""
+Wraps a Python callable and captures args/kwargs when called for execution.
+
+:param python_callable: A reference to an object that is callable
+:type python_callable: python callable
+:param op_kwargs: a dictionary of keyword arguments that will get unpacked
+in your function
+:type op_kwargs: dict (templated)
+:param op_args: a list of positional arguments that will get unpacked when
+calling your callable
+:type op_args: list (templated)
+:param multiple_outputs: if set, function return value will be
+unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+Defaults to False.
+:type multiple_outputs: bool
+"""
+
+template_fields = ('op_args', 'op_kwargs')
+ui_color = '#ffefeb'
+
+# since we won't mutate the arguments, we should just do the shallow copy
+# there are some cases we can't deepcopy the objects(e.g protobuf).
+shallow_copy_attrs = ('python_callable',)
+
+@apply_defaults
+def __init__(
+self,
+python_callable: Callable,
+op_args: Tuple[Any],
+op_kwargs: Dict[str, Any],
+multiple_outputs: bool = False,
+**kwargs
+) -> None:
+kwargs['task_id'] = self._get_unique_task_id(kwargs['task_id'], kwargs.get('dag', None))
+super().__init__(**kwargs)

Review comment:
   I see another issue here. Currently, if `task_id` is not provided, the user will get `KeyError: 'task_id'` instead of `TypeError: __init__() missing 1 required positional argument: 'task_id'`.
   
   Also, this seems to work as expected:
   ```python
   In [8]: class CustomOp(BaseOperator):
      ...:     def __init__(self, a, b, *args, **kwargs):
      ...:         super().__init__(*args, **kwargs)
      ...:         self.task_id = "other task id"
      ...:

   In [9]: op = CustomOp(a=1, b=2, task_id="task_id")

   In [10]: op.task_id
   Out[10]: 'other task id'
   ```
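   A minimal, self-contained sketch of the first point (hypothetical `ExplicitOp`, not the PR's class): declaring `task_id` as a keyword-only parameter restores the conventional `TypeError` when it is omitted, instead of the `KeyError` raised by `kwargs['task_id']`.

   ```python
   # Hypothetical sketch: an explicit keyword-only `task_id` fails with
   # the usual TypeError when omitted, rather than a KeyError.
   class ExplicitOp:
       def __init__(self, *, task_id: str, **kwargs) -> None:
           self.task_id = task_id

   try:
       ExplicitOp()  # task_id missing
   except TypeError as exc:
       print(type(exc).__name__)  # TypeError
   ```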
   









[GitHub] [airflow] turbaszek commented on a change in pull request #8962: [AIRFLOW-8057] [AIP-31] Add @task decorator

2020-06-11 Thread GitBox


turbaszek commented on a change in pull request #8962:
URL: https://github.com/apache/airflow/pull/8962#discussion_r438646036



##
File path: airflow/operators/python.py
##
@@ -145,6 +148,142 @@ def execute_callable(self):
 return self.python_callable(*self.op_args, **self.op_kwargs)
 
 
+class _PythonFunctionalOperator(BaseOperator):
+"""
+Wraps a Python callable and captures args/kwargs when called for execution.
+
+:param python_callable: A reference to an object that is callable
+:type python_callable: python callable
+:param multiple_outputs: if set, function return value will be
+unrolled to multiple XCom values. List/Tuples will unroll to xcom values
+with index as key. Dict will unroll to xcom values with keys as keys.
+Defaults to False.
+:type multiple_outputs: bool
+"""
+
+template_fields = ('_op_args', '_op_kwargs')
+ui_color = '#ffefeb'
+
+# since we won't mutate the arguments, we should just do the shallow copy
+# there are some cases we can't deepcopy the objects(e.g protobuf).
+shallow_copy_attrs = ('python_callable',)
+
+@apply_defaults
+def __init__(
+self,
+python_callable: Callable,
+multiple_outputs: bool = False,
+*args,
+**kwargs
+) -> None:
+dag = kwargs.get('dag', None) or DagContext.get_current_dag()
+kwargs['task_id'] = self._get_unique_task_id(kwargs['task_id'], dag)
+self._validate_python_callable(python_callable)
+super().__init__(*args, **kwargs)
+self.python_callable = python_callable
+self.multiple_outputs = multiple_outputs
+self._kwargs = kwargs
+self._op_args: List[Any] = []
+self._called = False
+self._op_kwargs: Dict[str, Any] = {}
+
+@staticmethod
+def _get_unique_task_id(task_id, dag):
+if not dag or task_id not in dag.task_ids:
+return task_id
+core = re.split(r'__\d+$', task_id)[0]
+suffixes = sorted(
+[int(re.split(r'^.+__', task_id)[1])
+ for task_id in dag.task_ids
+ if re.match(rf'^{core}__\d+$', task_id)]
+)
+if not suffixes:
+return f'{core}__1'
+return f'{core}__{suffixes[-1] + 1}'
+
+@staticmethod
+def _validate_python_callable(python_callable):
+if not callable(python_callable):
+raise AirflowException('`python_callable` param must be callable')
+if 'self' in signature(python_callable).parameters.keys():
+raise AirflowException('@task does not support methods')
+
+def __call__(self, *args, **kwargs):
+# If args/kwargs are set, then operator has been called. Raise exception
+if self._called:
+raise AirflowException('@task decorated functions can only be called once. '
+   'If you need to reuse it several times in a DAG, use the `copy` method.')
+
+# If we have no DAG, reinitialize class to capture DAGContext and DAG default args.
+if not self.has_dag():
+self.__init__(python_callable=self.python_callable,
+  multiple_outputs=self.multiple_outputs,
+  **self._kwargs)
+
+# Capture args/kwargs
+self._op_args = args
+self._op_kwargs = kwargs
+self._called = True
+return XComArg(self)
+
+def copy(self, task_id: Optional[str] = None, **kwargs):
+"""
+Create a copy of the task, allowing ctor kwargs to be overwritten if needed.

+If an alias is created in a new DAGContext, apply defaults and set the new DAG as the operator DAG.
+
+:param task_id: Task id for the new operator
+:type task_id: Optional[str]
+"""
+if task_id:
+self._kwargs['task_id'] = task_id
+return _PythonFunctionalOperator(
+python_callable=self.python_callable,
+multiple_outputs=self.multiple_outputs,
+**{**kwargs, **self._kwargs}
+)
+
+def execute(self, context: Dict):
+return_value = self.python_callable(*self._op_args, **self._op_kwargs)
+self.log.info("Done. Returned value was: %s", return_value)
+if not self.multiple_outputs:
+return return_value
+if isinstance(return_value, dict):
+for key, value in return_value.items():
+self.xcom_push(context, str(key), value)
+elif isinstance(return_value, (list, tuple)):
+for key, value in enumerate(return_value):
+self.xcom_push(context, str(key), value)
+return return_value
+
+
+def task(python_callable: Optional[Callable] = None, **kwargs):
+"""
+Python operator decorator. Wraps a function into an Airflow operator.
+Accepts kwargs for operator kwarg. Will try to wrap operator into DAG at 
declaration or
+on function invocation. Use alias to 

[GitHub] [airflow] turbaszek commented on a change in pull request #8962: [AIRFLOW-8057] [AIP-31] Add @task decorator

2020-06-09 Thread GitBox


turbaszek commented on a change in pull request #8962:
URL: https://github.com/apache/airflow/pull/8962#discussion_r437204483



##
File path: airflow/operators/python.py
##
@@ -145,6 +149,131 @@ def execute_callable(self):
 return self.python_callable(*self.op_args, **self.op_kwargs)
 
 
+class _PythonFunctionalOperator(BaseOperator):
+"""
+Wraps a Python callable and captures args/kwargs when called for execution.
+
+:param python_callable: A reference to an object that is callable
+:type python_callable: python callable
+:param op_kwargs: a dictionary of keyword arguments that will get unpacked
+in your function
+:type op_kwargs: dict (templated)
+:param op_args: a list of positional arguments that will get unpacked when
+calling your callable
+:type op_args: list (templated)
+:param multiple_outputs: if set, function return value will be
+unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+Defaults to False.
+:type multiple_outputs: bool
+"""
+
+template_fields = ('op_args', 'op_kwargs')
+ui_color = '#ffefeb'
+
+# since we won't mutate the arguments, we should just do the shallow copy
+# there are some cases we can't deepcopy the objects(e.g protobuf).
+shallow_copy_attrs = ('python_callable',)
+
+@apply_defaults
+def __init__(
+self,
+python_callable: Callable,
+op_args: Tuple[Any],
+op_kwargs: Dict[str, Any],
+multiple_outputs: bool = False,
+**kwargs
+) -> None:
+kwargs['task_id'] = self._get_unique_task_id(kwargs['task_id'], kwargs.get('dag', None))
+super().__init__(**kwargs)

Review comment:
   No strong opinion here, I just think this is more explicit 

##
File path: airflow/operators/python.py
##
@@ -145,6 +149,131 @@ def execute_callable(self):
 return self.python_callable(*self.op_args, **self.op_kwargs)
 
 
+class _PythonFunctionalOperator(BaseOperator):
+"""
+Wraps a Python callable and captures args/kwargs when called for execution.
+
+:param python_callable: A reference to an object that is callable
+:type python_callable: python callable
+:param op_kwargs: a dictionary of keyword arguments that will get unpacked
+in your function
+:type op_kwargs: dict (templated)
+:param op_args: a list of positional arguments that will get unpacked when
+calling your callable
+:type op_args: list (templated)
+:param multiple_outputs: if set, function return value will be
+unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+Defaults to False.
+:type multiple_outputs: bool
+"""
+
+template_fields = ('op_args', 'op_kwargs')
+ui_color = '#ffefeb'
+
+# since we won't mutate the arguments, we should just do the shallow copy
+# there are some cases we can't deepcopy the objects(e.g protobuf).
+shallow_copy_attrs = ('python_callable',)
+
+@apply_defaults
+def __init__(
+self,
+python_callable: Callable,
+op_args: Tuple[Any],
+op_kwargs: Dict[str, Any],
+multiple_outputs: bool = False,
+**kwargs
+) -> None:
+kwargs['task_id'] = self._get_unique_task_id(kwargs['task_id'], 
kwargs.get('dag', None))
+super().__init__(**kwargs)
+self.python_callable = python_callable
+
+# Check that arguments can be bound
+signature(python_callable).bind(*op_args, **op_kwargs)
+self.multiple_outputs = multiple_outputs
+self.op_args = op_args
+self.op_kwargs = op_kwargs
+
+@staticmethod
+def _get_unique_task_id(task_id: str, dag: Optional[DAG]) -> str:
+dag = dag or DagContext.get_current_dag()
+if not dag or task_id not in dag.task_ids:
+return task_id
+core = re.split(r'__\d+$', task_id)[0]
+suffixes = sorted(
+[int(re.split(r'^.+__', task_id)[1])
+ for task_id in dag.task_ids
+ if re.match(rf'^{core}__\d+$', task_id)]
+)
+if not suffixes:
+return f'{core}__1'
+return f'{core}__{suffixes[-1] + 1}'
+
+@staticmethod
+def validate_python_callable(python_callable):
+"""Validate that python callable can be wrapped by operator.
+Raises exception if invalid.
+
+:param python_callable: Python object to be validated
+:raises: TypeError, AirflowException
+"""
+if not callable(python_callable):
+raise TypeError('`python_callable` param must be callable')
+if 'self' in signature(python_callable).parameters.keys():
+raise AirflowException('@task does not support methods')

Review comment:
   What about class methods?
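   A standalone illustration of why this question matters (plain `inspect`, no Airflow): the `'self' in signature(...).parameters` check catches functions that declare `self`, but a classmethod accessed via its class already has `cls` bound away, so the check never fires for it.

   ```python
   from inspect import signature

   class Foo:
       def method(self, x):
           return x

       @classmethod
       def cmethod(cls, x):
           return x

   # A function declaring `self` exposes it, so the validation rejects it:
   print('self' in signature(Foo.method).parameters)   # True
   # A classmethod is already bound to the class; neither `cls` nor `self`
   # shows up in its signature, so it would slip through the check:
   print('cls' in signature(Foo.cmethod).parameters)   # False
   ```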






[GitHub] [airflow] turbaszek commented on a change in pull request #8962: [AIRFLOW-8057] [AIP-31] Add @task decorator

2020-05-29 Thread GitBox


turbaszek commented on a change in pull request #8962:
URL: https://github.com/apache/airflow/pull/8962#discussion_r432401785



##
File path: airflow/operators/python.py
##
@@ -145,6 +149,131 @@ def execute_callable(self):
 return self.python_callable(*self.op_args, **self.op_kwargs)
 
 
+class _PythonFunctionalOperator(BaseOperator):
+"""
+Wraps a Python callable and captures args/kwargs when called for execution.
+
+:param python_callable: A reference to an object that is callable
+:type python_callable: python callable
+:param op_kwargs: a dictionary of keyword arguments that will get unpacked
+in your function
+:type op_kwargs: dict (templated)
+:param op_args: a list of positional arguments that will get unpacked when
+calling your callable
+:type op_args: list (templated)
+:param multiple_outputs: if set, function return value will be
+unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+Defaults to False.
+:type multiple_outputs: bool
+"""
+
+template_fields = ('op_args', 'op_kwargs')
+ui_color = '#ffefeb'

Review comment:
   ```suggestion
   ui_color = PythonOperator.ui_color
   ```
   To keep it consistent. WDYT?

##
File path: airflow/operators/python.py
##
@@ -145,6 +149,131 @@ def execute_callable(self):
 return self.python_callable(*self.op_args, **self.op_kwargs)
 
 
+class _PythonFunctionalOperator(BaseOperator):
+"""
+Wraps a Python callable and captures args/kwargs when called for execution.
+
+:param python_callable: A reference to an object that is callable
+:type python_callable: python callable
+:param op_kwargs: a dictionary of keyword arguments that will get unpacked
+in your function
+:type op_kwargs: dict (templated)
+:param op_args: a list of positional arguments that will get unpacked when
+calling your callable
+:type op_args: list (templated)
+:param multiple_outputs: if set, function return value will be
+unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+Defaults to False.
+:type multiple_outputs: bool
+"""
+
+template_fields = ('op_args', 'op_kwargs')
+ui_color = '#ffefeb'
+
+# since we won't mutate the arguments, we should just do the shallow copy
+# there are some cases we can't deepcopy the objects(e.g protobuf).

Review comment:
   ```suggestion
   # there are some cases we can't deepcopy the objects (e.g protobuf).
   ```

##
File path: airflow/operators/python.py
##
@@ -145,6 +149,131 @@ def execute_callable(self):
 return self.python_callable(*self.op_args, **self.op_kwargs)
 
 
+class _PythonFunctionalOperator(BaseOperator):
+"""
+Wraps a Python callable and captures args/kwargs when called for execution.
+
+:param python_callable: A reference to an object that is callable
+:type python_callable: python callable
+:param op_kwargs: a dictionary of keyword arguments that will get unpacked
+in your function
+:type op_kwargs: dict (templated)
+:param op_args: a list of positional arguments that will get unpacked when
+calling your callable
+:type op_args: list (templated)
+:param multiple_outputs: if set, function return value will be
+unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
+Defaults to False.
+:type multiple_outputs: bool
+"""
+
+template_fields = ('op_args', 'op_kwargs')
+ui_color = '#ffefeb'
+
+# since we won't mutate the arguments, we should just do the shallow copy
+# there are some cases we can't deepcopy the objects(e.g protobuf).
+shallow_copy_attrs = ('python_callable',)
+
+@apply_defaults
+def __init__(
+self,
+python_callable: Callable,
+op_args: Tuple[Any],
+op_kwargs: Dict[str, Any],
+multiple_outputs: bool = False,
+**kwargs
+) -> None:
+kwargs['task_id'] = self._get_unique_task_id(kwargs['task_id'], kwargs.get('dag', None))
+super().__init__(**kwargs)
+self.python_callable = python_callable
+
+# Check that arguments can be bound
+signature(python_callable).bind(*op_args, **op_kwargs)
+self.multiple_outputs = multiple_outputs
+self.op_args = op_args
+self.op_kwargs = op_kwargs
+
+@staticmethod
+def _get_unique_task_id(task_id: str, dag: Optional[DAG]) -> str:
+dag = dag or DagContext.get_current_dag()
+if not dag or task_id not in dag.task_ids:
+return task_id
+core = re.split(r'__\d+$', task_id)[0]
+suffixes = sorted(
+[int(re.split(r'^.+__', task_id)[1])
+ for task_id in dag.task_ids
+ if re.match(rf'^{core}__\d+$', task_id)]
+)
+if not suffixes:
+  

[GitHub] [airflow] turbaszek commented on a change in pull request #8962: [AIRFLOW-8057] [AIP-31] Add @task decorator

2020-05-28 Thread GitBox


turbaszek commented on a change in pull request #8962:
URL: https://github.com/apache/airflow/pull/8962#discussion_r431578845



##
File path: airflow/operators/python.py
##
@@ -145,6 +147,141 @@ def execute_callable(self):
 return self.python_callable(*self.op_args, **self.op_kwargs)
 
 
+class _PythonFunctionalOperator(BaseOperator):
+"""
+Wraps a Python callable and captures args/kwargs when called for execution.
+
+:param python_callable: A reference to an object that is callable
+:type python_callable: python callable
+:param multiple_outputs: if set, function return value will be
+unrolled to multiple XCom values. List/Tuples will unroll to xcom values
+with index as key. Dict will unroll to xcom values with keys as keys.
+Defaults to False.
+:type multiple_outputs: bool
+"""
+
+template_fields = ('_op_args', '_op_kwargs')
+ui_color = '#ffefeb'
+
+# since we won't mutate the arguments, we should just do the shallow copy
+# there are some cases we can't deepcopy the objects(e.g protobuf).
+shallow_copy_attrs = ('python_callable',)
+
+@apply_defaults
+def __init__(
+self,
+python_callable: Callable,
+multiple_outputs: bool = False,
+*args,
+**kwargs
+) -> None:
+# Check if we need to generate a new task_id
+task_id = kwargs.get('task_id', None)
+dag = kwargs.get('dag', None) or DagContext.get_current_dag()
+if task_id and dag and task_id in dag.task_ids:
+prefix = task_id.rsplit("__", 1)[0]
+task_id = sorted(
+filter(lambda x: x.startswith(prefix), dag.task_ids),
+reverse=True
+)[0]
+num = int(task_id[-1] if '__' in task_id else '0') + 1
+kwargs['task_id'] = f'{prefix}__{num}'
+
+if not kwargs.get('do_xcom_push', True) and not multiple_outputs:
+raise AirflowException('@task needs to have either do_xcom_push=True or '
+   'multiple_outputs=True.')
+if not callable(python_callable):
+raise AirflowException('`python_callable` param must be callable')
+self._fail_if_method(python_callable)
+super().__init__(*args, **kwargs)
+self.python_callable = python_callable
+self.multiple_outputs = multiple_outputs
+self._kwargs = kwargs
+self._op_args: List[Any] = []
+self._called = False
+self._op_kwargs: Dict[str, Any] = {}
+
+@staticmethod
+def _fail_if_method(python_callable):
+if 'self' in signature(python_callable).parameters.keys():
+raise AirflowException('@task does not support methods')
+
+def __call__(self, *args, **kwargs):
+# If args/kwargs are set, then operator has been called. Raise exception
+if self._called:

Review comment:
   > We could capture task_id kwarg and generate a new operator, but then what is `update_user`: the first operator, the latest one? What does `update_user` represent?
   
   For me `update_user` is a function, and as a function it can be called many times with different input, thus yielding different results (here, creating a new task). I have never met the "function as a singleton" pattern. If we don't want to generate `task_id` for users, then we may consider raising an exception on the second invocation when no custom `task_id` is passed.
   
   My point is: this is a function, and I expect to be able to call it as many times as I wish. I expect Airflow to treat each call of this function (in the proper context) as creating a new task.
   
   > You can either do (1) `update_user(i) for i in range(20)` or (2) `update_user >> other_operation`, but not both. I prefer to support the 2nd option as it adapts more to what Airflow already does with operators.
   
   Why should I not be able to do both? This is something I have seen many times.
   ```python
   first_task = BashOperator()
   last_task = BashOperator()
   
   for user_id in users_list:
  first_task >> update_user(user_id) >> last_task
   ```
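   To make the repeated-call behaviour concrete, here is a standalone sketch of the `__1`, `__2` suffix strategy discussed in this thread (an approximation, not the PR's exact code):

   ```python
   import re
   from typing import List

   def unique_task_id(task_id: str, existing: List[str]) -> str:
       # Return task_id unchanged if free, else append the next __N suffix.
       if task_id not in existing:
           return task_id
       core = re.split(r'__\d+$', task_id)[0]
       suffixes = sorted(
           int(t.rsplit('__', 1)[1])
           for t in existing
           if re.match(rf'^{re.escape(core)}__\d+$', t)
       )
       return f'{core}__{(suffixes[-1] + 1) if suffixes else 1}'

   ids: List[str] = []
   for _ in range(3):
       ids.append(unique_task_id('update_user', ids))
   print(ids)  # ['update_user', 'update_user__1', 'update_user__2']
   ```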









[GitHub] [airflow] turbaszek commented on a change in pull request #8962: [AIRFLOW-8057] [AIP-31] Add @task decorator

2020-05-28 Thread GitBox


turbaszek commented on a change in pull request #8962:
URL: https://github.com/apache/airflow/pull/8962#discussion_r431674712



##
File path: docs/concepts.rst
##
@@ -116,6 +116,47 @@ DAGs can be used as context managers to automatically 
assign new operators to th
 
 op.dag is dag # True
 
+.. _concepts:functional_dags:
+
+Functional DAGs
+---------------
+*Added in Airflow 1.10.11*
+
+DAGs can be defined using functional abstractions. Outputs and inputs are sent between tasks using
+:ref:`XComs ` values. In addition, you can wrap functions as tasks using the
+:ref:`task decorator `. Dependencies are automatically inferred from
+the message dependencies.
+
+Example DAG with functional abstraction
+
+.. code:: python
+
+  with DAG(
+  'send_server_ip', default_args=default_args, schedule_interval=None
+  ) as dag:
+
+# Using default connection as it's set to httpbin.org by default
+get_ip = SimpleHttpOperator(
+task_id='get_ip', endpoint='get', method='GET', xcom_push=True
+)
+
+@dag.task(multiple_outputs=True)
+def prepare_email(raw_json: str) -> str:
+  external_ip = json.loads(raw_json)['origin']
+  return {
+'subject':f'Server connected from {external_ip}',
+'body': f'Seems like today your server executing Airflow is connected from the external IP {external_ip}'

Review comment:
   f-strings are not a blocker and we are using them in many places 
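   As an aside, the "dependencies are automatically inferred" behaviour described in the docs snippet above can be sketched in plain Python. The `Handle` class and `task` decorator below are hypothetical illustration, not the PR's actual `XComArg` implementation; they only show the idea that passing one task's return handle into another call records an edge:

   ```python
   # Hypothetical sketch of dependency inference: a task call returns a
   # handle; passing that handle into another task call records an edge.
   from typing import Callable, List, Tuple


   class Handle:
       def __init__(self, task_id: str):
           self.task_id = task_id


   edges: List[Tuple[str, str]] = []


   def task(func: Callable) -> Callable:
       def wrapper(*args) -> Handle:
           this = Handle(func.__name__)
           for arg in args:                # any Handle argument implies
               if isinstance(arg, Handle):  # an upstream dependency
                   edges.append((arg.task_id, this.task_id))
           return this
       return wrapper


   @task
   def get_ip():
       ...


   @task
   def prepare_email(raw_json):
       ...


   prepare_email(get_ip())
   print(edges)  # [('get_ip', 'prepare_email')]
   ```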









[GitHub] [airflow] turbaszek commented on a change in pull request #8962: [AIRFLOW-8057] [AIP-31] Add @task decorator

2020-05-27 Thread GitBox


turbaszek commented on a change in pull request #8962:
URL: https://github.com/apache/airflow/pull/8962#discussion_r431578845



##
File path: airflow/operators/python.py
##
@@ -145,6 +147,141 @@ def execute_callable(self):
 return self.python_callable(*self.op_args, **self.op_kwargs)
 
 
+class _PythonFunctionalOperator(BaseOperator):
+"""
+Wraps a Python callable and captures args/kwargs when called for execution.
+
+:param python_callable: A reference to an object that is callable
+:type python_callable: python callable
+:param multiple_outputs: if set, function return value will be
+unrolled to multiple XCom values. List/Tuples will unroll to xcom values
+with index as key. Dict will unroll to xcom values with keys as keys.
+Defaults to False.
+:type multiple_outputs: bool
+"""
+
+template_fields = ('_op_args', '_op_kwargs')
+ui_color = '#ffefeb'
+
+# since we won't mutate the arguments, we should just do the shallow copy
+# there are some cases we can't deepcopy the objects(e.g protobuf).
+shallow_copy_attrs = ('python_callable',)
+
+@apply_defaults
+def __init__(
+self,
+python_callable: Callable,
+multiple_outputs: bool = False,
+*args,
+**kwargs
+) -> None:
+# Check if we need to generate a new task_id
+task_id = kwargs.get('task_id', None)
+dag = kwargs.get('dag', None) or DagContext.get_current_dag()
+if task_id and dag and task_id in dag.task_ids:
+prefix = task_id.rsplit("__", 1)[0]
+task_id = sorted(
+filter(lambda x: x.startswith(prefix), dag.task_ids),
+reverse=True
+)[0]
+num = int(task_id[-1] if '__' in task_id else '0') + 1
+kwargs['task_id'] = f'{prefix}__{num}'
+
+if not kwargs.get('do_xcom_push', True) and not multiple_outputs:
+raise AirflowException('@task needs to have either do_xcom_push=True or '
+   'multiple_outputs=True.')
+if not callable(python_callable):
+raise AirflowException('`python_callable` param must be callable')
+self._fail_if_method(python_callable)
+super().__init__(*args, **kwargs)
+self.python_callable = python_callable
+self.multiple_outputs = multiple_outputs
+self._kwargs = kwargs
+self._op_args: List[Any] = []
+self._called = False
+self._op_kwargs: Dict[str, Any] = {}
+
+@staticmethod
+def _fail_if_method(python_callable):
+if 'self' in signature(python_callable).parameters.keys():
+raise AirflowException('@task does not support methods')
+
+def __call__(self, *args, **kwargs):
+# If args/kwargs are set, then operator has been called. Raise exception
+if self._called:

Review comment:
   > We could capture the task_id kwarg and generate a new operator, but then what is `update_user`: the first operator or the latest one? What does `update_user` represent?
   
   For me `update_user` is a function, and as a function it can be called many times with different inputs, thus yielding different results. I have never met a "function as a singleton" pattern. If we don't want to generate `task_id` for users then we may consider raising an exception on the second invocation when no custom `task_id` is passed. 
   
   My point is: this is a function, so I expect to be able to call it as many times as I wish. I expect Airflow to treat each call of this function (in the proper context) as creating a new task.
   
   > You can either do (1) `update_user(i) for i in range(20)` or (2) 
`update_user >> other_operation`, but not both. I prefer to support 2nd option 
as it adapts more to what Airflow already does with operators.
   
   Why should I not be able to do this? This is something I have seen many times. 
   ```python
   first_task = BashOperator()
   last_task = BashOperator()
   
   for user_id in users_list:
  first_task >> update_user(user_id) >> last_task
   ```









[GitHub] [airflow] turbaszek commented on a change in pull request #8962: [AIRFLOW-8057] [AIP-31] Add @task decorator

2020-05-27 Thread GitBox


turbaszek commented on a change in pull request #8962:
URL: https://github.com/apache/airflow/pull/8962#discussion_r431318258



##
File path: docs/concepts.rst
##
@@ -116,6 +116,47 @@ DAGs can be used as context managers to automatically assign new operators to th
 
 op.dag is dag # True
 
+.. _concepts:functional_dags:
+
+Functional DAGs
+---------------
+*Added in Airflow 1.10.11*
+
+DAGs can be defined using functional abstractions. Outputs and inputs are sent between tasks using
+:ref:`XComs ` values. In addition, you can wrap functions as tasks using the
+:ref:`task decorator `. Dependencies are automatically inferred from
+the message dependencies.
+
+Example DAG with functional abstraction
+
+.. code:: python
+
+  with DAG(
+  'send_server_ip', default_args=default_args, schedule_interval=None
+  ) as dag:
+
+# Using default connection as it's set to httpbin.org by default
+get_ip = SimpleHttpOperator(
+task_id='get_ip', endpoint='get', method='GET', xcom_push=True
+)
+
+@dag.task(multiple_outputs=True)
+def prepare_email(raw_json: str) -> str:
+  external_ip = json.loads(raw_json)['origin']
+  return {
+'subject':f'Server connected from {external_ip}',
+'body': f'Seems like today your server executing Airflow is connected from the external IP {external_ip}'

Review comment:
   The question is do we want to backport it?









[GitHub] [airflow] turbaszek commented on a change in pull request #8962: [AIRFLOW-8057] [AIP-31] Add @task decorator

2020-05-24 Thread GitBox


turbaszek commented on a change in pull request #8962:
URL: https://github.com/apache/airflow/pull/8962#discussion_r429614589



##
File path: airflow/operators/python.py
##
@@ -145,6 +147,141 @@ def execute_callable(self):
 return self.python_callable(*self.op_args, **self.op_kwargs)
 
 
+class _PythonFunctionalOperator(BaseOperator):
+"""
+Wraps a Python callable and captures args/kwargs when called for execution.
+
+:param python_callable: A reference to an object that is callable
+:type python_callable: python callable
+:param multiple_outputs: if set, function return value will be
+unrolled to multiple XCom values. List/Tuples will unroll to xcom values
+with index as key. Dict will unroll to xcom values with keys as keys.
+Defaults to False.
+:type multiple_outputs: bool
+"""
+
+template_fields = ('_op_args', '_op_kwargs')
+ui_color = '#ffefeb'
+
+# since we won't mutate the arguments, we should just do the shallow copy
+# there are some cases we can't deepcopy the objects(e.g protobuf).
+shallow_copy_attrs = ('python_callable',)
+
+@apply_defaults
+def __init__(
+self,
+python_callable: Callable,
+multiple_outputs: bool = False,
+*args,
+**kwargs
+) -> None:
+# Check if we need to generate a new task_id
+task_id = kwargs.get('task_id', None)
+dag = kwargs.get('dag', None) or DagContext.get_current_dag()
+if task_id and dag and task_id in dag.task_ids:
+prefix = task_id.rsplit("__", 1)[0]
+task_id = sorted(
+filter(lambda x: x.startswith(prefix), dag.task_ids),
+reverse=True
+)[0]
+num = int(task_id[-1] if '__' in task_id else '0') + 1
+kwargs['task_id'] = f'{prefix}__{num}'
+
+if not kwargs.get('do_xcom_push', True) and not multiple_outputs:
+raise AirflowException('@task needs to have either do_xcom_push=True or '
+   'multiple_outputs=True.')
+if not callable(python_callable):
+raise AirflowException('`python_callable` param must be callable')
+self._fail_if_method(python_callable)
+super().__init__(*args, **kwargs)
+self.python_callable = python_callable
+self.multiple_outputs = multiple_outputs
+self._kwargs = kwargs
+self._op_args: List[Any] = []
+self._called = False
+self._op_kwargs: Dict[str, Any] = {}
+
+@staticmethod
+def _fail_if_method(python_callable):
+if 'self' in signature(python_callable).parameters.keys():
+raise AirflowException('@task does not support methods')
+
+def __call__(self, *args, **kwargs):
+# If args/kwargs are set, then operator has been called. Raise exception
+if self._called:
+raise AirflowException('@task decorated functions can only be called once. If you need to reuse '
+   'it several times in a DAG, use the `copy` method.')
+
+# If we have no DAG, reinitialize class to capture DAGContext and DAG default args.
+if not self.has_dag():
+self.__init__(python_callable=self.python_callable,
+  multiple_outputs=self.multiple_outputs,
+  **self._kwargs)
+
+# Capture args/kwargs
+self._op_args = args
+self._op_kwargs = kwargs
+self._called = True
+return XComArg(self)
+
+def copy(self, task_id: Optional[str] = None, **kwargs):
+"""
+Create a copy of the task, allow to overwrite ctor kwargs if needed.
+
+If alias is created a new DAGContext, apply defaults and set new DAG as the operator DAG.
+
+:param task_id: Task id for the new operator
+:type task_id: Optional[str]
+"""
+if task_id:
+self._kwargs['task_id'] = task_id
+return _PythonFunctionalOperator(
+python_callable=self.python_callable,
+multiple_outputs=self.multiple_outputs,
+**{**kwargs, **self._kwargs}
+)
+
+def execute(self, context: Dict):
+return_value = self.python_callable(*self._op_args, **self._op_kwargs)
+self.log.info("Done. Returned value was: %s", return_value)
+if not self.multiple_outputs:
+return return_value
+if isinstance(return_value, dict):
+for key, value in return_value.items():
+self.xcom_push(context, str(key), value)
+elif isinstance(return_value, (list, tuple)):
+for key, value in enumerate(return_value):
+self.xcom_push(context, str(key), value)
+return return_value
+
+
+def task(python_callable: Optional[Callable] = None, **kwargs):
+"""
+Python operator decorator. Wraps a function into an Airflow operator.
+Accepts 
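The `multiple_outputs` unrolling in `execute()` above can be simulated outside Airflow. In this sketch, the `pushed` dict and `xcom_push` function stand in for the XCom backend (assumptions, not the operator's actual storage); the branching mirrors the diff: dicts push one XCom per key, lists/tuples one per index:

```python
# Simulation of the multiple_outputs unrolling shown in execute().
pushed = {}


def xcom_push(key, value):
    # Stand-in for BaseOperator.xcom_push(context, key, value).
    pushed[key] = value


def unroll(return_value, multiple_outputs):
    if not multiple_outputs:
        return return_value
    if isinstance(return_value, dict):
        for key, value in return_value.items():
            xcom_push(str(key), value)
    elif isinstance(return_value, (list, tuple)):
        for key, value in enumerate(return_value):
            xcom_push(str(key), value)
    return return_value


unroll({'subject': 's', 'body': 'b'}, multiple_outputs=True)
unroll(['x', 'y'], multiple_outputs=True)
print(pushed)  # {'subject': 's', 'body': 'b', '0': 'x', '1': 'y'}
```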

[GitHub] [airflow] turbaszek commented on a change in pull request #8962: [AIRFLOW-8057] [AIP-31] Add @task decorator

2020-05-23 Thread GitBox


turbaszek commented on a change in pull request #8962:
URL: https://github.com/apache/airflow/pull/8962#discussion_r429556544



##
File path: airflow/operators/python.py
##
@@ -145,6 +147,141 @@ def execute_callable(self):
 return self.python_callable(*self.op_args, **self.op_kwargs)
 
 
+class _PythonFunctionalOperator(BaseOperator):
+"""
+Wraps a Python callable and captures args/kwargs when called for execution.
+
+:param python_callable: A reference to an object that is callable
+:type python_callable: python callable
+:param multiple_outputs: if set, function return value will be
+unrolled to multiple XCom values. List/Tuples will unroll to xcom values
+with index as key. Dict will unroll to xcom values with keys as keys.
+Defaults to False.
+:type multiple_outputs: bool
+"""
+
+template_fields = ('_op_args', '_op_kwargs')
+ui_color = '#ffefeb'
+
+# since we won't mutate the arguments, we should just do the shallow copy
+# there are some cases we can't deepcopy the objects(e.g protobuf).
+shallow_copy_attrs = ('python_callable',)
+
+@apply_defaults
+def __init__(
+self,
+python_callable: Callable,
+multiple_outputs: bool = False,
+*args,
+**kwargs
+) -> None:
+# Check if we need to generate a new task_id
+task_id = kwargs.get('task_id', None)
+dag = kwargs.get('dag', None) or DagContext.get_current_dag()
+if task_id and dag and task_id in dag.task_ids:
+prefix = task_id.rsplit("__", 1)[0]
+task_id = sorted(
+filter(lambda x: x.startswith(prefix), dag.task_ids),
+reverse=True
+)[0]
+num = int(task_id[-1] if '__' in task_id else '0') + 1
+kwargs['task_id'] = f'{prefix}__{num}'
+
+if not kwargs.get('do_xcom_push', True) and not multiple_outputs:
+raise AirflowException('@task needs to have either do_xcom_push=True or '
+   'multiple_outputs=True.')
+if not callable(python_callable):
+raise AirflowException('`python_callable` param must be callable')
+self._fail_if_method(python_callable)
+super().__init__(*args, **kwargs)
+self.python_callable = python_callable
+self.multiple_outputs = multiple_outputs
+self._kwargs = kwargs
+self._op_args: List[Any] = []
+self._called = False
+self._op_kwargs: Dict[str, Any] = {}
+
+@staticmethod
+def _fail_if_method(python_callable):
+if 'self' in signature(python_callable).parameters.keys():
+raise AirflowException('@task does not support methods')
+
+def __call__(self, *args, **kwargs):
+# If args/kwargs are set, then operator has been called. Raise exception
+if self._called:

Review comment:
   Hm, I have mixed feelings:
   - `copy` is not self-explanatory in this case imho
   - in such a case, shouldn't we generate an auto id? Or at least can we try to do `update_user(user_id, task_id=user_id)`?
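
   For reference, the auto-id suffixing logic from the diff can be lifted into a standalone function. This is a sketch that mirrors the diff's algorithm, including its single-digit suffix limitation (`int(latest[-1])` breaks past `__9`); `existing_ids` stands in for `dag.task_ids`:

   ```python
   # Standalone version of the task_id suffixing logic from the diff.
   def get_unique_task_id(task_id, existing_ids):
       if task_id not in existing_ids:
           return task_id
       prefix = task_id.rsplit("__", 1)[0]
       # Find the lexicographically latest id sharing the prefix.
       latest = sorted(
           (t for t in existing_ids if t.startswith(prefix)),
           reverse=True,
       )[0]
       # Only looks at the final character, as in the diff.
       num = int(latest[-1]) + 1 if '__' in latest else 1
       return f"{prefix}__{num}"


   ids = []
   for _ in range(3):
       ids.append(get_unique_task_id("update_user", ids))
   print(ids)  # ['update_user', 'update_user__1', 'update_user__2']
   ```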









[GitHub] [airflow] turbaszek commented on a change in pull request #8962: [AIRFLOW-8057] [AIP-31] Add @task decorator

2020-05-22 Thread GitBox


turbaszek commented on a change in pull request #8962:
URL: https://github.com/apache/airflow/pull/8962#discussion_r429151128



##
File path: airflow/operators/python.py
##
@@ -145,6 +147,141 @@ def execute_callable(self):
 return self.python_callable(*self.op_args, **self.op_kwargs)
 
 
+class _PythonFunctionalOperator(BaseOperator):
+"""
+Wraps a Python callable and captures args/kwargs when called for execution.
+
+:param python_callable: A reference to an object that is callable
+:type python_callable: python callable
+:param multiple_outputs: if set, function return value will be
+unrolled to multiple XCom values. List/Tuples will unroll to xcom values
+with index as key. Dict will unroll to xcom values with keys as keys.
+Defaults to False.
+:type multiple_outputs: bool
+"""
+
+template_fields = ('_op_args', '_op_kwargs')
+ui_color = '#ffefeb'
+
+# since we won't mutate the arguments, we should just do the shallow copy
+# there are some cases we can't deepcopy the objects(e.g protobuf).
+shallow_copy_attrs = ('python_callable',)
+
+@apply_defaults
+def __init__(
+self,
+python_callable: Callable,
+multiple_outputs: bool = False,
+*args,
+**kwargs
+) -> None:
+# Check if we need to generate a new task_id
+task_id = kwargs.get('task_id', None)
+dag = kwargs.get('dag', None) or DagContext.get_current_dag()
+if task_id and dag and task_id in dag.task_ids:
+prefix = task_id.rsplit("__", 1)[0]
+task_id = sorted(
+filter(lambda x: x.startswith(prefix), dag.task_ids),
+reverse=True
+)[0]
+num = int(task_id[-1] if '__' in task_id else '0') + 1
+kwargs['task_id'] = f'{prefix}__{num}'
+
+if not kwargs.get('do_xcom_push', True) and not multiple_outputs:
+raise AirflowException('@task needs to have either do_xcom_push=True or '
+   'multiple_outputs=True.')
+if not callable(python_callable):
+raise AirflowException('`python_callable` param must be callable')
+self._fail_if_method(python_callable)
+super().__init__(*args, **kwargs)
+self.python_callable = python_callable
+self.multiple_outputs = multiple_outputs
+self._kwargs = kwargs
+self._op_args: List[Any] = []
+self._called = False
+self._op_kwargs: Dict[str, Any] = {}
+
+@staticmethod
+def _fail_if_method(python_callable):
+if 'self' in signature(python_callable).parameters.keys():
+raise AirflowException('@task does not support methods')
+
+def __call__(self, *args, **kwargs):
+# If args/kwargs are set, then operator has been called. Raise exception
+if self._called:

Review comment:
   Do I correctly understand that this will not work?
   ``` python
   @task 
   def update_user(user_id: str):
   ...
   
   with DAG(...):
   # Fetch list of users 
   ...
   # Execute task for each user
   for user_id in users_list:
   update_user(user_id)
   ```

##
File path: airflow/operators/python.py
##
@@ -145,6 +148,142 @@ def execute_callable(self):
 return self.python_callable(*self.op_args, **self.op_kwargs)
 
 
+class _PythonFunctionalOperator(BaseOperator):
+"""
+Wraps a Python callable and captures args/kwargs when called for execution.
+
+:param python_callable: A reference to an object that is callable
+:type python_callable: python callable
+:param multiple_outputs: if set, function return value will be
+unrolled to multiple XCom values. List/Tuples will unroll to xcom values
+with index as key. Dict will unroll to xcom values with keys as keys.
+Defaults to False.
+:type multiple_outputs: bool
+"""
+
+template_fields = ('_op_args', '_op_kwargs')
+ui_color = '#ffefeb'
+
+# since we won't mutate the arguments, we should just do the shallow copy
+# there are some cases we can't deepcopy the objects(e.g protobuf).
+shallow_copy_attrs = ('python_callable',)
+
+@apply_defaults
+def __init__(
+self,
+python_callable: Callable,
+multiple_outputs: bool = False,
+*args,
+**kwargs
+) -> None:
+dag = kwargs.get('dag', None) or DagContext.get_current_dag()
+kwargs['task_id'] = self._get_unique_task_id(kwargs['task_id'], dag)
+self._validate_python_callable(python_callable)
+super().__init__(*args, **kwargs)
+self.python_callable = python_callable
+self.multiple_outputs = multiple_outputs
+self._kwargs = kwargs
+self._op_args: List[Any] = []
+self._called = False
+self._op_kwargs: Dict[str, Any] = {}
+
+@staticmethod
+def _get_unique_task_id(task_id,