This is an automated email from the ASF dual-hosted git repository.

rahulvats pushed a commit to branch backport-61273-to-v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 0f3e6a8ea536658be778a37416177fe8d19ead51
Author: vatsrahul1001 <[email protected]>
AuthorDate: Tue Feb 3 19:01:44 2026 +0530

    Flatten grid structure endpoint memory consumption (#61273)

    The grid structure endpoint was loading all serdags for the shown
    dagruns into memory at once, before merging them together. Now, we
    load 5 at a time and also expunge so they can be gc'd more quickly.

    (cherry picked from commit 40f6ec1c6021f242e80e010043273d2a4cbd4887)
---
 .../airflow/api_fastapi/core_api/routes/ui/grid.py | 32 +++++++++++++++-------
 1 file changed, 22 insertions(+), 10 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
index 0379bd957ca..a95bb5c7f56 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
@@ -135,6 +135,8 @@ def get_dag_structure(
     """Return dag structure for grid view."""
     latest_serdag = _get_latest_serdag(dag_id, session)
     latest_dag = latest_serdag.dag
+    latest_serdag_id = latest_serdag.id
+    session.expunge(latest_serdag)  # allow GC of serdag; only latest_dag is needed from here
 
     # Retrieve, sort the previous DAG Runs
     base_query = select(DagRun.id).where(DagRun.dag_id == dag_id)
@@ -159,12 +161,22 @@ def get_dag_structure(
         nodes = [task_group_to_dict_grid(x) for x in task_group_sort(latest_dag.task_group)]
         return [GridNodeResponse(**n) for n in nodes]
 
-    serdags = session.scalars(
-        select(SerializedDagModel).where(
+    # Process and merge the latest serdag first
+    merged_nodes: list[dict[str, Any]] = []
+    nodes = [task_group_to_dict_grid(x) for x in task_group_sort(latest_dag.task_group)]
+    _merge_node_dicts(merged_nodes, nodes)
+    del latest_dag
+
+    # Process serdags one by one and merge immediately to reduce memory usage.
+    # Use yield_per() for streaming results and expunge each serdag after processing
+    # to allow garbage collection and prevent memory buildup in the session identity map.
+    serdags_query = (
+        select(SerializedDagModel)
+        .where(
             # Even though dag_id is filtered in base_query,
             # adding this line here can improve the performance of this endpoint
             SerializedDagModel.dag_id == dag_id,
-            SerializedDagModel.id != latest_serdag.id,
+            SerializedDagModel.id != latest_serdag_id,
             SerializedDagModel.dag_version_id.in_(
                 select(TaskInstance.dag_version_id)
                 .join(TaskInstance.dag_run)
@@ -174,16 +186,16 @@ def get_dag_structure(
                 .distinct()
             ),
         )
+        .execution_options(yield_per=5)  # balance between peak memory usage and round trips
     )
-    merged_nodes: list[dict[str, Any]] = []
-    dags = [latest_dag]
-    for serdag in serdags:
-        if serdag:
-            dags.append(serdag.dag)
-    for dag in dags:
-        nodes = [task_group_to_dict_grid(x) for x in task_group_sort(dag.task_group)]
+
+    for serdag in session.scalars(serdags_query):
+        # Merge immediately instead of collecting all DAGs in memory
+        nodes = [task_group_to_dict_grid(x) for x in task_group_sort(serdag.dag.task_group)]
         _merge_node_dicts(merged_nodes, nodes)
+        session.expunge(serdag)  # to allow garbage collection
 
+    # Ensure historical tasks (e.g. removed) that exist in TIs for the selected runs are represented
     def _collect_ids(nodes: list[dict[str, Any]]) -> set[str]:
         ids: set[str] = set()
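For reference, the memory-saving pattern this change applies is generic SQLAlchemy:
stream rows in small batches with yield_per, and expunge each ORM object from the
session identity map once it has been consumed so it can be garbage-collected. The
sketch below is illustrative only, not Airflow code; MyModel and process() are
hypothetical stand-ins.

    from sqlalchemy import create_engine, select
    from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


    class Base(DeclarativeBase):
        pass


    class MyModel(Base):  # hypothetical table, for illustration only
        __tablename__ = "my_model"
        id: Mapped[int] = mapped_column(primary_key=True)
        payload: Mapped[str]


    def process(obj: MyModel) -> None:  # hypothetical per-row work
        print(obj.id, len(obj.payload))


    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)

    with Session(engine) as session:
        # yield_per=5 fetches rows in batches of five instead of materializing
        # the whole result set; expunge() drops each object from the session's
        # identity map so nothing keeps accumulating across the loop.
        query = select(MyModel).execution_options(yield_per=5)
        for obj in session.scalars(query):
            process(obj)
            session.expunge(obj)

The batch size trades peak memory for database round trips, which is why the
committed change settles on a small constant (5) rather than row-by-row fetches.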
