ferruzzi commented on code in PR #63491:
URL: https://github.com/apache/airflow/pull/63491#discussion_r3243182198


##########
airflow-core/src/airflow/executors/base_executor.py:
##########
@@ -216,8 +218,7 @@ def __init__(self, parallelism: int = PARALLELISM, team_name: str | None = None)
 
         self.parallelism: int = parallelism
         self.team_name: str | None = team_name
-        self.queued_tasks: dict[TaskInstanceKey, workloads.ExecuteTask] = {}
-        self.queued_callbacks: dict[str, workloads.ExecuteCallback] = {}
+        self.executor_queues: dict[str, dict[WorkloadKey, QueueableWorkload]] = defaultdict(dict)

Review Comment:
   FWIW, the gate on line 80 of CeleryKubernetesExecutor throws a RuntimeError if the Airflow version is >= 3.0, so I don't think "simplifying CeleryKubernetesExecutor" needs to factor into this decision.
     
   That said, a flat dict isn't a bad idea in principle, and it likely would have been the cleaner choice in the first place. My main concern is practical: Lambda (#63035), Celery (#63888), and Batch (#62984) have already merged using the `executor_queues[WorkloadType.X][key]` pattern, and ECS (#63657), K8s (#63454), and Edge (#63498) are all in progress implementing the same pattern. Flattening now would mean reworking all six executor implementations on top of the changes it would require in this PR, and I'm not sure we really gain anything for that work.
     
   How would you feel about adding a TODO to flatten it when the compat properties are removed (in 4.0?)? At that point the nested structure loses its main justification anyway. Does that seem reasonable, or is that just punting the same work to Future Us?
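   For concreteness, here is a rough, self-contained sketch of the two layouts under discussion. The names (`executor_queues`, `flat_queue`, the string workload values) are illustrative placeholders, not the actual Airflow types:

```python
# Illustrative sketch only: placeholder names, not the actual Airflow API.
from collections import defaultdict
from enum import Enum


class WorkloadType(str, Enum):
    EXECUTE_TASK = "ExecuteTask"
    EXECUTE_CALLBACK = "ExecuteCallback"


# Nested pattern, as already merged in the Lambda/Celery/Batch executors:
# one inner dict per workload type.
executor_queues: dict[WorkloadType, dict[str, str]] = defaultdict(dict)
executor_queues[WorkloadType.EXECUTE_TASK]["ti-key-1"] = "workload-1"

# Flat alternative: a single dict keyed by (type, key) pairs.
flat_queue: dict[tuple[WorkloadType, str], str] = {
    (WorkloadType.EXECUTE_TASK, "ti-key-1"): "workload-1",
}

# Same lookup, two shapes:
assert executor_queues[WorkloadType.EXECUTE_TASK]["ti-key-1"] == "workload-1"
assert flat_queue[(WorkloadType.EXECUTE_TASK, "ti-key-1")] == "workload-1"
```

   Either way the information content is identical; the nested form just makes "give me all queued callbacks" a single dict access, while the flat form avoids the two-level indirection.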



##########
airflow-core/src/airflow/executors/workloads/base.py:
##########
@@ -32,6 +33,28 @@
     from airflow.executors.workloads.types import WorkloadState
 
 
+class WorkloadType(str, Enum):
+    """Central registry of executor workload types."""
+
+    EXECUTE_TASK = "ExecuteTask"
+    EXECUTE_CALLBACK = "ExecuteCallback"
+
+
+# Central executor priority registry: tuple is ordered from highest priority to lowest.
+#
+# Adding a new workload type is a three-place change that must stay in sync:
+#   1. ``WorkloadType`` — declare the enum member.
+#   2. ``_workload_type_priority_order`` — insert it at the right priority slot.
+#   3. ``airflow.executors.workloads.QueueableWorkload`` — extend the discriminated union
+#      so ``queue_workload`` can accept the new schema.
+_workload_type_priority_order = (
+    WorkloadType.EXECUTE_CALLBACK,
+    WorkloadType.EXECUTE_TASK,
+)
+
+WORKLOAD_TYPE_PRIORITY: dict[str, int] = {name: idx for idx, name in enumerate(_workload_type_priority_order)}

Review Comment:
   Nice catch
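   
   For anyone skimming later, a minimal sketch of how the priority map from this hunk could be consumed when draining mixed workload types; the `queued` list and the sort are illustrative assumptions, not code from the PR:

```python
from enum import Enum


class WorkloadType(str, Enum):
    EXECUTE_TASK = "ExecuteTask"
    EXECUTE_CALLBACK = "ExecuteCallback"


# Priority tuple and derived index map, mirroring the diff above:
_workload_type_priority_order = (
    WorkloadType.EXECUTE_CALLBACK,
    WorkloadType.EXECUTE_TASK,
)
WORKLOAD_TYPE_PRIORITY: dict[str, int] = {
    name: idx for idx, name in enumerate(_workload_type_priority_order)
}

# Hypothetical usage: sort queued workload types so the highest-priority
# type (lowest index in the tuple) is drained first.
queued = [WorkloadType.EXECUTE_TASK, WorkloadType.EXECUTE_CALLBACK]
drained = sorted(queued, key=WORKLOAD_TYPE_PRIORITY.__getitem__)
assert drained[0] is WorkloadType.EXECUTE_CALLBACK
```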



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
