ashb commented on a change in pull request #19965: URL: https://github.com/apache/airflow/pull/19965#discussion_r771389087
########## File path: airflow/decorators/base.py ########## @@ -176,11 +178,92 @@ def _hook_apply_defaults(self, *args, **kwargs): T = TypeVar("T", bound=Callable) +OperatorSubclass = TypeVar("OperatorSubclass", bound="BaseOperator") + + +@attr.define +class OperatorWrapper(Generic[T, OperatorSubclass]): + """ + Helper class for providing dynamic task mapping to decorated functions. + + ``task_decorator_factory`` returns an instance of this, instead of just a plain wrapped function. + + :meta private: + """ + + function: T = attr.ib(validator=attr.validators.is_callable()) + operator_class: Type[OperatorSubclass] + multiple_outputs: bool = attr.ib() + kwargs: Dict[str, Any] = attr.ib(factory=dict) + + decorator_name: str = attr.ib(repr=False, default="task") + function_arg_names: Set[str] = attr.ib(repr=False) + + @function_arg_names.default + def _get_arg_names(self): + return set(signature(self.function).parameters) + + @function.validator + def _validate_function(self, _, f): + if 'self' in self.function_arg_names: + raise TypeError(f'@{self.decorator_name} does not support methods') + + @multiple_outputs.default + def _infer_multiple_outputs(self): + sig = signature(self.function).return_annotation + ttype = getattr(sig, "__origin__", None) + + return sig is not inspect.Signature.empty and ttype in (dict, Dict) + + def __attrs_post_init__(self): + self.kwargs.setdefault('task_id', self.function.__name__) + + def __call__(self, *args, **kwargs) -> XComArg: + op = self.operator_class( + python_callable=self.function, + op_args=args, + op_kwargs=kwargs, + multiple_outputs=self.multiple_outputs, + **self.kwargs, + ) + if self.function.__doc__: + op.doc_md = self.function.__doc__ + return XComArg(op) + + def map( + self, *args, dag: Optional["DAG"] = None, task_group: Optional["TaskGroup"] = None, **kwargs Review comment: Yes, probably. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org