wasabigeek opened a new issue, #44779:
URL: https://github.com/apache/airflow/issues/44779
### Description
I'd like for `@task.kubernetes` to derive the task_id / pod_name based on
the decorated function's name. To achieve this now, I would need to pass it
into the decorator args:
```python
@task.kubernetes(task_id="some_func", name=f"{some_prefix}_some_func")
def some_func():
    pass
```
We managed to do something like the above but it was more involved than
expected:
```python
from __future__ import annotations

from typing import Any, Callable

from airflow.decorators.base import TaskDecorator, task_decorator_factory
from airflow.providers.cncf.kubernetes.decorators.kubernetes import (
    _KubernetesDecoratedOperator,
)


# inheriting from a private class, not ideal
class DefaultKubernetesDecoratedOperator(_KubernetesDecoratedOperator):
    # the operator removes the decorator so this is required
    custom_operator_name = "@default_kubernetes_task"

    def __init__(self, **kwargs: Any) -> None:
        # default task_id to the decorated function's name
        if "task_id" not in kwargs and "python_callable" in kwargs:
            task_id = kwargs["python_callable"].__name__
            kwargs["task_id"] = task_id
        # default the pod name; get_prefix() is a project-specific helper, not shown here
        if "task_id" in kwargs and "name" not in kwargs:
            name = f"{get_prefix()}-{kwargs['task_id']}"
            kwargs["name"] = name
        super().__init__(**kwargs)


def default_kubernetes_task(
    python_callable: Callable[[], None] | None = None,
    multiple_outputs: bool | None = None,
    **kwargs: Any,
) -> TaskDecorator:
    return task_decorator_factory(
        python_callable=python_callable,
        multiple_outputs=multiple_outputs,
        decorated_operator_class=DefaultKubernetesDecoratedOperator,
        **kwargs,
    )
```
Initially I tried wrapping `task.kubernetes` itself, but the source-code
scrubbing relies on a hardcoded decorator name, so it doesn't strip the custom
decorator and the pod fails with
`NameError: name 'default_kubernetes_task' is not defined`:
```python
def default_kubernetes_task(
    **task_kwargs: Any,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    def decorator(func: Callable[..., Any]) -> Any:
        # default task_id to the decorated function's name
        if "task_id" not in task_kwargs:
            task_kwargs["task_id"] = func.__name__
        # default the pod name from a prefix plus the task_id
        if "name" not in task_kwargs:
            name = f"{get_prefix()}-{task_kwargs['task_id']}"
            task_kwargs["name"] = name
        return task.kubernetes(**task_kwargs)(func)

    return decorator
```
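As a rough illustration of that failure mode, here is a simplified stand-in for the scrubbing step. It is not Airflow's actual implementation: `strip_decorator`, `run_in_fresh_namespace`, and `HARDCODED_DECORATOR` are hypothetical names, and the real code handles more cases (multi-line decorators, for example). The sketch only shows that stripping one fixed decorator name leaves a custom wrapper's decorator line in the source, which then fails when executed in a namespace where the wrapper was never imported.

```python
import textwrap

# Assumption: stand-in for the single decorator name the operator knows how to strip.
HARDCODED_DECORATOR = "@task.kubernetes"


def strip_decorator(source: str, decorator_name: str) -> str:
    """Drop every line that starts with the given decorator name (simplified)."""
    kept = [
        line
        for line in source.splitlines()
        if not line.strip().startswith(decorator_name)
    ]
    return "\n".join(kept)


def run_in_fresh_namespace(source: str) -> str:
    """Execute the source with nothing pre-imported, roughly as an isolated pod would."""
    try:
        exec(source, {})
    except NameError as err:
        return f"NameError: {err}"
    return "ok"


# Source decorated with the built-in decorator: the decorator line is stripped.
builtin_source = textwrap.dedent(
    '''
    @task.kubernetes(name="prefix-some-func")
    def some_func():
        pass
    '''
)

# Source decorated with the custom wrapper: the decorator line survives stripping.
wrapped_source = textwrap.dedent(
    '''
    @default_kubernetes_task()
    def some_func():
        pass
    '''
)

builtin_result = run_in_fresh_namespace(
    strip_decorator(builtin_source, HARDCODED_DECORATOR)
)
wrapped_result = run_in_fresh_namespace(
    strip_decorator(wrapped_source, HARDCODED_DECORATOR)
)

print(builtin_result)
print(wrapped_result)
```

In this sketch the first call runs cleanly, while the second reports that `default_kubernetes_task` is not defined, which mirrors the error seen in the pod.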
It'd be nice if there were a more official way to achieve the above (or
something similar). Perhaps `@task.kubernetes` could accept an override for the
`decorated_operator_class`?
### Use case/motivation
I'd like for `@task.kubernetes` to derive the task_id / pod_name based on
the decorated function's name. More generally, it could be useful to allow
the operator class behind `task.kubernetes` to be overridden.
### Related issues
_No response_
### Are you willing to submit a PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)