This is an automated email from the ASF dual-hosted git repository.

rahulvats pushed a commit to branch backport-61273-to-v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 0f3e6a8ea536658be778a37416177fe8d19ead51
Author: vatsrahul1001 <[email protected]>
AuthorDate: Tue Feb 3 19:01:44 2026 +0530

    Flatten grid structure endpoint memory consumption (#61273)
    
    The grid structure endpoint was loading the serialized DAGs (serdags)
    for all of the displayed dagruns into memory at once before merging
    them together.
    
    Now we stream them five at a time and expunge each one from the
    session after it is merged, so it can be garbage-collected sooner.
    
    (cherry picked from commit 40f6ec1c6021f242e80e010043273d2a4cbd4887)
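
    For readers unfamiliar with the pattern, here is a minimal, self-contained
    sketch of the streaming-plus-expunge technique the diff below applies. The
    Item model and the in-memory SQLite engine are hypothetical stand-ins used
    for illustration only (the real code streams SerializedDagModel rows):

        from sqlalchemy import create_engine, select
        from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, Session

        class Base(DeclarativeBase):
            pass

        class Item(Base):  # hypothetical stand-in for SerializedDagModel
            __tablename__ = "item"
            id: Mapped[int] = mapped_column(primary_key=True)
            payload: Mapped[str]

        engine = create_engine("sqlite://")
        Base.metadata.create_all(engine)

        with Session(engine) as session:
            session.add_all(Item(payload=f"row {i}") for i in range(20))
            session.commit()

            # yield_per=5 fetches rows in batches of five instead of
            # loading the whole result set into memory at once.
            stmt = select(Item).execution_options(yield_per=5)
            for item in session.scalars(stmt):
                ...  # merge/process the row, keeping no reference to it
                # Detach the instance so the session no longer tracks it
                # and it becomes eligible for garbage collection.
                session.expunge(item)
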
---
 .../airflow/api_fastapi/core_api/routes/ui/grid.py | 32 +++++++++++++++-------
 1 file changed, 22 insertions(+), 10 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
index 0379bd957ca..a95bb5c7f56 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
@@ -135,6 +135,8 @@ def get_dag_structure(
     """Return dag structure for grid view."""
     latest_serdag = _get_latest_serdag(dag_id, session)
     latest_dag = latest_serdag.dag
+    latest_serdag_id = latest_serdag.id
+    session.expunge(latest_serdag)  # allow GC of serdag; only latest_dag is needed from here
 
     # Retrieve, sort the previous DAG Runs
     base_query = select(DagRun.id).where(DagRun.dag_id == dag_id)
@@ -159,12 +161,22 @@ def get_dag_structure(
         nodes = [task_group_to_dict_grid(x) for x in task_group_sort(latest_dag.task_group)]
         return [GridNodeResponse(**n) for n in nodes]
 
-    serdags = session.scalars(
-        select(SerializedDagModel).where(
+    # Process and merge the latest serdag first
+    merged_nodes: list[dict[str, Any]] = []
+    nodes = [task_group_to_dict_grid(x) for x in task_group_sort(latest_dag.task_group)]
+    _merge_node_dicts(merged_nodes, nodes)
+    del latest_dag
+
+    # Process serdags one by one and merge immediately to reduce memory usage.
+    # Use yield_per() for streaming results and expunge each serdag after processing
+    # to allow garbage collection and prevent memory buildup in the session identity map.
+    serdags_query = (
+        select(SerializedDagModel)
+        .where(
             # Even though dag_id is filtered in base_query,
            # adding this line here can improve the performance of this endpoint
             SerializedDagModel.dag_id == dag_id,
-            SerializedDagModel.id != latest_serdag.id,
+            SerializedDagModel.id != latest_serdag_id,
             SerializedDagModel.dag_version_id.in_(
                 select(TaskInstance.dag_version_id)
                 .join(TaskInstance.dag_run)
@@ -174,16 +186,16 @@ def get_dag_structure(
                 .distinct()
             ),
         )
+        .execution_options(yield_per=5)  # balance between peak memory usage and round trips
     )
-    merged_nodes: list[dict[str, Any]] = []
-    dags = [latest_dag]
-    for serdag in serdags:
-        if serdag:
-            dags.append(serdag.dag)
-    for dag in dags:
-        nodes = [task_group_to_dict_grid(x) for x in task_group_sort(dag.task_group)]
+
+    for serdag in session.scalars(serdags_query):
+        # Merge immediately instead of collecting all DAGs in memory
+        nodes = [task_group_to_dict_grid(x) for x in task_group_sort(serdag.dag.task_group)]
         _merge_node_dicts(merged_nodes, nodes)
 
+        session.expunge(serdag)  # to allow garbage collection
+
     # Ensure historical tasks (e.g. removed) that exist in TIs for the selected runs are represented
     def _collect_ids(nodes: list[dict[str, Any]]) -> set[str]:
         ids: set[str] = set()

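A usage note on Session.expunge(): it detaches an instance from the session so
the identity map no longer tracks it. A quick check, reusing the hypothetical
Item model and session from the sketch above:

    from sqlalchemy import inspect

    item = session.scalars(select(Item).limit(1)).one()
    assert item in session         # tracked by the session's identity map
    session.expunge(item)
    assert item not in session     # detached from the session
    assert inspect(item).detached  # same check via the inspection API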