wasabigeek opened a new issue, #44779:
URL: https://github.com/apache/airflow/issues/44779

   ### Description
   
   I'd like `@task.kubernetes` to derive the task_id / pod name from the 
decorated function's name. To achieve this today, I have to pass both 
explicitly as decorator args:
   ```python
   @task.kubernetes(task_id="some_func", name=f"{some_prefix}_some_func")
   def some_func():
       pass
   ```
   
   We managed to do something like the above but it was more involved than 
expected:
   ```python
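   # Imports assume Airflow 2.x with the cncf.kubernetes provider installed.
   # get_prefix() below is our own helper and is not shown here.
   from typing import Any, Callable
   
   from airflow.decorators.base import TaskDecorator, task_decorator_factory
   from airflow.providers.cncf.kubernetes.decorators.kubernetes import (
       _KubernetesDecoratedOperator,
   )
   
   
   # inheriting from a private class, not ideal
   class DefaultKubernetesDecoratedOperator(_KubernetesDecoratedOperator):
       # the operator removes the decorator so this is required
       custom_operator_name = "@default_kubernetes_task"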
   
       def __init__(self, **kwargs: Any) -> None:
           if "task_id" not in kwargs and "python_callable" in kwargs:
               task_id = kwargs["python_callable"].__name__
               kwargs["task_id"] = task_id
   
           if "task_id" in kwargs and "name" not in kwargs:
               name = f"{get_prefix()}-{kwargs['task_id']}"
               kwargs["name"] = name
   
           super().__init__(**kwargs)
   
   
   def default_kubernetes_task(
       python_callable: Callable[[], None] | None = None,
       multiple_outputs: bool | None = None,
       **kwargs: Any,
   ) -> TaskDecorator:
       return task_decorator_factory(
           python_callable=python_callable,
           multiple_outputs=multiple_outputs,
           decorated_operator_class=DefaultKubernetesDecoratedOperator,
           **kwargs,
       )
   ```
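
The defaulting rules in `__init__` above can be checked in isolation. This is a minimal sketch in plain Python with no Airflow imports; `fill_default_names` and the `"team"` prefix are hypothetical names used only for illustration, with the prefix standing in for whatever `get_prefix()` returns:

```python
from typing import Any, Callable


def fill_default_names(
    func: Callable[..., Any], kwargs: dict[str, Any], prefix: str
) -> dict[str, Any]:
    """Return a copy of kwargs with task_id and name defaulted from func."""
    filled = dict(kwargs)  # copy so the caller's dict is not mutated
    # An explicit task_id wins; otherwise fall back to the function name.
    filled.setdefault("task_id", func.__name__)
    # An explicit name wins; otherwise derive it from the resolved task_id.
    filled.setdefault("name", f"{prefix}-{filled['task_id']}")
    return filled


def some_func() -> None:
    pass


print(fill_default_names(some_func, {}, "team"))
print(fill_default_names(some_func, {"task_id": "custom"}, "team"))
```

Explicitly passed values always take precedence, and the pod name follows the resolved task_id rather than the raw function name.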
   
   Initially I tried wrapping `task.kubernetes` itself, but the source code 
scrubbing relies on a hardcoded operator, so it doesn't scrub the decorator and 
the pod fails with `NameError: name 'default_kubernetes_task' is not defined`:
   ```python
   from typing import Any, Callable
   
   from airflow.decorators import task
   
   
   def default_kubernetes_task(
       **task_kwargs: Any,
   ) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
       def decorator(func: Callable[..., Any]) -> Any:
           if "task_id" not in task_kwargs:
               task_kwargs["task_id"] = func.__name__
   
           if "name" not in task_kwargs:
               name = f"{get_prefix()}-{task_kwargs['task_id']}"
               task_kwargs["name"] = name
   
           return task.kubernetes(**task_kwargs)(func)
   
       return decorator
   ```
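
To illustrate why the wrapper fails, here is a simplified stand-in for the scrubbing step. It is not Airflow's actual implementation: `strip_decorator` and `try_exec` are hypothetical helpers, and the sketch assumes the scrubber only removes decorator lines that match a fixed name before the function source is executed elsewhere:

```python
import textwrap


def strip_decorator(source: str, decorator_name: str) -> str:
    """Drop single-line decorators whose text starts with decorator_name."""
    kept = [
        line
        for line in source.splitlines()
        if not line.strip().startswith(decorator_name)
    ]
    return "\n".join(kept)


def try_exec(source: str) -> str:
    """Execute source in a fresh namespace, as a process without the DAG file would."""
    try:
        exec(source, {})
    except NameError as exc:
        return str(exc)
    return "ok"


SOURCE = textwrap.dedent(
    """
    @default_kubernetes_task()
    def some_func():
        return 42
    """
).strip()

# Scrubbing for a fixed name leaves the custom decorator in place.
unscrubbed = strip_decorator(SOURCE, "@task.kubernetes")
# Scrubbing for the custom name removes it.
scrubbed = strip_decorator(SOURCE, "@default_kubernetes_task")

print(try_exec(unscrubbed))
print(try_exec(scrubbed))
```

In this sketch the first call reports a `NameError` for `default_kubernetes_task`, because the decorator line is still in the source, while the second runs cleanly once the scrubber knows the custom decorator's name.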
   
   It'd be nice if there were a more official way to achieve the above (or 
something similar). Perhaps `@task.kubernetes` could accept an override for the 
`decorated_operator_class`?
   
   ### Use case/motivation
   
   I'd like `@task.kubernetes` to derive the task_id / pod name from the 
decorated function's name. More generally, it could be useful to allow the 
operator class behind `task.kubernetes` to be overridden.
   
   ### Related issues
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

