ferruzzi commented on code in PR #67449:
URL: https://github.com/apache/airflow/pull/67449#discussion_r3300399729
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -66,8 +67,8 @@
from airflow.cli.cli_config import GroupCommand
from airflow.executors import workloads
+ from airflow.executors.workloads.types import WorkloadKey
Review Comment:
Pretty sure this needs a back-compat wrapper. See the ECS executor, where
they did:
```
if AIRFLOW_V_3_3_PLUS:
    from airflow.executors.workloads.types import WorkloadKey as _EcsWorkloadKey

    WorkloadKey: TypeAlias = _EcsWorkloadKey
else:
    WorkloadKey: TypeAlias = TaskInstanceKey
```
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -232,31 +234,30 @@ def execute_async(
# try and remove it from the QUEUED state while we process it
self.last_handled[key] = time.time()
- def queue_workload(self, workload: workloads.All, session: Session | None) -> None:
- from airflow.executors import workloads
-
- if not isinstance(workload, workloads.ExecuteTask):
- raise RuntimeError(f"{type(self)} cannot handle workloads of type {type(workload)}")
- ti = workload.ti
- self.queued_tasks[ti.key] = workload
Review Comment:
ECS and Celery both had to keep this for back-compat and added a note:
`# TODO: Remove this once the minimum supported version is 3.3+, and defer to BaseExecutor.queue_workload.`
I'm reasonably certain that should be applied here as well, unless you know
of a reason to drop it that I've missed.
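
For illustration, here is a minimal sketch of what keeping the override could look like. `FakeTaskInstance`, `FakeExecuteTask`, `FakeExecuteCallback` and `SketchExecutor` are hypothetical stand-ins so the snippet runs without Airflow; the method body is the one removed in this hunk, and the TODO is the note quoted above. It is not a copy of the ECS or Celery code.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeTaskInstance:
    """Hypothetical stand-in for a task instance; only `key` matters here."""

    key: str


@dataclass(frozen=True)
class FakeExecuteTask:
    """Hypothetical stand-in for workloads.ExecuteTask."""

    ti: FakeTaskInstance


@dataclass(frozen=True)
class FakeExecuteCallback:
    """Hypothetical stand-in for a callback workload."""

    name: str


class SketchExecutor:
    """Hypothetical executor holding only the state this method touches."""

    def __init__(self) -> None:
        self.queued_tasks: dict[str, FakeExecuteTask] = {}

    # TODO: Remove this once the minimum supported version is 3.3+, and defer to
    # BaseExecutor.queue_workload.
    def queue_workload(self, workload, session=None) -> None:
        # Same check as the removed lines: only task workloads are accepted.
        if not isinstance(workload, FakeExecuteTask):
            raise RuntimeError(f"{type(self)} cannot handle workloads of type {type(workload)}")
        self.queued_tasks[workload.ti.key] = workload
```

In this sketch a callback workload is rejected, which matches the removed body. How callbacks would be routed on 3.3+ is not shown, since that depends on what `BaseExecutor.queue_workload` does there.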
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -78,6 +79,7 @@ class KubernetesExecutor(BaseExecutor):
RUNNING_POD_LOG_LINES = 100
supports_ad_hoc_ti_run: bool = True
+ supports_callbacks: bool = AIRFLOW_V_3_3_PLUS
Review Comment:
This is clever but I don't know if it actually works. In the other
executors we used
```
if AIRFLOW_V_3_3_PLUS:
    supports_callbacks: bool = True
```
That leaves `supports_callbacks` undefined, rather than False, on versions before 3.3.
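
A quick plain-Python sketch of the difference between the two forms, in case it helps. `OldBaseExecutor` and `build_executors` are hypothetical names, and the base class is assumed not to define `supports_callbacks`, which may not match the real `BaseExecutor` on every version.

```python
class OldBaseExecutor:
    """Hypothetical base class that predates `supports_callbacks` (assumption)."""


def build_executors(airflow_v_3_3_plus: bool):
    """Build both variants for a given value of the version flag."""

    class DirectAssignment(OldBaseExecutor):
        # Form used in this PR: the attribute always exists and mirrors the flag.
        supports_callbacks: bool = airflow_v_3_3_plus

    class ConditionalDefinition(OldBaseExecutor):
        # Form used in the other executors: the attribute is only defined
        # when the flag is true.
        if airflow_v_3_3_plus:
            supports_callbacks: bool = True

    return DirectAssignment, ConditionalDefinition


direct_old, conditional_old = build_executors(False)
direct_new, conditional_new = build_executors(True)

print(direct_old.supports_callbacks)                    # explicit False
print(hasattr(conditional_old, "supports_callbacks"))   # attribute is absent
print(direct_new.supports_callbacks, conditional_new.supports_callbacks)
```

If the base class does define a default, the conditional form inherits that default instead of leaving the attribute missing, so the two forms only diverge when the base class lacks it or defaults to something other than False.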
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py:
##########
@@ -30,11 +30,12 @@
from sqlalchemy import select
from urllib3.exceptions import HTTPError
+from airflow.models.callback import CallbackKey
Review Comment:
Pretty sure this needs an `if AIRFLOW_V_3_3_PLUS:` wrapper, right?
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py:
##########
@@ -69,10 +69,10 @@ class KubernetesWatch(NamedTuple):
class KubernetesJob(NamedTuple):
- """Job definition for Kubernetes execution."""
+ """Job definition for Kubernetes execution (task or callback)."""
- key: TaskInstanceKey
- command: Sequence[str]
+ key: WorkloadKey
+ command: Sequence[Any]
Review Comment:
I liked the way the ECS Executor handled this; they did
```
if AIRFLOW_V_3_3_PLUS:
    CommandType: TypeAlias = Sequence[str] | Sequence[ExecuteTask] | Sequence[ExecuteCallback]
else:
    CommandType: TypeAlias = Sequence[str]
```
then defined `command` as a `CommandType` instead of `Sequence[Any]`.