ferruzzi commented on code in PR #63491:
URL: https://github.com/apache/airflow/pull/63491#discussion_r3243182198
##########
airflow-core/src/airflow/executors/base_executor.py:
##########
@@ -216,8 +218,7 @@ def __init__(self, parallelism: int = PARALLELISM, team_name: str | None = None)
self.parallelism: int = parallelism
self.team_name: str | None = team_name
- self.queued_tasks: dict[TaskInstanceKey, workloads.ExecuteTask] = {}
- self.queued_callbacks: dict[str, workloads.ExecuteCallback] = {}
+ self.executor_queues: dict[str, dict[WorkloadKey, QueueableWorkload]] = defaultdict(dict)
Review Comment:
FWIW, the gate on line 80 of CeleryKubernetesExecutor raises a RuntimeError
if the Airflow version is >= 3.0, so I don't think "simplifying
CeleryKubernetesExecutor" needs to factor into this decision.
That said, a flat dict isn't a bad idea in principle and likely would have
been a cleaner choice in the first place. My main concern is mostly practical:
Lambda (#63035), Celery (#63888), and Batch (#62984) have already merged using
the `executor_queues[WorkloadType.X][key]` pattern, and ECS (#63657), K8s
(#63454), and Edge (#63498) are all in progress implementing the same pattern.
Flattening now would mean reworking all six executor implementations, on top
of the changes it would require in this PR, and I'm not sure we really gain
enough to justify that work.
How would you feel about adding a TODO to flatten it once the compat
properties are removed (in 4.0?)? At that point the nested structure loses its
main justification anyway. Does that seem reasonable, or is that just punting
the same work to Future Us?
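For illustration, here's roughly what the flattening we're discussing would look like next to the current pattern. This is a sketch, not the PR's actual code: the key and workload values are placeholders, and real `TaskInstanceKey`/`QueueableWorkload` types are stood in for by plain strings.

```python
from collections import defaultdict
from enum import Enum


class WorkloadType(str, Enum):
    """Mirrors the enum added in this PR."""

    EXECUTE_TASK = "ExecuteTask"
    EXECUTE_CALLBACK = "ExecuteCallback"


# Current nested pattern, as merged in Lambda, Celery, and Batch:
# executor_queues[workload_type][key] -> workload
executor_queues: dict[str, dict[str, object]] = defaultdict(dict)
executor_queues[WorkloadType.EXECUTE_TASK]["ti-key-1"] = "workload-1"

# Flat alternative: a single dict keyed by (workload_type, key) tuples.
flat_queue: dict[tuple[str, str], object] = {
    (wtype, key): workload
    for wtype, bucket in executor_queues.items()
    for key, workload in bucket.items()
}
```

Either shape supports per-type iteration; the nested one just makes the Airflow 2 compat properties (`queued_tasks`, `queued_callbacks`) a one-line lookup instead of a filter.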
##########
airflow-core/src/airflow/executors/workloads/base.py:
##########
@@ -32,6 +33,28 @@
from airflow.executors.workloads.types import WorkloadState
+class WorkloadType(str, Enum):
+ """Central registry of executor workload types."""
+
+ EXECUTE_TASK = "ExecuteTask"
+ EXECUTE_CALLBACK = "ExecuteCallback"
+
+
+# Central executor priority registry: tuple is ordered from highest priority to lowest.
+#
+# Adding a new workload type is a three-place change that must stay in sync:
+# 1. ``WorkloadType`` — declare the enum member.
+# 2. ``_workload_type_priority_order`` — insert it at the right priority slot.
+# 3. ``airflow.executors.workloads.QueueableWorkload`` — extend the discriminated union
+#    so ``queue_workload`` can accept the new schema.
+_workload_type_priority_order = (
+ WorkloadType.EXECUTE_CALLBACK,
+ WorkloadType.EXECUTE_TASK,
+)
+
+WORKLOAD_TYPE_PRIORITY: dict[str, int] = {name: idx for idx, name in enumerate(_workload_type_priority_order)}
Review Comment:
Nice catch
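For anyone skimming this hunk later, here's a self-contained sketch of how the registry is meant to be consumed: lower index means higher priority, so sorting by `WORKLOAD_TYPE_PRIORITY` drains callbacks before tasks. The `sorted` call is illustrative, not code from this PR.

```python
from enum import Enum


class WorkloadType(str, Enum):
    EXECUTE_TASK = "ExecuteTask"
    EXECUTE_CALLBACK = "ExecuteCallback"


# Ordered from highest priority to lowest, as in the hunk above.
_workload_type_priority_order = (
    WorkloadType.EXECUTE_CALLBACK,
    WorkloadType.EXECUTE_TASK,
)

WORKLOAD_TYPE_PRIORITY: dict[str, int] = {
    name: idx for idx, name in enumerate(_workload_type_priority_order)
}

# Example consumer: callbacks (index 0) come out ahead of tasks (index 1).
pending = [WorkloadType.EXECUTE_TASK, WorkloadType.EXECUTE_CALLBACK]
ordered = sorted(pending, key=WORKLOAD_TYPE_PRIORITY.__getitem__)
```

Because `WorkloadType` mixes in `str`, the enum members double as their string values, so the `dict[str, int]` annotation holds even though the keys are enum members.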