This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new fd4fd67c0f2 Improve performance of task dequeuing (#61376)
fd4fd67c0f2 is described below
commit fd4fd67c0f24471707a95a589f14414b55083dba
Author: Steve Ahn <[email protected]>
AuthorDate: Sun Feb 15 01:44:45 2026 -0800
Improve performance of task dequeuing (#61376)
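* Replace list.pop(0) with deque.popleft() in the task group traversals
* Executor: revert to a plain list, sorted in reverse (ascending) order so pop() takes from the end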
---
airflow-core/src/airflow/executors/base_executor.py | 4 ++--
airflow-core/src/airflow/serialization/definitions/taskgroup.py | 5 +++--
task-sdk/src/airflow/sdk/definitions/taskgroup.py | 5 +++--
3 files changed, 8 insertions(+), 6 deletions(-)
diff --git a/airflow-core/src/airflow/executors/base_executor.py b/airflow-core/src/airflow/executors/base_executor.py
index a54e1464c24..927e4c07d64 100644
--- a/airflow-core/src/airflow/executors/base_executor.py
+++ b/airflow-core/src/airflow/executors/base_executor.py
@@ -349,7 +349,7 @@ class BaseExecutor(LoggingMixin):
return sorted(
self.queued_tasks.items(),
key=lambda x: x[1].ti.priority_weight,
- reverse=True,
+ reverse=False,
)
@add_debug_span
@@ -363,7 +363,7 @@ class BaseExecutor(LoggingMixin):
workload_list = []
for _ in range(min((open_slots, len(self.queued_tasks)))):
- key, item = sorted_queue.pop(0)
+ key, item = sorted_queue.pop()
# If a task makes it here but is still understood by the executor
# to be running, it generally means that the task has been killed
diff --git a/airflow-core/src/airflow/serialization/definitions/taskgroup.py b/airflow-core/src/airflow/serialization/definitions/taskgroup.py
index 4a4ee0588cf..d971c303c7c 100644
--- a/airflow-core/src/airflow/serialization/definitions/taskgroup.py
+++ b/airflow-core/src/airflow/serialization/definitions/taskgroup.py
@@ -22,6 +22,7 @@ import copy
import functools
import operator
import weakref
+from collections import deque
from typing import TYPE_CHECKING
import attrs
@@ -188,9 +189,9 @@ class SerializedTaskGroup(DAGNode):
from airflow.serialization.definitions.baseoperator import
SerializedBaseOperator
from airflow.serialization.definitions.mappedoperator import
SerializedMappedOperator
- groups_to_visit = [self]
+ groups_to_visit = deque([self])
while groups_to_visit:
- for child in groups_to_visit.pop(0).children.values():
+ for child in groups_to_visit.popleft().children.values():
if isinstance(child, (SerializedMappedOperator,
SerializedBaseOperator)):
yield child
elif isinstance(child, SerializedTaskGroup):
diff --git a/task-sdk/src/airflow/sdk/definitions/taskgroup.py b/task-sdk/src/airflow/sdk/definitions/taskgroup.py
index c7c5da21140..c47b1f360ae 100644
--- a/task-sdk/src/airflow/sdk/definitions/taskgroup.py
+++ b/task-sdk/src/airflow/sdk/definitions/taskgroup.py
@@ -22,6 +22,7 @@ from __future__ import annotations
import copy
import re
import weakref
+from collections import deque
from collections.abc import Generator, Iterator, Sequence
from typing import TYPE_CHECKING, Any
@@ -586,10 +587,10 @@ class TaskGroup(DAGNode):
"""Return an iterator of the child tasks."""
from airflow.sdk.definitions._internal.abstractoperator import
AbstractOperator
- groups_to_visit = [self]
+ groups_to_visit = deque([self])
while groups_to_visit:
- visiting = groups_to_visit.pop(0)
+ visiting = groups_to_visit.popleft()
for child in visiting.children.values():
if isinstance(child, AbstractOperator):