This is an automated email from the ASF dual-hosted git repository.
jasonliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b328dd9a9ad Improve Grid endpoint response time (#48771)
b328dd9a9ad is described below
commit b328dd9a9ad673994ad283f0e3629dd889ecfc93
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Fri Apr 4 03:39:22 2025 +0200
Improve Grid endpoint response time (#48771)
---
.../airflow/api_fastapi/core_api/services/ui/grid.py | 18 ++++++++++++++++--
1 file changed, 16 insertions(+), 2 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py
index d49756d75bb..d95d801116b 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py
@@ -20,6 +20,7 @@ from __future__ import annotations
from functools import cache
from operator import methodcaller
from typing import Callable
+from uuid import UUID
from typing_extensions import Any
@@ -40,6 +41,7 @@ from airflow.models.taskmap import TaskMap
from airflow.sdk import BaseOperator
from airflow.sdk.definitions.mappedoperator import MappedOperator
from airflow.sdk.definitions.taskgroup import MappedTaskGroup, TaskGroup
+from airflow.serialization.serialized_objects import SerializedDAG
from airflow.utils.state import TaskInstanceState
from airflow.utils.task_group import task_group_to_dict
@@ -176,9 +178,20 @@ def fill_task_instance_summaries(
)
for (task_id, run_id), tis in grouped_task_instances.items()
}
+
+ serdag_cache: dict[UUID, SerializedDAG] = {}
+ task_group_map_cache: dict[UUID, dict[str, dict[str, Any]]] = {}
+
for (task_id, run_id), tis in grouped_task_instances.items():
- serdag = tis[0].dag_version.serialized_dag.dag
- task_node_map = get_task_group_map(dag=serdag)
+ serdag_id = tis[0].dag_version.serialized_dag.id
+
+ serdag_cache[serdag_id] = serdag_cache.get(serdag_id) or tis[0].dag_version.serialized_dag.dag
+ serdag = serdag_cache[serdag_id]
+
+ task_group_map_cache[serdag_id] = task_group_map_cache.get(serdag_id) or get_task_group_map(
+ dag=serdag
+ )
+ task_node_map = task_group_map_cache[serdag_id]
ti_try_number = max([ti.try_number for ti in tis])
ti_start_date = min([ti.start_date for ti in tis if ti.start_date], default=None)
@@ -204,6 +217,7 @@ def fill_task_instance_summaries(
for state in state_priority
}
)
+
# Update Nested Task Group States by aggregating the child states
child_states.update(
{