o-nikolas commented on code in PR #63491:
URL: https://github.com/apache/airflow/pull/63491#discussion_r2957183366


##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py:
##########
@@ -98,7 +98,11 @@ def _task_event_logs(self, value):
     @property
     def queued_tasks(self) -> dict[TaskInstanceKey, Any]:
         """Return queued tasks from local and kubernetes executor."""
-        return self.local_executor.queued_tasks | self.kubernetes_executor.queued_tasks
+        queued_tasks = self.local_executor.queued_tasks.copy()

Review Comment:
   I wish we'd stop updating these executors. They've been deprecated for a long time now.



##########
airflow-core/src/airflow/executors/workloads/base.py:
##########
@@ -20,12 +20,30 @@
 
 import os
 from abc import ABC
+from enum import Enum
 from typing import TYPE_CHECKING
 
 from pydantic import BaseModel, ConfigDict, Field
 
 if TYPE_CHECKING:
     from airflow.api_fastapi.auth.tokens import JWTGenerator
+    from airflow.executors.workloads.types import WorkloadKey
+
+
+class WorkloadType(str, Enum):

Review Comment:
   Don't we have the new Connection testing type?



##########
airflow-core/src/airflow/executors/workloads/base.py:
##########
@@ -20,12 +20,30 @@
 
 import os
 from abc import ABC
+from enum import Enum
 from typing import TYPE_CHECKING
 
 from pydantic import BaseModel, ConfigDict, Field
 
 if TYPE_CHECKING:
     from airflow.api_fastapi.auth.tokens import JWTGenerator
+    from airflow.executors.workloads.types import WorkloadKey
+
+
+class WorkloadType(str, Enum):
+    """Central registry of executor workload types."""
+
+    EXECUTE_TASK = "ExecuteTask"
+    EXECUTE_CALLBACK = "ExecuteCallback"
+
+
+# Central executor priority registry: Tuple is ordered from highest priority to lowest.
+_workload_type_priority_order = (
+    WorkloadType.EXECUTE_CALLBACK,
+    WorkloadType.EXECUTE_TASK,
+)
+
+WORKLOAD_TYPE_TIER: dict[str, int] = {name: idx for idx, name in enumerate(_workload_type_priority_order)}

Review Comment:
   > Nit: but I like "priority" or "order" instead of "tier", but just a suggestion.



##########
airflow-core/src/airflow/executors/workloads/base.py:
##########
@@ -20,12 +20,30 @@
 
 import os
 from abc import ABC
+from enum import Enum
 from typing import TYPE_CHECKING
 
 from pydantic import BaseModel, ConfigDict, Field
 
 if TYPE_CHECKING:
     from airflow.api_fastapi.auth.tokens import JWTGenerator
+    from airflow.executors.workloads.types import WorkloadKey
+
+
+class WorkloadType(str, Enum):
+    """Central registry of executor workload types."""
+
+    EXECUTE_TASK = "ExecuteTask"
+    EXECUTE_CALLBACK = "ExecuteCallback"
+
+
+# Central executor priority registry: Tuple is ordered from highest priority to lowest.
+_workload_type_priority_order = (
+    WorkloadType.EXECUTE_CALLBACK,
+    WorkloadType.EXECUTE_TASK,
+)
+
+WORKLOAD_TYPE_TIER: dict[str, int] = {name: idx for idx, name in enumerate(_workload_type_priority_order)}

Review Comment:
   Nit: but I like "order" instead of "tier"
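
For context, here is a minimal sketch of how a mapping built this way could be used to order pending work, with lower numbers meaning higher priority. The enum and the registry mirror the hunk above; the `sort_by_priority` helper and the `(workload_type, name)` tuples are hypothetical illustrations, not code from the PR.

```python
from enum import Enum


class WorkloadType(str, Enum):
    """Mirrors the enum in the hunk above."""

    EXECUTE_TASK = "ExecuteTask"
    EXECUTE_CALLBACK = "ExecuteCallback"


# Ordered from highest priority to lowest, as in the hunk above.
_workload_type_priority_order = (
    WorkloadType.EXECUTE_CALLBACK,
    WorkloadType.EXECUTE_TASK,
)

# enumerate() assigns 0 to the first (highest-priority) entry.
WORKLOAD_TYPE_TIER: dict[str, int] = {
    name: idx for idx, name in enumerate(_workload_type_priority_order)
}


def sort_by_priority(pending):
    """Hypothetical helper: sort (workload_type, name) pairs, highest priority first.

    sorted() is stable, so entries of the same type keep their original order.
    """
    return sorted(pending, key=lambda item: WORKLOAD_TYPE_TIER[item[0]])


# Hypothetical pending items, for illustration only.
pending = [
    (WorkloadType.EXECUTE_TASK, "task-a"),
    (WorkloadType.EXECUTE_CALLBACK, "callback-a"),
    (WorkloadType.EXECUTE_TASK, "task-b"),
]
ordered = sort_by_priority(pending)
```

Because the value is a position in an ordered tuple, a name such as "order" or "priority" describes it directly, which is the point of the nit above.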



##########
providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py:
##########
@@ -63,11 +62,6 @@ class AwsLambdaExecutor(BaseExecutor):
 
     supports_multi_team: bool = True
 
-    if TYPE_CHECKING and AIRFLOW_V_3_0_PLUS:
-        # In the v3 path, we store workloads, not commands as strings.
-        # TODO: TaskSDK: move this type change into BaseExecutor
-        queued_tasks: dict[TaskInstanceKey, workloads.All]  # type: ignore[assignment]

Review Comment:
   I'm really happy to see this going away!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
