This is an automated email from the ASF dual-hosted git repository.

rahulvats pushed a commit to branch backport-61273
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit e6661ff680a24d1fa11519be264a95edf3832755
Author: Jed Cunningham <[email protected]>
AuthorDate: Sun Feb 1 07:18:46 2026 -0700

    Flatten grid structure endpoint memory consumption (#61273)
    
    The grid structure endpoint was loading all serdags for the shown
    dagruns into memory at once, before merging them together.
    
    Now we load them 5 at a time and expunge each one from the session
    so it can be garbage-collected sooner.
---
 .../airflow/api_fastapi/core_api/routes/ui/grid.py | 60 ++++++++--------------
 1 file changed, 21 insertions(+), 39 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
index 0379bd957ca..f0d077b277e 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
@@ -135,6 +135,8 @@ def get_dag_structure(
     """Return dag structure for grid view."""
     latest_serdag = _get_latest_serdag(dag_id, session)
     latest_dag = latest_serdag.dag
+    latest_serdag_id = latest_serdag.id
+    session.expunge(latest_serdag)  # allow GC of serdag; only latest_dag is needed from here
 
     # Retrieve, sort the previous DAG Runs
     base_query = select(DagRun.id).where(DagRun.dag_id == dag_id)
@@ -159,12 +161,22 @@ def get_dag_structure(
         nodes = [task_group_to_dict_grid(x) for x in task_group_sort(latest_dag.task_group)]
         return [GridNodeResponse(**n) for n in nodes]
 
-    serdags = session.scalars(
-        select(SerializedDagModel).where(
+    # Process and merge the latest serdag first
+    merged_nodes: list[dict[str, Any]] = []
+    nodes = [task_group_to_dict_grid(x) for x in task_group_sort(latest_dag.task_group)]
+    _merge_node_dicts(merged_nodes, nodes)
+    del latest_dag
+
+    # Process serdags one by one and merge immediately to reduce memory usage.
+    # Use yield_per() for streaming results and expunge each serdag after processing
+    # to allow garbage collection and prevent memory buildup in the session identity map.
+    serdags_query = (
+        select(SerializedDagModel)
+        .where(
             # Even though dag_id is filtered in base_query,
             # adding this line here can improve the performance of this endpoint
             SerializedDagModel.dag_id == dag_id,
-            SerializedDagModel.id != latest_serdag.id,
+            SerializedDagModel.id != latest_serdag_id,
             SerializedDagModel.dag_version_id.in_(
                 select(TaskInstance.dag_version_id)
                 .join(TaskInstance.dag_run)
@@ -174,45 +186,15 @@ def get_dag_structure(
                 .distinct()
             ),
         )
+        .execution_options(yield_per=5)  # balance between peak memory usage and round trips
     )
-    merged_nodes: list[dict[str, Any]] = []
-    dags = [latest_dag]
-    for serdag in serdags:
-        if serdag:
-            dags.append(serdag.dag)
-    for dag in dags:
-        nodes = [task_group_to_dict_grid(x) for x in task_group_sort(dag.task_group)]
+
+    for serdag in session.scalars(serdags_query):
+        # Merge immediately instead of collecting all DAGs in memory
+        nodes = [task_group_to_dict_grid(x) for x in task_group_sort(serdag.dag.task_group)]
         _merge_node_dicts(merged_nodes, nodes)
 
-    # Ensure historical tasks (e.g. removed) that exist in TIs for the selected runs are represented
-    def _collect_ids(nodes: list[dict[str, Any]]) -> set[str]:
-        ids: set[str] = set()
-        for n in nodes:
-            nid = n.get("id")
-            if nid:
-                ids.add(nid)
-            children = n.get("children")
-            if children:
-                ids |= _collect_ids(children)  # recurse
-        return ids
-
-    existing_ids = _collect_ids(merged_nodes)
-    historical_task_ids = session.scalars(
-        select(TaskInstance.task_id)
-        .join(TaskInstance.dag_run)
-        .where(TaskInstance.dag_id == dag_id, DagRun.id.in_(run_ids))
-        .distinct()
-    )
-    for task_id in historical_task_ids:
-        if task_id not in existing_ids:
-            merged_nodes.append(
-                {
-                    "id": task_id,
-                    "label": task_id,
-                    "is_mapped": None,
-                    "children": None,
-                }
-            )
+        session.expunge(serdag)  # to allow garbage collection
 
     return [GridNodeResponse(**n) for n in merged_nodes]
 

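For readers unfamiliar with the pattern, here is a minimal, self-contained sketch of the yield_per/expunge streaming approach the patch adopts. It uses a toy SQLAlchemy model rather than Airflow's SerializedDagModel, so the Base, SerDag, and engine names below are illustrative assumptions, not part of this commit:

# Illustrative sketch only -- toy model, not Airflow code.
from sqlalchemy import Integer, String, create_engine, select
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


class Base(DeclarativeBase):
    pass


class SerDag(Base):  # hypothetical stand-in for SerializedDagModel
    __tablename__ = "serdag"

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    payload: Mapped[str] = mapped_column(String)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([SerDag(payload=f"dag v{i}") for i in range(20)])
    session.commit()

    # Stream rows in batches of 5 rather than buffering the full result set.
    query = select(SerDag).execution_options(yield_per=5)
    for serdag in session.scalars(query):
        ...  # merge/process this row into the accumulated result here
        # Drop the object from the session's identity map so it can be gc'd.
        session.expunge(serdag)

yield_per(5) fetches rows from the cursor in small batches instead of materializing the whole result, and expunge() removes each processed object from the session's identity map so Python can reclaim it; the batch size of 5 trades peak memory against extra database round trips, as the inline comment in the patch notes.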