ferruzzi commented on code in PR #67449:
URL: https://github.com/apache/airflow/pull/67449#discussion_r3300399729


##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -66,8 +67,8 @@
 
     from airflow.cli.cli_config import GroupCommand
     from airflow.executors import workloads
+    from airflow.executors.workloads.types import WorkloadKey

Review Comment:
   Pretty sure this needs a back-compat wrapper.  In the ECS executor they did:
   
   ```
        if AIRFLOW_V_3_3_PLUS:
            from airflow.executors.workloads.types import WorkloadKey as _EcsWorkloadKey
            WorkloadKey: TypeAlias = _EcsWorkloadKey
        else:
            WorkloadKey: TypeAlias = TaskInstanceKey
   ```



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -232,31 +234,30 @@ def execute_async(
         # try and remove it from the QUEUED state while we process it
         self.last_handled[key] = time.time()
 
-    def queue_workload(self, workload: workloads.All, session: Session | None) -> None:
-        from airflow.executors import workloads
-
-        if not isinstance(workload, workloads.ExecuteTask):
-            raise RuntimeError(f"{type(self)} cannot handle workloads of type {type(workload)}")
-        ti = workload.ti
-        self.queued_tasks[ti.key] = workload

Review Comment:
   ECS and Celery both had to keep this for back-compat, and added the note
   `# TODO: Remove this once the minimum supported version is 3.3+, and defer to BaseExecutor.queue_workload.`
   I'm reasonably certain the same should be applied here, unless you know of
   a reason to drop it that I've missed.
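
A rough sketch of what keeping the override could look like, assuming an older core whose base class does not queue workloads generically. `OlderBaseExecutor`, `SketchExecutor`, and the simplified `ExecuteTask` are hypothetical stand-ins, not Airflow classes, and the key lookup is flattened to `workload.key` for brevity.

```python
from dataclasses import dataclass, field


@dataclass
class ExecuteTask:
    """Stand-in for workloads.ExecuteTask; it only carries a key here."""

    key: str


@dataclass
class OlderBaseExecutor:
    """Hypothetical base class modelling a core without generic queueing."""

    queued_tasks: dict = field(default_factory=dict)


class SketchExecutor(OlderBaseExecutor):
    # TODO: Remove this once the minimum supported version is 3.3+, and defer to
    # BaseExecutor.queue_workload.
    def queue_workload(self, workload, session=None) -> None:
        # Reject anything that is not a task workload, as the removed code did.
        if not isinstance(workload, ExecuteTask):
            raise RuntimeError(f"{type(self)} cannot handle workloads of type {type(workload)}")
        self.queued_tasks[workload.key] = workload


executor = SketchExecutor()
executor.queue_workload(ExecuteTask(key="example"))
```

How the retained method should treat callback workloads on newer cores is left out of this sketch.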



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -78,6 +79,7 @@ class KubernetesExecutor(BaseExecutor):
 
     RUNNING_POD_LOG_LINES = 100
     supports_ad_hoc_ti_run: bool = True
+    supports_callbacks: bool = AIRFLOW_V_3_3_PLUS

Review Comment:
   This is clever, but I don't know if it actually works.  In the other executors we used:
   
   ```
        if AIRFLOW_V_3_3_PLUS:
            supports_callbacks: bool = True
   ```
   
   That leaves `supports_callbacks` undefined, rather than `False`, on older versions.
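
The difference between the two spellings can be seen in plain Python. This is only a sketch: `FLAG_OFF` stands in for `AIRFLOW_V_3_3_PLUS` evaluating to `False`, and `OldBase` is a hypothetical base class that predates the attribute.

```python
FLAG_OFF = False  # stands in for AIRFLOW_V_3_3_PLUS on an older core


class OldBase:
    """Hypothetical base class from a core that predates the attribute."""


class AssignedFlag(OldBase):
    # Always defines the attribute; its value is False on older cores.
    supports_callbacks: bool = FLAG_OFF


class GuardedFlag(OldBase):
    # Defines the attribute only when the flag is true; otherwise the class
    # body skips the statement and nothing is set.
    if FLAG_OFF:
        supports_callbacks: bool = True


print(hasattr(AssignedFlag, "supports_callbacks"))        # True
print(hasattr(GuardedFlag, "supports_callbacks"))         # False
print(getattr(GuardedFlag, "supports_callbacks", False))  # False
```

Whether that difference matters depends on how the caller reads the attribute, which this sketch does not model.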



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py:
##########
@@ -30,11 +30,12 @@
 from sqlalchemy import select
 from urllib3.exceptions import HTTPError
 
+from airflow.models.callback import CallbackKey

Review Comment:
   Pretty sure this needs an `if AIRFLOW_V_3_3_PLUS:` wrapper, right?



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py:
##########
@@ -69,10 +69,10 @@ class KubernetesWatch(NamedTuple):
 
 
 class KubernetesJob(NamedTuple):
-    """Job definition for Kubernetes execution."""
+    """Job definition for Kubernetes execution (task or callback)."""
 
-    key: TaskInstanceKey
-    command: Sequence[str]
+    key: WorkloadKey
+    command: Sequence[Any]

Review Comment:
   I liked the way the ECS Executor handled this; they did
   
   ```
     if AIRFLOW_V_3_3_PLUS:
         CommandType: TypeAlias = Sequence[str] | Sequence[ExecuteTask] | Sequence[ExecuteCallback]
     else:
         CommandType: TypeAlias = Sequence[str]
   ```
   
   Then they defined `command` as a `CommandType` instead of `Sequence[Any]`.


