This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 40f6ec1c602 Flatten grid structure endpoint memory consumption (#61273)
40f6ec1c602 is described below
commit 40f6ec1c6021f242e80e010043273d2a4cbd4887
Author: Jed Cunningham <[email protected]>
AuthorDate: Sun Feb 1 07:18:46 2026 -0700
Flatten grid structure endpoint memory consumption (#61273)
The grid structure endpoint was loading all serdags for the shown
dagruns into memory at once, before merging them together.
Now, we load 5 at a time and expunge each one after merging so they can
be gc'd more quickly.
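
For readers skimming the diff, the core pattern is sketched below. This
is a minimal, self-contained SQLAlchemy example rather than the Airflow
code itself: the SerDag model, the "example_dag" filter value, and the
merged_ids list are hypothetical stand-ins for SerializedDagModel and
the grid merge logic.

    from sqlalchemy import create_engine, select
    from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, Session

    class Base(DeclarativeBase):
        pass

    class SerDag(Base):  # hypothetical stand-in for SerializedDagModel
        __tablename__ = "serdag"
        id: Mapped[int] = mapped_column(primary_key=True)
        dag_id: Mapped[str]

    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)

    merged_ids: list[int] = []
    with Session(engine) as session:
        # yield_per=5 streams ORM rows in batches of 5 instead of
        # loading the whole result set into memory at once
        query = (
            select(SerDag)
            .where(SerDag.dag_id == "example_dag")
            .execution_options(yield_per=5)
        )
        for serdag in session.scalars(query):
            merged_ids.append(serdag.id)  # merge immediately; keep only the result
            # detach the instance from the session so it can be gc'd right away
            session.expunge(serdag)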
---
.../airflow/api_fastapi/core_api/routes/ui/grid.py | 50 ++++++++++++++--------
1 file changed, 31 insertions(+), 19 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
index f52c3e7f75c..0f061933b0d 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
@@ -143,6 +143,8 @@ def get_dag_structure(
"""Return dag structure for grid view."""
latest_serdag = _get_latest_serdag(dag_id, session)
latest_dag = latest_serdag.dag
+ latest_serdag_id = latest_serdag.id
+ session.expunge(latest_serdag)  # allow GC of serdag; only latest_dag is needed from here
# Apply filtering if root task is specified
if root:
@@ -176,12 +178,22 @@ def get_dag_structure(
nodes = [task_group_to_dict_grid(x) for x in task_group_sort(latest_dag.task_group)]
return [GridNodeResponse(**n) for n in nodes]
- serdags = session.scalars(
- select(SerializedDagModel).where(
+ # Process and merge the latest serdag first
+ merged_nodes: list[dict[str, Any]] = []
+ nodes = [task_group_to_dict_grid(x) for x in task_group_sort(latest_dag.task_group)]
+ _merge_node_dicts(merged_nodes, nodes)
+ del latest_dag
+
+ # Process serdags one by one and merge immediately to reduce memory usage.
+ # Use yield_per() for streaming results and expunge each serdag after processing
+ # to allow garbage collection and prevent memory buildup in the session identity map.
+ serdags_query = (
+ select(SerializedDagModel)
+ .where(
# Even though dag_id is filtered in base_query,
# adding this line here can improve the performance of this endpoint
SerializedDagModel.dag_id == dag_id,
- SerializedDagModel.id != latest_serdag.id,
+ SerializedDagModel.id != latest_serdag_id,
SerializedDagModel.dag_version_id.in_(
select(TaskInstance.dag_version_id)
.join(TaskInstance.dag_run)
@@ -191,25 +203,25 @@ def get_dag_structure(
.distinct()
),
)
+ .execution_options(yield_per=5)  # balance between peak memory usage and round trips
)
- merged_nodes: list[dict[str, Any]] = []
- dags = [latest_dag]
- for serdag in serdags:
- if serdag:
- filtered_dag = serdag.dag
- # Apply the same filtering to historical DAG versions
- if root:
- filtered_dag = filtered_dag.partial_subset(
- task_ids=root,
- include_upstream=include_upstream,
- include_downstream=include_downstream,
- depth=depth,
- )
- dags.append(filtered_dag)
- for dag in dags:
- nodes = [task_group_to_dict_grid(x) for x in task_group_sort(dag.task_group)]
+
+ for serdag in session.scalars(serdags_query):
+ filtered_dag = serdag.dag
+ # Apply the same filtering to historical DAG versions
+ if root:
+ filtered_dag = filtered_dag.partial_subset(
+ task_ids=root,
+ include_upstream=include_upstream,
+ include_downstream=include_downstream,
+ depth=depth,
+ )
+ # Merge immediately instead of collecting all DAGs in memory
+ nodes = [task_group_to_dict_grid(x) for x in task_group_sort(filtered_dag.task_group)]
_merge_node_dicts(merged_nodes, nodes)
+ session.expunge(serdag) # to allow garbage collection
+
return [GridNodeResponse(**n) for n in merged_nodes]
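
A note on the two mechanisms used above, stated as general SQLAlchemy
behavior rather than anything Airflow-specific: execution_options(yield_per=N)
makes the ORM fetch and hydrate rows in batches of N, so N trades peak memory
against extra round trips, while Session.expunge(obj) detaches an instance
from the session's identity map, which is what lets each serialized DAG be
garbage-collected as soon as its nodes have been merged.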