This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-0-test by this push:
new a20a8128bab [v3-0-test] Apply task group sorting based on webserver
config in grid structure response (#49418) (#50138)
a20a8128bab is described below
commit a20a8128bab4b45a2294a57b3bba78ed4c11648d
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat May 3 07:47:25 2025 +0200
[v3-0-test] Apply task group sorting based on webserver config in grid
structure response (#49418) (#50138)
* Apply topological sort on task group grid
* Fix task_group sorting by utilizing get_task_group_children_getter to
sort based on webserver config
* Adjust test expectation to sort children topologically
(cherry picked from commit d833ecb242b6546d937495c466507adcddc4c9f2)
Co-authored-by: Jason <[email protected]>
---
.../api_fastapi/core_api/services/ui/grid.py | 19 +-----
airflow-core/src/airflow/utils/task_group.py | 16 ++++-
airflow-core/tests/unit/utils/test_task_group.py | 72 +++++++++++-----------
3 files changed, 53 insertions(+), 54 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py
b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py
index 7865ffa168f..346676e14cd 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py
@@ -18,9 +18,6 @@
from __future__ import annotations
import contextlib
-from functools import cache
-from operator import methodcaller
-from typing import Callable
from uuid import UUID
import structlog
@@ -38,7 +35,6 @@ from airflow.api_fastapi.core_api.datamodels.ui.grid import (
from airflow.api_fastapi.core_api.datamodels.ui.structure import (
StructureDataResponse,
)
-from airflow.configuration import conf
from airflow.models.baseoperator import BaseOperator as DBBaseOperator
from airflow.models.dag_version import DagVersion
from airflow.models.taskmap import TaskMap
@@ -49,20 +45,11 @@ from airflow.sdk.definitions.mappedoperator import
MappedOperator
from airflow.sdk.definitions.taskgroup import MappedTaskGroup, TaskGroup
from airflow.serialization.serialized_objects import SerializedDAG
from airflow.utils.state import TaskInstanceState
-from airflow.utils.task_group import task_group_to_dict
+from airflow.utils.task_group import get_task_group_children_getter,
task_group_to_dict
log = structlog.get_logger(logger_name=__name__)
-@cache
-def get_task_group_children_getter() -> Callable:
- """Get the Task Group Children Getter for the DAG."""
- sort_order = conf.get("webserver", "grid_view_sorting_order")
- if sort_order == "topological":
- return methodcaller("topological_sort")
- return methodcaller("hierarchical_alphabetical_sort")
-
-
def get_task_group_map(dag: DAG) -> dict[str, dict[str, Any]]:
"""
Get the Task Group Map for the DAG.
@@ -262,7 +249,7 @@ def fill_task_instance_summaries(
def get_structure_from_dag(dag: DAG) -> StructureDataResponse:
"""If we do not have TIs, we just get the structure from the DAG."""
- nodes = [task_group_to_dict(child) for child in
dag.task_group.topological_sort()]
+ nodes = [task_group_to_dict(child) for child in
get_task_group_children_getter()(dag.task_group)]
return StructureDataResponse(nodes=nodes, edges=[])
@@ -299,7 +286,7 @@ def get_combined_structure(task_instances, session):
if serdag:
dags.append(serdag.dag)
for dag in dags:
- nodes = [task_group_to_dict(child) for child in
dag.task_group.topological_sort()]
+ nodes = [task_group_to_dict(child) for child in
get_task_group_children_getter()(dag.task_group)]
_merge_node_dicts(merged_nodes, nodes)
return StructureDataResponse(nodes=merged_nodes, edges=[])
diff --git a/airflow-core/src/airflow/utils/task_group.py
b/airflow-core/src/airflow/utils/task_group.py
index 51270fa473a..034eb6d1bb8 100644
--- a/airflow-core/src/airflow/utils/task_group.py
+++ b/airflow-core/src/airflow/utils/task_group.py
@@ -19,9 +19,12 @@
from __future__ import annotations
-from typing import TYPE_CHECKING
+from functools import cache
+from operator import methodcaller
+from typing import TYPE_CHECKING, Callable
import airflow.sdk.definitions.taskgroup
+from airflow.configuration import conf
if TYPE_CHECKING:
from airflow.typing_compat import TypeAlias
@@ -30,6 +33,15 @@ TaskGroup: TypeAlias =
airflow.sdk.definitions.taskgroup.TaskGroup
MappedTaskGroup: TypeAlias = airflow.sdk.definitions.taskgroup.MappedTaskGroup
+@cache
+def get_task_group_children_getter() -> Callable:
+ """Get the Task Group Children Getter for the DAG."""
+ sort_order = conf.get("webserver", "grid_view_sorting_order")
+ if sort_order == "topological":
+ return methodcaller("topological_sort")
+ return methodcaller("hierarchical_alphabetical_sort")
+
+
def task_group_to_dict(task_item_or_group, parent_group_is_mapped=False):
"""Create a nested dict representation of this TaskGroup and its children
used to construct the Graph."""
from airflow.sdk.bases.operator import BaseOperator
@@ -63,7 +75,7 @@ def task_group_to_dict(task_item_or_group,
parent_group_is_mapped=False):
is_mapped = isinstance(task_group, MappedTaskGroup)
children = [
task_group_to_dict(child,
parent_group_is_mapped=parent_group_is_mapped or is_mapped)
- for child in sorted(task_group.children.values(), key=lambda t:
t.label)
+ for child in get_task_group_children_getter()(task_group)
]
if task_group.upstream_group_ids or task_group.upstream_task_ids:
diff --git a/airflow-core/tests/unit/utils/test_task_group.py
b/airflow-core/tests/unit/utils/test_task_group.py
index 544d81d9661..8d6e6c0b334 100644
--- a/airflow-core/tests/unit/utils/test_task_group.py
+++ b/airflow-core/tests/unit/utils/test_task_group.py
@@ -65,6 +65,16 @@ EXPECTED_JSON_LEGACY = {
"tooltip": "",
},
"children": [
+ {
+ "id": "task1",
+ "value": {
+ "label": "task1",
+ "labelStyle": "fill:#000;",
+ "style": "fill:#e8f7e4;",
+ "rx": 5,
+ "ry": 5,
+ },
+ },
{
"id": "group234",
"value": {
@@ -78,6 +88,16 @@ EXPECTED_JSON_LEGACY = {
"isMapped": False,
},
"children": [
+ {
+ "id": "group234.task2",
+ "value": {
+ "label": "task2",
+ "labelStyle": "fill:#000;",
+ "style": "fill:#e8f7e4;",
+ "rx": 5,
+ "ry": 5,
+ },
+ },
{
"id": "group234.group34",
"value": {
@@ -122,16 +142,6 @@ EXPECTED_JSON_LEGACY = {
},
],
},
- {
- "id": "group234.task2",
- "value": {
- "label": "task2",
- "labelStyle": "fill:#000;",
- "style": "fill:#e8f7e4;",
- "rx": 5,
- "ry": 5,
- },
- },
{
"id": "group234.upstream_join_id",
"value": {
@@ -143,16 +153,6 @@ EXPECTED_JSON_LEGACY = {
},
],
},
- {
- "id": "task1",
- "value": {
- "label": "task1",
- "labelStyle": "fill:#000;",
- "style": "fill:#e8f7e4;",
- "rx": 5,
- "ry": 5,
- },
- },
{
"id": "task5",
"value": {
@@ -172,12 +172,14 @@ EXPECTED_JSON = {
"tooltip": "",
"is_mapped": False,
"children": [
+ {"id": "task1", "label": "task1", "operator": "EmptyOperator", "type":
"task"},
{
"id": "group234",
"label": "group234",
"tooltip": "",
"is_mapped": False,
"children": [
+ {"id": "group234.task2", "label": "task2", "operator":
"EmptyOperator", "type": "task"},
{
"id": "group234.group34",
"label": "group34",
@@ -200,12 +202,10 @@ EXPECTED_JSON = {
],
"type": "task",
},
- {"id": "group234.task2", "label": "task2", "operator":
"EmptyOperator", "type": "task"},
{"id": "group234.upstream_join_id", "label": "", "type":
"join"},
],
"type": "task",
},
- {"id": "task1", "label": "task1", "operator": "EmptyOperator", "type":
"task"},
{"id": "task5", "label": "task5", "operator": "EmptyOperator", "type":
"task"},
],
"type": "task",
@@ -314,28 +314,28 @@ def test_build_task_group_with_prefix():
"id": None,
"label": None,
"children": [
+ {"id": "task1", "label": "task1"},
{
"id": "group234",
"label": "group234",
"children": [
+ {"id": "task2", "label": "task2"},
{
"id": "group34",
"label": "group34",
"children": [
+ {"id": "group34.task3", "label": "task3"},
{
"id": "group34.group4",
"label": "group4",
"children": [{"id": "task4", "label":
"task4"}],
},
- {"id": "group34.task3", "label": "task3"},
{"id": "group34.downstream_join_id", "label": ""},
],
},
- {"id": "task2", "label": "task2"},
{"id": "group234.upstream_join_id", "label": ""},
],
},
- {"id": "task1", "label": "task1"},
{"id": "task5", "label": "task5"},
],
}
@@ -389,6 +389,7 @@ def test_build_task_group_with_task_decorator():
expected_node_id = {
"id": None,
"children": [
+ {"id": "task_1"},
{
"id": "group234",
"children": [
@@ -399,7 +400,6 @@ def test_build_task_group_with_task_decorator():
{"id": "group234.downstream_join_id"},
],
},
- {"id": "task_1"},
{"id": "task_5"},
],
}
@@ -448,6 +448,7 @@ def test_sub_dag_task_group():
expected_node_id = {
"id": None,
"children": [
+ {"id": "task1"},
{
"id": "group234",
"children": [
@@ -462,7 +463,6 @@ def test_sub_dag_task_group():
{"id": "group234.upstream_join_id"},
],
},
- {"id": "task1"},
{"id": "task5"},
],
}
@@ -540,6 +540,7 @@ def test_dag_edges():
expected_node_id = {
"id": None,
"children": [
+ {"id": "task1"},
{
"id": "group_a",
"children": [
@@ -567,6 +568,8 @@ def test_dag_edges():
{"id": "group_c.downstream_join_id"},
],
},
+ {"id": "task9"},
+ {"id": "task10"},
{
"id": "group_d",
"children": [
@@ -575,9 +578,6 @@ def test_dag_edges():
{"id": "group_d.upstream_join_id"},
],
},
- {"id": "task1"},
- {"id": "task10"},
- {"id": "task9"},
],
}
@@ -818,9 +818,12 @@ def test_build_task_group_deco_context_manager():
node_ids = {
"id": None,
"children": [
+ {"id": "task_start"},
{
"id": "section_1",
"children": [
+ {"id": "section_1.task_1"},
+ {"id": "section_1.task_2"},
{
"id": "section_1.section_2",
"children": [
@@ -828,12 +831,9 @@ def test_build_task_group_deco_context_manager():
{"id": "section_1.section_2.task_4"},
],
},
- {"id": "section_1.task_1"},
- {"id": "section_1.task_2"},
],
},
{"id": "task_end"},
- {"id": "task_start"},
],
}
@@ -992,6 +992,7 @@ def test_task_group_context_mix():
node_ids = {
"id": None,
"children": [
+ {"id": "task_start"},
{
"id": "section_1",
"children": [
@@ -1011,7 +1012,6 @@ def test_task_group_context_mix():
],
},
{"id": "task_end"},
- {"id": "task_start"},
],
}
@@ -1153,17 +1153,17 @@ def test_call_taskgroup_twice():
{
"id": "task_group1",
"children": [
- {"id": "task_group1.end_task"},
{"id": "task_group1.start_task"},
{"id": "task_group1.task"},
+ {"id": "task_group1.end_task"},
],
},
{
"id": "task_group1__1",
"children": [
- {"id": "task_group1__1.end_task"},
{"id": "task_group1__1.start_task"},
{"id": "task_group1__1.task"},
+ {"id": "task_group1__1.end_task"},
],
},
],