This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7c77a6d2480 Stream grid task-instance summaries as NDJSON to eliminate 
N+1 requests (#62369)
7c77a6d2480 is described below

commit 7c77a6d2480cb27eea5d29724bbbfdcf2fbd140b
Author: Xavi Vega <[email protected]>
AuthorDate: Wed Mar 18 12:29:34 2026 +0100

    Stream grid task-instance summaries as NDJSON to eliminate N+1 requests 
(#62369)
    
    * Stream task instance summaries for multiple DAG runs over a single NDJSON 
connection to eliminate N+1 requests
    
    * Stream task instance summaries for multiple DAG runs over a single NDJSON 
connection, replacing individual requests to improve performance and eliminate 
N+1 query issues.
    
    * Fix capitalization of "Dag" in documentation and code comments for 
consistency
    
    * Refactor GridTISummaries schema and update streaming endpoint to improve 
clarity and performance
    
    * Fix formatting and linter issues
    
    * Fix static check
    
    * Fix static check
    
    * Fix static check
---
 airflow-core/newsfragments/62369.significant.rst   |  30 ++++
 .../api_fastapi/core_api/openapi/_private_ui.yaml  |  50 +++---
 .../airflow/api_fastapi/core_api/routes/ui/grid.py | 187 +++++++++++----------
 .../src/airflow/ui/openapi-gen/queries/common.ts   |  12 +-
 .../ui/openapi-gen/queries/ensureQueryData.ts      |  28 ++-
 .../src/airflow/ui/openapi-gen/queries/prefetch.ts |  28 ++-
 .../src/airflow/ui/openapi-gen/queries/queries.ts  |  28 ++-
 .../src/airflow/ui/openapi-gen/queries/suspense.ts |  28 ++-
 .../ui/openapi-gen/requests/services.gen.ts        |  34 ++--
 .../airflow/ui/openapi-gen/requests/types.gen.ts   |  14 +-
 .../airflow/ui/src/layouts/Details/Gantt/Gantt.tsx |  11 +-
 .../airflow/ui/src/layouts/Details/Graph/Graph.tsx |   5 +-
 .../airflow/ui/src/layouts/Details/Grid/Grid.tsx   |   8 +
 .../layouts/Details/Grid/TaskInstancesColumn.tsx   |  15 +-
 .../pages/GroupTaskInstance/GroupTaskInstance.tsx  |   5 +-
 .../MappedTaskInstance/MappedTaskInstance.tsx      |   5 +-
 .../ui/src/pages/TaskInstance/TaskInstance.tsx     |   5 +-
 .../src/airflow/ui/src/queries/useClearRun.ts      |   2 -
 .../ui/src/queries/useClearTaskInstances.ts        |   9 -
 .../ui/src/queries/useDeleteTaskInstance.ts        |   2 -
 .../airflow/ui/src/queries/useGridTISummaries.ts   | 134 +++++++++++----
 .../src/airflow/ui/src/queries/usePatchDagRun.ts   |   2 -
 .../airflow/ui/src/queries/usePatchTaskInstance.ts |  19 +--
 .../api_fastapi/core_api/routes/ui/test_grid.py    |  68 +++++++-
 24 files changed, 418 insertions(+), 311 deletions(-)

diff --git a/airflow-core/newsfragments/62369.significant.rst 
b/airflow-core/newsfragments/62369.significant.rst
new file mode 100644
index 00000000000..b5d4113ae58
--- /dev/null
+++ b/airflow-core/newsfragments/62369.significant.rst
@@ -0,0 +1,30 @@
+Replace per-run TI summary requests with a single NDJSON stream
+
+The grid, graph, gantt, and task-detail views now fetch task-instance
+summaries through a single streaming HTTP request
+(``GET /ui/grid/ti_summaries/{dag_id}?run_ids=...``) instead of one request
+per run.  The server emits one JSON line per run as soon as that run's task
+instances are ready, so columns appear progressively rather than all at once.
+
+**What changed:**
+
+- ``GET /ui/grid/ti_summaries/{dag_id}?run_ids=...`` is now the sole endpoint
+  for TI summaries, returning an ``application/x-ndjson`` stream where each
+  line is a serialized ``GridTISummaries`` object for one run.
+- The old single-run endpoint ``GET /ui/grid/ti_summaries/{dag_id}/{run_id}``
+  has been removed.
+- The serialized Dag structure is loaded once and shared across all runs that
+  share the same ``dag_version_id``, avoiding redundant deserialization.
+- All UI views (grid, graph, gantt, task instance, mapped task instance, group
+  task instance) use the stream endpoint, passing one or more ``run_ids``.
+
+* Types of change
+
+  * [ ] Dag changes
+  * [ ] Config changes
+  * [x] API changes
+  * [ ] CLI changes
+  * [x] Behaviour changes
+  * [ ] Plugin changes
+  * [ ] Dependency changes
+  * [ ] Code interface changes
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
index 2b10156b4f5..9e81e8344fb 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
+++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
@@ -1129,35 +1129,26 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/HTTPValidationError'
-  /ui/grid/ti_summaries/{dag_id}/{run_id}:
+  /ui/grid/ti_summaries/{dag_id}:
     get:
       tags:
       - Grid
-      summary: Get Grid Ti Summaries
-      description: 'Get states for TIs / "groups" of TIs.
+      summary: Get Grid Ti Summaries Stream
+      description: 'Stream TI summaries for multiple Dag runs as NDJSON (one 
JSON
+        line per run).
 
 
-        Essentially this is to know what color to put in the squares in the 
grid.
+        Each line is a serialized ``GridTISummaries`` object emitted as soon 
as that
 
+        run''s task instances have been processed, so the client can render 
columns
 
-        The tricky part here is that we aggregate the state for groups and 
mapped
-        tasks.
+        progressively without waiting for all runs to complete.
 
 
-        We don''t add all the TIs for mapped TIs -- we only add one entry for 
the
-        mapped task and
+        The serialized Dag structure is loaded once and reused for all runs 
that
 
-        its state is an aggregate of its TI states.
-
-
-        And for task groups, we add a "task" for that which is not really a 
task but
-        is just
-
-        an entry that represents the group (so that we can show a filled in 
box when
-        the group
-
-        is not expanded) and its state is an agg of those within it.'
-      operationId: get_grid_ti_summaries
+        share the same ``dag_version_id``, avoiding repeated deserialization.'
+      operationId: get_grid_ti_summaries_stream
       security:
       - OAuth2PasswordBearer: []
       - HTTPBearer: []
@@ -1168,19 +1159,24 @@ paths:
         schema:
           type: string
           title: Dag Id
-      - name: run_id
-        in: path
-        required: true
+      - name: run_ids
+        in: query
+        required: false
         schema:
-          type: string
-          title: Run Id
+          anyOf:
+          - type: array
+            items:
+              type: string
+          - type: 'null'
+          title: Run Ids
       responses:
         '200':
-          description: Successful Response
+          description: "NDJSON stream \u2014 one ``GridTISummaries`` JSON 
object per\
+            \ line, one per Dag run"
           content:
-            application/json:
+            application/x-ndjson:
               schema:
-                $ref: '#/components/schemas/GridTISummaries'
+                type: string
         '400':
           content:
             application/json:
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
index ca363c3adab..0143ae81e14 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
@@ -18,10 +18,12 @@
 from __future__ import annotations
 
 import collections
+from collections.abc import Generator, Sequence
 from typing import TYPE_CHECKING, Annotated, Any
 
 import structlog
-from fastapi import Depends, HTTPException, status
+from fastapi import Depends, HTTPException, Query, status
+from fastapi.responses import StreamingResponse
 from sqlalchemy import exists, select
 from sqlalchemy.orm import joinedload, load_only, selectinload
 
@@ -337,96 +339,30 @@ def get_grid_runs(
     return grid_runs
 
 
-@grid_router.get(
-    "/ti_summaries/{dag_id}/{run_id}",
-    responses=create_openapi_http_exception_doc(
-        [
-            status.HTTP_400_BAD_REQUEST,
-            status.HTTP_404_NOT_FOUND,
-        ]
-    ),
-    dependencies=[
-        Depends(
-            requires_access_dag(
-                method="GET",
-                access_entity=DagAccessEntity.TASK_INSTANCE,
-            )
-        ),
-        Depends(
-            requires_access_dag(
-                method="GET",
-                access_entity=DagAccessEntity.RUN,
-            )
-        ),
-    ],
-)
-def get_grid_ti_summaries(
-    dag_id: str,
-    run_id: str,
-    session: SessionDep,
-) -> GridTISummaries:
-    """
-    Get states for TIs / "groups" of TIs.
-
-    Essentially this is to know what color to put in the squares in the grid.
-
-    The tricky part here is that we aggregate the state for groups and mapped 
tasks.
-
-    We don't add all the TIs for mapped TIs -- we only add one entry for the 
mapped task and
-    its state is an aggregate of its TI states.
-
-    And for task groups, we add a "task" for that which is not really a task 
but is just
-    an entry that represents the group (so that we can show a filled in box 
when the group
-    is not expanded) and its state is an agg of those within it.
-    """
-    tis_of_dag_runs, _ = paginated_select(
-        statement=(
-            select(
-                TaskInstance.task_id,
-                TaskInstance.state,
-                TaskInstance.dag_version_id,
-                TaskInstance.start_date,
-                TaskInstance.end_date,
-                DagVersion.version_number,
-            )
-            .outerjoin(DagVersion, TaskInstance.dag_version_id == 
DagVersion.id)
-            .where(TaskInstance.dag_id == dag_id)
-            .where(
-                TaskInstance.run_id == run_id,
-            )
-        ),
-        filters=[],
-        order_by=SortParam(allowed_attrs=["task_id", "run_id"], 
model=TaskInstance).set_value(["task_id"]),
-        limit=None,
-        return_total_entries=False,
-    )
-    task_instances = list(session.execute(tis_of_dag_runs))
-    if not task_instances:
-        raise HTTPException(
-            status.HTTP_404_NOT_FOUND, f"No task instances for dag_id={dag_id} 
run_id={run_id}"
-        )
-    ti_details = collections.defaultdict(list)
+def _build_ti_summaries(
+    dag_id: str, run_id: str, task_instances: Sequence, session, serdag: 
SerializedDagModel | None = None
+) -> dict:
+    ti_details: dict = collections.defaultdict(list)
     for ti in task_instances:
         ti_details[ti.task_id].append(
             {
                 "state": ti.state,
                 "start_date": ti.start_date,
                 "end_date": ti.end_date,
-                "dag_version_number": ti.version_number,
+                "dag_version_number": getattr(ti, "version_number", None),
             }
         )
-    serdag = _get_serdag(
-        dag_id=dag_id,
-        dag_version_id=task_instances[0].dag_version_id,
-        session=session,
-    )
+    if serdag is None:
+        serdag = _get_serdag(
+            dag_id=dag_id,
+            dag_version_id=task_instances[0].dag_version_id,
+            session=session,
+        )
     if TYPE_CHECKING:
         assert serdag
 
-    def get_node_sumaries():
+    def get_node_summaries():
         yielded_task_ids: set[str] = set()
-
-        # Yield all nodes discoverable from the serialized DAG structure
         for node in _find_aggregates(
             node=serdag.dag.task_group,
             parent_node=None,
@@ -437,13 +373,9 @@ def get_grid_ti_summaries(
                 if node["type"] == "task":
                     node["child_states"] = None
             yield node
-
-        # For good history: add synthetic leaf nodes for task_ids that have 
TIs in this run
-        # but are not present in the current DAG structure (e.g. removed tasks)
         missing_task_ids = set(ti_details.keys()) - yielded_task_ids
         for task_id in sorted(missing_task_ids):
             detail = ti_details[task_id]
-            # Create a leaf task node with aggregated state from its TIs
             agg = _get_aggs_for_node(detail)
             yield {
                 "task_id": task_id,
@@ -451,17 +383,86 @@ def get_grid_ti_summaries(
                 "type": "task",
                 "parent_id": None,
                 **agg,
-                # Leaf tasks have no children
                 "child_states": None,
             }
 
-    task_instances = list(get_node_sumaries())
+    nodes = list(get_node_summaries())
     # If a group id and a task id collide, prefer the group record
-    group_ids = {n.get("task_id") for n in task_instances if n.get("type") == 
"group"}
-    filtered = [n for n in task_instances if not (n.get("type") == "task" and 
n.get("task_id") in group_ids)]
-
-    return {  # type: ignore[return-value]
-        "run_id": run_id,
-        "dag_id": dag_id,
-        "task_instances": filtered,
-    }
+    group_ids = {n.get("task_id") for n in nodes if n.get("type") == "group"}
+    filtered = [n for n in nodes if not (n.get("type") == "task" and 
n.get("task_id") in group_ids)]
+    return {"run_id": run_id, "dag_id": dag_id, "task_instances": filtered}
+
+
+@grid_router.get(
+    "/ti_summaries/{dag_id}",
+    response_class=StreamingResponse,
+    response_model=GridTISummaries,
+    responses={
+        **create_openapi_http_exception_doc(
+            [
+                status.HTTP_400_BAD_REQUEST,
+                status.HTTP_404_NOT_FOUND,
+            ]
+        ),
+        200: {
+            "content": {"application/x-ndjson": {"schema": {"type": 
"string"}}},
+            "description": "NDJSON stream — one ``GridTISummaries`` JSON 
object per line, one per Dag run",
+        },
+    },
+    dependencies=[
+        Depends(
+            requires_access_dag(
+                method="GET",
+                access_entity=DagAccessEntity.TASK_INSTANCE,
+            )
+        ),
+        Depends(
+            requires_access_dag(
+                method="GET",
+                access_entity=DagAccessEntity.RUN,
+            )
+        ),
+    ],
+)
+def get_grid_ti_summaries_stream(
+    dag_id: str,
+    session: SessionDep,
+    run_ids: Annotated[list[str] | None, Query()] = None,
+) -> StreamingResponse:
+    """
+    Stream TI summaries for multiple Dag runs as NDJSON (one JSON line per 
run).
+
+    Each line is a serialized ``GridTISummaries`` object emitted as soon as 
that
+    run's task instances have been processed, so the client can render columns
+    progressively without waiting for all runs to complete.
+
+    The serialized Dag structure is loaded once and reused for all runs that
+    share the same ``dag_version_id``, avoiding repeated deserialization.
+    """
+
+    def _generate() -> Generator[str, None, None]:
+        serdag_cache: dict = {}
+        for run_id in run_ids or []:
+            tis = session.execute(
+                select(
+                    TaskInstance.task_id,
+                    TaskInstance.state,
+                    TaskInstance.dag_version_id,
+                    TaskInstance.start_date,
+                    TaskInstance.end_date,
+                    DagVersion.version_number,
+                )
+                .outerjoin(DagVersion, TaskInstance.dag_version_id == 
DagVersion.id)
+                .where(TaskInstance.dag_id == dag_id)
+                .where(TaskInstance.run_id == run_id)
+                .order_by(TaskInstance.task_id)
+            ).all()
+            if not tis:
+                continue
+            version_id = tis[0].dag_version_id
+            if version_id not in serdag_cache:
+                serdag_cache[version_id] = _get_serdag(dag_id, version_id, 
session)
+            summary = _build_ti_summaries(dag_id, run_id, tis, session, 
serdag=serdag_cache[version_id])
+            yield GridTISummaries.model_validate(summary).model_dump_json() + 
"\n"
+
+    return StreamingResponse(content=_generate(), 
media_type="application/x-ndjson")
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts 
b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
index 5640e4cce55..612a3d56747 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
@@ -880,13 +880,13 @@ export const UseGridServiceGetGridRunsKeyFn = ({ dagId, 
limit, offset, orderBy,
   state?: string[];
   triggeringUser?: string;
 }, queryKey?: Array<unknown>) => [useGridServiceGetGridRunsKey, ...(queryKey 
?? [{ dagId, limit, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, 
runAfterLte, runType, state, triggeringUser }])];
-export type GridServiceGetGridTiSummariesDefaultResponse = 
Awaited<ReturnType<typeof GridService.getGridTiSummaries>>;
-export type GridServiceGetGridTiSummariesQueryResult<TData = 
GridServiceGetGridTiSummariesDefaultResponse, TError = unknown> = 
UseQueryResult<TData, TError>;
-export const useGridServiceGetGridTiSummariesKey = 
"GridServiceGetGridTiSummaries";
-export const UseGridServiceGetGridTiSummariesKeyFn = ({ dagId, runId }: {
+export type GridServiceGetGridTiSummariesStreamDefaultResponse = 
Awaited<ReturnType<typeof GridService.getGridTiSummariesStream>>;
+export type GridServiceGetGridTiSummariesStreamQueryResult<TData = 
GridServiceGetGridTiSummariesStreamDefaultResponse, TError = unknown> = 
UseQueryResult<TData, TError>;
+export const useGridServiceGetGridTiSummariesStreamKey = 
"GridServiceGetGridTiSummariesStream";
+export const UseGridServiceGetGridTiSummariesStreamKeyFn = ({ dagId, runIds }: 
{
   dagId: string;
-  runId: string;
-}, queryKey?: Array<unknown>) => [useGridServiceGetGridTiSummariesKey, 
...(queryKey ?? [{ dagId, runId }])];
+  runIds?: string[];
+}, queryKey?: Array<unknown>) => [useGridServiceGetGridTiSummariesStreamKey, 
...(queryKey ?? [{ dagId, runIds }])];
 export type GanttServiceGetGanttDataDefaultResponse = 
Awaited<ReturnType<typeof GanttService.getGanttData>>;
 export type GanttServiceGetGanttDataQueryResult<TData = 
GanttServiceGetGanttDataDefaultResponse, TError = unknown> = 
UseQueryResult<TData, TError>;
 export const useGanttServiceGetGanttDataKey = "GanttServiceGetGanttData";
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts 
b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
index a6bf227ca40..8bc9e9df8e6 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
@@ -1672,29 +1672,25 @@ export const ensureUseGridServiceGetGridRunsData = 
(queryClient: QueryClient, {
   triggeringUser?: string;
 }) => queryClient.ensureQueryData({ queryKey: 
Common.UseGridServiceGetGridRunsKeyFn({ dagId, limit, offset, orderBy, 
runAfterGt, runAfterGte, runAfterLt, runAfterLte, runType, state, 
triggeringUser }), queryFn: () => GridService.getGridRuns({ dagId, limit, 
offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runType, 
state, triggeringUser }) });
 /**
-* Get Grid Ti Summaries
-* Get states for TIs / "groups" of TIs.
+* Get Grid Ti Summaries Stream
+* Stream TI summaries for multiple Dag runs as NDJSON (one JSON line per run).
 *
-* Essentially this is to know what color to put in the squares in the grid.
+* Each line is a serialized ``GridTISummaries`` object emitted as soon as that
+* run's task instances have been processed, so the client can render columns
+* progressively without waiting for all runs to complete.
 *
-* The tricky part here is that we aggregate the state for groups and mapped 
tasks.
-*
-* We don't add all the TIs for mapped TIs -- we only add one entry for the 
mapped task and
-* its state is an aggregate of its TI states.
-*
-* And for task groups, we add a "task" for that which is not really a task but 
is just
-* an entry that represents the group (so that we can show a filled in box when 
the group
-* is not expanded) and its state is an agg of those within it.
+* The serialized Dag structure is loaded once and reused for all runs that
+* share the same ``dag_version_id``, avoiding repeated deserialization.
 * @param data The data for the request.
 * @param data.dagId
-* @param data.runId
-* @returns GridTISummaries Successful Response
+* @param data.runIds
+* @returns string NDJSON stream — one ``GridTISummaries`` JSON object per 
line, one per Dag run
 * @throws ApiError
 */
-export const ensureUseGridServiceGetGridTiSummariesData = (queryClient: 
QueryClient, { dagId, runId }: {
+export const ensureUseGridServiceGetGridTiSummariesStreamData = (queryClient: 
QueryClient, { dagId, runIds }: {
   dagId: string;
-  runId: string;
-}) => queryClient.ensureQueryData({ queryKey: 
Common.UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId }), queryFn: () => 
GridService.getGridTiSummaries({ dagId, runId }) });
+  runIds?: string[];
+}) => queryClient.ensureQueryData({ queryKey: 
Common.UseGridServiceGetGridTiSummariesStreamKeyFn({ dagId, runIds }), queryFn: 
() => GridService.getGridTiSummariesStream({ dagId, runIds }) });
 /**
 * Get Gantt Data
 * Get all task instance tries for Gantt chart.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts 
b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
index c6cc3d62ccf..f4cb6f482bd 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -1672,29 +1672,25 @@ export const prefetchUseGridServiceGetGridRuns = 
(queryClient: QueryClient, { da
   triggeringUser?: string;
 }) => queryClient.prefetchQuery({ queryKey: 
Common.UseGridServiceGetGridRunsKeyFn({ dagId, limit, offset, orderBy, 
runAfterGt, runAfterGte, runAfterLt, runAfterLte, runType, state, 
triggeringUser }), queryFn: () => GridService.getGridRuns({ dagId, limit, 
offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runType, 
state, triggeringUser }) });
 /**
-* Get Grid Ti Summaries
-* Get states for TIs / "groups" of TIs.
+* Get Grid Ti Summaries Stream
+* Stream TI summaries for multiple Dag runs as NDJSON (one JSON line per run).
 *
-* Essentially this is to know what color to put in the squares in the grid.
+* Each line is a serialized ``GridTISummaries`` object emitted as soon as that
+* run's task instances have been processed, so the client can render columns
+* progressively without waiting for all runs to complete.
 *
-* The tricky part here is that we aggregate the state for groups and mapped 
tasks.
-*
-* We don't add all the TIs for mapped TIs -- we only add one entry for the 
mapped task and
-* its state is an aggregate of its TI states.
-*
-* And for task groups, we add a "task" for that which is not really a task but 
is just
-* an entry that represents the group (so that we can show a filled in box when 
the group
-* is not expanded) and its state is an agg of those within it.
+* The serialized Dag structure is loaded once and reused for all runs that
+* share the same ``dag_version_id``, avoiding repeated deserialization.
 * @param data The data for the request.
 * @param data.dagId
-* @param data.runId
-* @returns GridTISummaries Successful Response
+* @param data.runIds
+* @returns string NDJSON stream — one ``GridTISummaries`` JSON object per 
line, one per Dag run
 * @throws ApiError
 */
-export const prefetchUseGridServiceGetGridTiSummaries = (queryClient: 
QueryClient, { dagId, runId }: {
+export const prefetchUseGridServiceGetGridTiSummariesStream = (queryClient: 
QueryClient, { dagId, runIds }: {
   dagId: string;
-  runId: string;
-}) => queryClient.prefetchQuery({ queryKey: 
Common.UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId }), queryFn: () => 
GridService.getGridTiSummaries({ dagId, runId }) });
+  runIds?: string[];
+}) => queryClient.prefetchQuery({ queryKey: 
Common.UseGridServiceGetGridTiSummariesStreamKeyFn({ dagId, runIds }), queryFn: 
() => GridService.getGridTiSummariesStream({ dagId, runIds }) });
 /**
 * Get Gantt Data
 * Get all task instance tries for Gantt chart.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts 
b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
index 7a0cc07eb20..8e9ef5aa29d 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
@@ -1672,29 +1672,25 @@ export const useGridServiceGetGridRuns = <TData = 
Common.GridServiceGetGridRunsD
   triggeringUser?: string;
 }, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, 
"queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey: 
Common.UseGridServiceGetGridRunsKeyFn({ dagId, limit, offset, orderBy, 
runAfterGt, runAfterGte, runAfterLt, runAfterLte, runType, state, 
triggeringUser }, queryKey), queryFn: () => GridService.getGridRuns({ dagId, 
limit, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, 
runType, state, triggeringUser }) as TData, ...options });
 /**
-* Get Grid Ti Summaries
-* Get states for TIs / "groups" of TIs.
+* Get Grid Ti Summaries Stream
+* Stream TI summaries for multiple Dag runs as NDJSON (one JSON line per run).
 *
-* Essentially this is to know what color to put in the squares in the grid.
+* Each line is a serialized ``GridTISummaries`` object emitted as soon as that
+* run's task instances have been processed, so the client can render columns
+* progressively without waiting for all runs to complete.
 *
-* The tricky part here is that we aggregate the state for groups and mapped 
tasks.
-*
-* We don't add all the TIs for mapped TIs -- we only add one entry for the 
mapped task and
-* its state is an aggregate of its TI states.
-*
-* And for task groups, we add a "task" for that which is not really a task but 
is just
-* an entry that represents the group (so that we can show a filled in box when 
the group
-* is not expanded) and its state is an agg of those within it.
+* The serialized Dag structure is loaded once and reused for all runs that
+* share the same ``dag_version_id``, avoiding repeated deserialization.
 * @param data The data for the request.
 * @param data.dagId
-* @param data.runId
-* @returns GridTISummaries Successful Response
+* @param data.runIds
+* @returns string NDJSON stream — one ``GridTISummaries`` JSON object per 
line, one per Dag run
 * @throws ApiError
 */
-export const useGridServiceGetGridTiSummaries = <TData = 
Common.GridServiceGetGridTiSummariesDefaultResponse, TError = unknown, 
TQueryKey extends Array<unknown> = unknown[]>({ dagId, runId }: {
+export const useGridServiceGetGridTiSummariesStream = <TData = 
Common.GridServiceGetGridTiSummariesStreamDefaultResponse, TError = unknown, 
TQueryKey extends Array<unknown> = unknown[]>({ dagId, runIds }: {
   dagId: string;
-  runId: string;
-}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, 
"queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey: 
Common.UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId }, queryKey), 
queryFn: () => GridService.getGridTiSummaries({ dagId, runId }) as TData, 
...options });
+  runIds?: string[];
+}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, 
"queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey: 
Common.UseGridServiceGetGridTiSummariesStreamKeyFn({ dagId, runIds }, 
queryKey), queryFn: () => GridService.getGridTiSummariesStream({ dagId, runIds 
}) as TData, ...options });
 /**
 * Get Gantt Data
 * Get all task instance tries for Gantt chart.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts 
b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
index 518f42ef01e..c4a41691b1a 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
@@ -1672,29 +1672,25 @@ export const useGridServiceGetGridRunsSuspense = <TData 
= Common.GridServiceGetG
   triggeringUser?: string;
 }, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, 
"queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey: 
Common.UseGridServiceGetGridRunsKeyFn({ dagId, limit, offset, orderBy, 
runAfterGt, runAfterGte, runAfterLt, runAfterLte, runType, state, 
triggeringUser }, queryKey), queryFn: () => GridService.getGridRuns({ dagId, 
limit, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, 
runType, state, triggeringUser }) as TData, ...options });
 /**
-* Get Grid Ti Summaries
-* Get states for TIs / "groups" of TIs.
+* Get Grid Ti Summaries Stream
+* Stream TI summaries for multiple Dag runs as NDJSON (one JSON line per run).
 *
-* Essentially this is to know what color to put in the squares in the grid.
+* Each line is a serialized ``GridTISummaries`` object emitted as soon as that
+* run's task instances have been processed, so the client can render columns
+* progressively without waiting for all runs to complete.
 *
-* The tricky part here is that we aggregate the state for groups and mapped 
tasks.
-*
-* We don't add all the TIs for mapped TIs -- we only add one entry for the 
mapped task and
-* its state is an aggregate of its TI states.
-*
-* And for task groups, we add a "task" for that which is not really a task but 
is just
-* an entry that represents the group (so that we can show a filled in box when 
the group
-* is not expanded) and its state is an agg of those within it.
+* The serialized Dag structure is loaded once and reused for all runs that
+* share the same ``dag_version_id``, avoiding repeated deserialization.
 * @param data The data for the request.
 * @param data.dagId
-* @param data.runId
-* @returns GridTISummaries Successful Response
+* @param data.runIds
+* @returns string NDJSON stream — one ``GridTISummaries`` JSON object per 
line, one per Dag run
 * @throws ApiError
 */
-export const useGridServiceGetGridTiSummariesSuspense = <TData = 
Common.GridServiceGetGridTiSummariesDefaultResponse, TError = unknown, 
TQueryKey extends Array<unknown> = unknown[]>({ dagId, runId }: {
+export const useGridServiceGetGridTiSummariesStreamSuspense = <TData = 
Common.GridServiceGetGridTiSummariesStreamDefaultResponse, TError = unknown, 
TQueryKey extends Array<unknown> = unknown[]>({ dagId, runIds }: {
   dagId: string;
-  runId: string;
-}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, 
"queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey: 
Common.UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId }, queryKey), 
queryFn: () => GridService.getGridTiSummaries({ dagId, runId }) as TData, 
...options });
+  runIds?: string[];
+}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, 
"queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey: 
Common.UseGridServiceGetGridTiSummariesStreamKeyFn({ dagId, runIds }, 
queryKey), queryFn: () => GridService.getGridTiSummariesStream({ dagId, runIds 
}) as TData, ...options });
 /**
 * Get Gantt Data
 * Get all task instance tries for Gantt chart.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
index 81facc3769f..6e701bf68dc 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -3,7 +3,7 @@
 import type { CancelablePromise } from './core/CancelablePromise';
 import { OpenAPI } from './core/OpenAPI';
 import { request as __request } from './core/request';
-import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData, 
GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse, 
GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, 
CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse, 
GetAssetQueuedEventsData, GetAssetQueuedEventsResponse, 
DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData, 
GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, 
Dele [...]
+import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData, 
GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse, 
GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, 
CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse, 
GetAssetQueuedEventsData, GetAssetQueuedEventsResponse, 
DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData, 
GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, 
Dele [...]
 
 export class AssetService {
     /**
@@ -4159,32 +4159,30 @@ export class GridService {
     }
     
     /**
-     * Get Grid Ti Summaries
-     * Get states for TIs / "groups" of TIs.
+     * Get Grid Ti Summaries Stream
+     * Stream TI summaries for multiple Dag runs as NDJSON (one JSON line per 
run).
      *
-     * Essentially this is to know what color to put in the squares in the 
grid.
+     * Each line is a serialized ``GridTISummaries`` object emitted as soon as 
that
+     * run's task instances have been processed, so the client can render 
columns
+     * progressively without waiting for all runs to complete.
      *
-     * The tricky part here is that we aggregate the state for groups and 
mapped tasks.
-     *
-     * We don't add all the TIs for mapped TIs -- we only add one entry for 
the mapped task and
-     * its state is an aggregate of its TI states.
-     *
-     * And for task groups, we add a "task" for that which is not really a 
task but is just
-     * an entry that represents the group (so that we can show a filled in box 
when the group
-     * is not expanded) and its state is an agg of those within it.
+     * The serialized Dag structure is loaded once and reused for all runs that
+     * share the same ``dag_version_id``, avoiding repeated deserialization.
      * @param data The data for the request.
      * @param data.dagId
-     * @param data.runId
-     * @returns GridTISummaries Successful Response
+     * @param data.runIds
+     * @returns string NDJSON stream — one ``GridTISummaries`` JSON object per 
line, one per Dag run
      * @throws ApiError
      */
-    public static getGridTiSummaries(data: GetGridTiSummariesData): 
CancelablePromise<GetGridTiSummariesResponse> {
+    public static getGridTiSummariesStream(data: 
GetGridTiSummariesStreamData): 
CancelablePromise<GetGridTiSummariesStreamResponse> {
         return __request(OpenAPI, {
             method: 'GET',
-            url: '/ui/grid/ti_summaries/{dag_id}/{run_id}',
+            url: '/ui/grid/ti_summaries/{dag_id}',
             path: {
-                dag_id: data.dagId,
-                run_id: data.runId
+                dag_id: data.dagId
+            },
+            query: {
+                run_ids: data.runIds
             },
             errors: {
                 400: 'Bad Request',
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index a59532b6c16..7423c08d42c 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -3653,12 +3653,12 @@ export type GetGridRunsData = {
 
 export type GetGridRunsResponse = Array<GridRunsResponse>;
 
-export type GetGridTiSummariesData = {
+export type GetGridTiSummariesStreamData = {
     dagId: string;
-    runId: string;
+    runIds?: Array<(string)> | null;
 };
 
-export type GetGridTiSummariesResponse = GridTISummaries;
+export type GetGridTiSummariesStreamResponse = string;
 
 export type GetGanttDataData = {
     dagId: string;
@@ -6936,14 +6936,14 @@ export type $OpenApiTs = {
             };
         };
     };
-    '/ui/grid/ti_summaries/{dag_id}/{run_id}': {
+    '/ui/grid/ti_summaries/{dag_id}': {
         get: {
-            req: GetGridTiSummariesData;
+            req: GetGridTiSummariesStreamData;
             res: {
                 /**
-                 * Successful Response
+                 * NDJSON stream — one ``GridTISummaries`` JSON object per 
line, one per Dag run
                  */
-                200: GridTISummaries;
+                200: string;
                 /**
                  * Bad Request
                  */
diff --git a/airflow-core/src/airflow/ui/src/layouts/Details/Gantt/Gantt.tsx 
b/airflow-core/src/airflow/ui/src/layouts/Details/Gantt/Gantt.tsx
index 6c546c58699..71d14ccfbe7 100644
--- a/airflow-core/src/airflow/ui/src/layouts/Details/Gantt/Gantt.tsx
+++ b/airflow-core/src/airflow/ui/src/layouts/Details/Gantt/Gantt.tsx
@@ -48,7 +48,7 @@ import { GRID_BODY_OFFSET_PX } from 
"src/layouts/Details/Grid/constants";
 import { flattenNodes } from "src/layouts/Details/Grid/utils";
 import { useGridRuns } from "src/queries/useGridRuns";
 import { useGridStructure } from "src/queries/useGridStructure";
-import { useGridTiSummaries } from "src/queries/useGridTISummaries";
+import { useGridTiSummariesStream } from "src/queries/useGridTISummaries";
 import { getComputedCSSVariableValue } from "src/theme";
 import { isStatePending, useAutoRefresh } from "src/utils";
 
@@ -131,12 +131,13 @@ export const Gantt = ({ dagRunState, limit, runType, 
triggeringUser }: Props) =>
   const refetchInterval = useAutoRefresh({ dagId });
 
   // Get grid summaries for groups and mapped tasks (which have min/max times)
-  const { data: gridTiSummaries, isLoading: summariesLoading } = 
useGridTiSummaries({
+  const { summariesByRunId } = useGridTiSummariesStream({
     dagId,
-    enabled: Boolean(selectedRun),
-    runId,
-    state: selectedRun?.state,
+    runIds: runId && selectedRun ? [runId] : [],
+    states: selectedRun ? [selectedRun.state] : [],
   });
+  const gridTiSummaries = summariesByRunId.get(runId);
+  const summariesLoading = Boolean(runId && selectedRun && 
!summariesByRunId.has(runId));
 
   // Single fetch for all Gantt data (individual task tries)
   const { data: ganttData, isLoading: ganttLoading } = 
useGanttServiceGetGanttData(
diff --git a/airflow-core/src/airflow/ui/src/layouts/Details/Graph/Graph.tsx 
b/airflow-core/src/airflow/ui/src/layouts/Details/Graph/Graph.tsx
index 0b69efb7d43..6e3fe794021 100644
--- a/airflow-core/src/airflow/ui/src/layouts/Details/Graph/Graph.tsx
+++ b/airflow-core/src/airflow/ui/src/layouts/Details/Graph/Graph.tsx
@@ -34,7 +34,7 @@ import { useOpenGroups } from "src/context/openGroups";
 import useSelectedVersion from "src/hooks/useSelectedVersion";
 import { flattenGraphNodes } from "src/layouts/Details/Grid/utils.ts";
 import { useDependencyGraph } from "src/queries/useDependencyGraph";
-import { useGridTiSummaries } from "src/queries/useGridTISummaries.ts";
+import { useGridTiSummariesStream } from "src/queries/useGridTISummaries.ts";
 import { getReactFlowThemeStyle } from "src/theme";
 
 const nodeColor = (
@@ -134,7 +134,8 @@ export const Graph = () => {
     versionNumber: selectedVersion,
   });
 
-  const { data: gridTISummaries } = useGridTiSummaries({ dagId, runId });
+  const { summariesByRunId } = useGridTiSummariesStream({ dagId, runIds: runId 
? [runId] : [] });
+  const gridTISummaries = runId ? summariesByRunId.get(runId) : undefined;
 
   // Add task instances to the node data but without having to recalculate how 
the graph is laid out
   const nodes = data?.nodes.map((node) => {
diff --git a/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Grid.tsx 
b/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Grid.tsx
index 517f5d9f7d4..07537c2cb2b 100644
--- a/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Grid.tsx
+++ b/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Grid.tsx
@@ -31,6 +31,7 @@ import { useOpenGroups } from "src/context/openGroups";
 import { NavigationModes, useNavigation } from "src/hooks/navigation";
 import { useGridRuns } from "src/queries/useGridRuns.ts";
 import { useGridStructure } from "src/queries/useGridStructure.ts";
+import { useGridTiSummariesStream } from "src/queries/useGridTISummaries.ts";
 import { isStatePending } from "src/utils";
 
 import { Bar } from "./Bar";
@@ -95,6 +96,12 @@ export const Grid = ({
     }
   }, [runId, gridRuns, selectedIsVisible, setSelectedIsVisible]);
 
+  const { summariesByRunId } = useGridTiSummariesStream({
+    dagId,
+    runIds: gridRuns?.map((dr: GridRunsResponse) => dr.run_id) ?? [],
+    states: gridRuns?.map((dr: GridRunsResponse) => dr.state),
+  });
+
   const { data: dagStructure } = useGridStructure({
     dagRunState,
     depth,
@@ -225,6 +232,7 @@ export const Grid = ({
                 onCellClick={handleCellClick}
                 run={dr}
                 showVersionIndicatorMode={showVersionIndicatorMode}
+                tiSummaries={summariesByRunId.get(dr.run_id)}
                 virtualItems={virtualItems}
               />
             ))}
diff --git 
a/airflow-core/src/airflow/ui/src/layouts/Details/Grid/TaskInstancesColumn.tsx 
b/airflow-core/src/airflow/ui/src/layouts/Details/Grid/TaskInstancesColumn.tsx
index 7727748fba7..05b32145514 100644
--- 
a/airflow-core/src/airflow/ui/src/layouts/Details/Grid/TaskInstancesColumn.tsx
+++ 
b/airflow-core/src/airflow/ui/src/layouts/Details/Grid/TaskInstancesColumn.tsx
@@ -20,11 +20,10 @@ import { Box } from "@chakra-ui/react";
 import type { VirtualItem } from "@tanstack/react-virtual";
 import { useParams } from "react-router-dom";
 
-import type { GridRunsResponse } from "openapi/requests";
+import type { GridRunsResponse, GridTISummaries } from "openapi/requests";
 import type { LightGridTaskInstanceSummary } from "openapi/requests/types.gen";
 import { VersionIndicatorOptions } from 
"src/constants/showVersionIndicatorOptions";
 import { useHover } from "src/context/hover";
-import { useGridTiSummaries } from "src/queries/useGridTISummaries.ts";
 
 import { GridTI } from "./GridTI";
 import { DagVersionIndicator } from "./VersionIndicator";
@@ -35,6 +34,7 @@ type Props = {
   readonly onCellClick?: () => void;
   readonly run: GridRunsResponse;
   readonly showVersionIndicatorMode?: VersionIndicatorOptions;
+  readonly tiSummaries?: GridTISummaries;
   readonly virtualItems?: Array<VirtualItem>;
 };
 
@@ -45,23 +45,18 @@ export const TaskInstancesColumn = ({
   onCellClick,
   run,
   showVersionIndicatorMode,
+  tiSummaries,
   virtualItems,
 }: Props) => {
   const { dagId = "", runId } = useParams();
   const isSelected = runId === run.run_id;
-  const { data: gridTISummaries } = useGridTiSummaries({
-    dagId,
-    isSelected,
-    runId: run.run_id,
-    state: run.state,
-  });
+
   const { hoveredRunId, setHoveredRunId } = useHover();
 
   const itemsToRender =
     virtualItems ?? nodes.map((_, index) => ({ index, size: ROW_HEIGHT, start: 
index * ROW_HEIGHT }));
 
-  const taskInstances = gridTISummaries?.task_instances ?? [];
-
+  const taskInstances = tiSummaries?.task_instances ?? [];
   const taskInstanceMap = new Map<string, LightGridTaskInstanceSummary>();
 
   for (const ti of taskInstances) {
diff --git 
a/airflow-core/src/airflow/ui/src/pages/GroupTaskInstance/GroupTaskInstance.tsx 
b/airflow-core/src/airflow/ui/src/pages/GroupTaskInstance/GroupTaskInstance.tsx
index a4efef77265..abb741e9498 100644
--- 
a/airflow-core/src/airflow/ui/src/pages/GroupTaskInstance/GroupTaskInstance.tsx
+++ 
b/airflow-core/src/airflow/ui/src/pages/GroupTaskInstance/GroupTaskInstance.tsx
@@ -22,14 +22,15 @@ import { MdOutlineTask } from "react-icons/md";
 import { useParams } from "react-router-dom";
 
 import { DetailsLayout } from "src/layouts/Details/DetailsLayout";
-import { useGridTiSummaries } from "src/queries/useGridTISummaries.ts";
+import { useGridTiSummariesStream } from "src/queries/useGridTISummaries.ts";
 
 import { Header } from "./Header";
 
 export const GroupTaskInstance = () => {
   const { dagId = "", groupId = "", runId = "" } = useParams();
   const { t: translate } = useTranslation("dag");
-  const { data: gridTISummaries } = useGridTiSummaries({ dagId, runId });
+  const { summariesByRunId } = useGridTiSummariesStream({ dagId, runIds: runId 
? [runId] : [] });
+  const gridTISummaries = summariesByRunId.get(runId);
   const taskInstance = gridTISummaries?.task_instances.find((ti) => ti.task_id 
=== groupId);
 
   const tabs = [{ icon: <MdOutlineTask />, label: 
translate("tabs.taskInstances"), value: "" }];
diff --git 
a/airflow-core/src/airflow/ui/src/pages/MappedTaskInstance/MappedTaskInstance.tsx
 
b/airflow-core/src/airflow/ui/src/pages/MappedTaskInstance/MappedTaskInstance.tsx
index 6c231c35bcd..6302254eafa 100644
--- 
a/airflow-core/src/airflow/ui/src/pages/MappedTaskInstance/MappedTaskInstance.tsx
+++ 
b/airflow-core/src/airflow/ui/src/pages/MappedTaskInstance/MappedTaskInstance.tsx
@@ -22,14 +22,15 @@ import { MdOutlineTask } from "react-icons/md";
 import { useParams } from "react-router-dom";
 
 import { DetailsLayout } from "src/layouts/Details/DetailsLayout";
-import { useGridTiSummaries } from "src/queries/useGridTISummaries.ts";
+import { useGridTiSummariesStream } from "src/queries/useGridTISummaries.ts";
 
 import { Header } from "./Header";
 
 export const MappedTaskInstance = () => {
   const { dagId = "", runId = "", taskId = "" } = useParams();
   const { t: translate } = useTranslation("dag");
-  const { data: gridTISummaries } = useGridTiSummaries({ dagId, runId });
+  const { summariesByRunId } = useGridTiSummariesStream({ dagId, runIds: runId 
? [runId] : [] });
+  const gridTISummaries = summariesByRunId.get(runId);
 
   const taskInstance = gridTISummaries?.task_instances.find((ti) => ti.task_id 
=== taskId);
   let taskCount: number = 0;
diff --git 
a/airflow-core/src/airflow/ui/src/pages/TaskInstance/TaskInstance.tsx 
b/airflow-core/src/airflow/ui/src/pages/TaskInstance/TaskInstance.tsx
index dd956a8d4dd..9f22e299c83 100644
--- a/airflow-core/src/airflow/ui/src/pages/TaskInstance/TaskInstance.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/TaskInstance/TaskInstance.tsx
@@ -28,7 +28,7 @@ import { useTaskInstanceServiceGetMappedTaskInstance } from 
"openapi/queries";
 import { usePluginTabs } from "src/hooks/usePluginTabs";
 import { useRequiredActionTabs } from "src/hooks/useRequiredActionTabs";
 import { DetailsLayout } from "src/layouts/Details/DetailsLayout";
-import { useGridTiSummaries } from "src/queries/useGridTISummaries.ts";
+import { useGridTiSummariesStream } from "src/queries/useGridTISummaries.ts";
 import { isStatePending, useAutoRefresh } from "src/utils";
 
 import { Header } from "./Header";
@@ -76,7 +76,8 @@ export const TaskInstance = () => {
     },
   );
 
-  const { data: gridTISummaries } = useGridTiSummaries({ dagId, runId });
+  const { summariesByRunId } = useGridTiSummariesStream({ dagId, runIds: runId 
? [runId] : [] });
+  const gridTISummaries = summariesByRunId.get(runId);
 
   const taskInstanceSummary = gridTISummaries?.task_instances.find((ti) => 
ti.task_id === taskId);
   const taskCount = Object.entries(taskInstanceSummary?.child_states ?? {})
diff --git a/airflow-core/src/airflow/ui/src/queries/useClearRun.ts 
b/airflow-core/src/airflow/ui/src/queries/useClearRun.ts
index b631eb48cf4..af234cc39fc 100644
--- a/airflow-core/src/airflow/ui/src/queries/useClearRun.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useClearRun.ts
@@ -27,7 +27,6 @@ import {
   UseGanttServiceGetGanttDataKeyFn,
   useTaskInstanceServiceGetTaskInstancesKey,
   UseGridServiceGetGridRunsKeyFn,
-  UseGridServiceGetGridTiSummariesKeyFn,
 } from "openapi/queries";
 import { toaster } from "src/components/ui";
 
@@ -62,7 +61,6 @@ export const useClearDagRun = ({
       [useClearDagRunDryRunKey, dagId],
       UseGridServiceGetGridRunsKeyFn({ dagId }, [{ dagId }]),
       UseGanttServiceGetGanttDataKeyFn({ dagId, runId: dagRunId }),
-      UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId: dagRunId }, [{ 
dagId, runId: dagRunId }]),
     ];
 
     await Promise.all(queryKeys.map((key) => queryClient.invalidateQueries({ 
queryKey: key })));
diff --git a/airflow-core/src/airflow/ui/src/queries/useClearTaskInstances.ts 
b/airflow-core/src/airflow/ui/src/queries/useClearTaskInstances.ts
index 3130041a152..e8d13ad000e 100644
--- a/airflow-core/src/airflow/ui/src/queries/useClearTaskInstances.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useClearTaskInstances.ts
@@ -26,8 +26,6 @@ import {
   UseTaskInstanceServiceGetMappedTaskInstanceKeyFn,
   useTaskInstanceServicePostClearTaskInstances,
   UseGridServiceGetGridRunsKeyFn,
-  UseGridServiceGetGridTiSummariesKeyFn,
-  useGridServiceGetGridTiSummariesKey,
 } from "openapi/queries";
 import type { ApiError } from "openapi/requests";
 import type { ClearTaskInstancesBody, TaskInstanceCollectionResponse } from 
"openapi/requests/types.gen";
@@ -103,10 +101,6 @@ export const useClearTaskInstances = ({
       ),
     ];
 
-    // Check if this clear operation affects multiple DAG runs
-    const { include_future: includeFuture, include_past: includePast } = 
variables.requestBody;
-    const affectsMultipleRuns = includeFuture === true || includePast === true;
-
     const queryKeys = [
       ...taskInstanceKeys,
       UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }),
@@ -115,9 +109,6 @@ export const useClearTaskInstances = ({
       [usePatchTaskInstanceDryRunKey, dagId, dagRunId],
       UseGridServiceGetGridRunsKeyFn({ dagId }, [{ dagId }]),
       UseGanttServiceGetGanttDataKeyFn({ dagId, runId: dagRunId }),
-      affectsMultipleRuns
-        ? [useGridServiceGetGridTiSummariesKey, { dagId }]
-        : UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId: dagRunId }),
     ];
 
     await Promise.all(queryKeys.map((key) => queryClient.invalidateQueries({ 
queryKey: key })));
diff --git a/airflow-core/src/airflow/ui/src/queries/useDeleteTaskInstance.ts 
b/airflow-core/src/airflow/ui/src/queries/useDeleteTaskInstance.ts
index 545168280e1..0d7e8f266c3 100644
--- a/airflow-core/src/airflow/ui/src/queries/useDeleteTaskInstance.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useDeleteTaskInstance.ts
@@ -25,7 +25,6 @@ import {
   useTaskInstanceServiceGetTaskInstancesKey,
   useDagRunServiceGetDagRunsKey,
   UseDagRunServiceGetDagRunKeyFn,
-  UseGridServiceGetGridTiSummariesKeyFn,
   useTaskInstanceServiceGetHitlDetailsKey,
 } from "openapi/queries";
 import { toaster } from "src/components/ui";
@@ -59,7 +58,6 @@ export const useDeleteTaskInstance = ({
   const onSuccess = async () => {
     const queryKeys = [
       UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }),
-      UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId: dagRunId }, [{ 
dagId, runId: dagRunId }]),
       [useDagRunServiceGetDagRunsKey],
       [useTaskInstanceServiceGetTaskInstancesKey],
       [useTaskInstanceServiceGetTaskInstanceKey, { dagId, dagRunId, mapIndex, 
taskId }],
diff --git a/airflow-core/src/airflow/ui/src/queries/useGridTISummaries.ts 
b/airflow-core/src/airflow/ui/src/queries/useGridTISummaries.ts
index 6b75402ab6c..28de4bc473d 100644
--- a/airflow-core/src/airflow/ui/src/queries/useGridTISummaries.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useGridTISummaries.ts
@@ -16,45 +16,111 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-import { useGridServiceGetGridTiSummaries } from "openapi/queries";
-import type { TaskInstanceState } from "openapi/requests";
+import { useEffect, useState } from "react";
+
+import type { GridTISummaries, TaskInstanceState } from "openapi/requests";
+import { OpenAPI } from "openapi/requests/core/OpenAPI";
 import { isStatePending, useAutoRefresh } from "src/utils";
 
-export const useGridTiSummaries = ({
+/**
+ * Streams TI summaries for all grid runs over a single HTTP connection 
(NDJSON).
+ *
+ * The server emits one JSON line per Dag run as soon as that run's task
+ * instances have been computed, so the grid renders each column progressively
+ * rather than waiting for the entire payload.  This eliminates the N+1 request
+ * pattern without loading all runs into one large query.
+ *
+ * Auto-refreshes while any run is still in a pending state.
+ */
+export const useGridTiSummariesStream = ({
   dagId,
-  enabled,
-  isSelected,
-  runId,
-  state,
+  runIds,
+  states,
 }: {
   dagId: string;
-  enabled?: boolean;
-  isSelected?: boolean;
-  runId: string;
-  state?: TaskInstanceState | null | undefined;
+  runIds: Array<string>;
+  states?: Array<TaskInstanceState | null | undefined>;
 }) => {
+  const [summariesByRunId, setSummariesByRunId] = useState<Map<string, 
GridTISummaries>>(new Map());
+  const [refreshTick, setRefreshTick] = useState(0);
+
   const baseRefetchInterval = useAutoRefresh({ dagId });
-  const slowRefreshMultiplier = 5;
-  const refetchInterval =
-    typeof baseRefetchInterval === "number"
-      ? baseRefetchInterval * (isSelected ? 1 : slowRefreshMultiplier)
-      : baseRefetchInterval;
-
-  const { data: gridTiSummaries, ...rest } = useGridServiceGetGridTiSummaries(
-    {
-      dagId,
-      runId,
-    },
-    undefined,
-    {
-      enabled: Boolean(runId) && Boolean(dagId) && enabled,
-      placeholderData: (prev) => prev,
-      refetchInterval: (query) =>
-        ((state !== undefined && isStatePending(state)) ||
-          query.state.data?.task_instances.some((ti) => 
isStatePending(ti.state))) &&
-        refetchInterval,
-    },
-  );
-
-  return { data: gridTiSummaries, ...rest };
+  const hasActiveRuns = states?.some((state) => state !== undefined && 
isStatePending(state)) ?? false;
+
+  // Stable key so the effect only re-fires when the run list actually changes.
+  const runIdsKey = runIds.join(",");
+
+  // Stream (or re-stream) whenever the run list or refresh tick changes.
+  useEffect(() => {
+    if (!dagId || runIds.length === 0) {
+      return undefined;
+    }
+
+    const abortController = new AbortController();
+    let reader: ReadableStreamDefaultReader<Uint8Array> | undefined;
+
+    const fetchStream = async () => {
+      // Keep stale data visible while the new stream loads — columns update in
+      // place as fresh lines arrive rather than flashing blank.
+      try {
+        const params = new URLSearchParams(runIds.map((id) => ["run_ids", 
id]));
+        const response = await 
fetch(`${OpenAPI.BASE}/ui/grid/ti_summaries/${dagId}?${params}`, {
+          signal: abortController.signal,
+        });
+
+        if (!response.ok || !response.body) {
+          return;
+        }
+
+        reader = response.body.getReader();
+        const decoder = new TextDecoder();
+        let buffer = "";
+
+        // eslint-disable-next-line no-await-in-loop -- sequential reads 
required; each chunk depends on the previous buffer state
+        for (let result = await reader.read(); !result.done; result = await 
reader.read()) {
+          const { value } = result;
+
+          buffer += decoder.decode(value, { stream: true });
+
+          const lines = buffer.split("\n");
+
+          buffer = lines.pop() ?? "";
+
+          for (const line of lines.filter((ln) => ln.trim())) {
+            const summary = JSON.parse(line) as GridTISummaries;
+
+            setSummariesByRunId((prev) => new Map(prev).set(summary.run_id, 
summary));
+          }
+        }
+      } catch (error) {
+        if ((error as Error).name !== "AbortError") {
+          // eslint-disable-next-line no-console
+          console.error("TI summaries stream error:", error);
+        }
+      }
+    };
+
+    void fetchStream();
+
+    return () => {
+      abortController.abort();
+      void reader?.cancel();
+    };
+    // eslint-disable-next-line react-hooks/exhaustive-deps -- runIdsKey 
(stable join) intentionally replaces runIds array to avoid spurious re-streams
+  }, [dagId, runIdsKey, refreshTick]);
+
+  // Trigger a re-stream periodically while active runs are in flight.
+  useEffect(() => {
+    if (!hasActiveRuns || typeof baseRefetchInterval !== "number") {
+      return undefined;
+    }
+
+    const timer = setInterval(() => {
+      setRefreshTick((tick) => tick + 1);
+    }, baseRefetchInterval);
+
+    return () => clearInterval(timer);
+  }, [hasActiveRuns, baseRefetchInterval]);
+
+  return { summariesByRunId };
 };
diff --git a/airflow-core/src/airflow/ui/src/queries/usePatchDagRun.ts 
b/airflow-core/src/airflow/ui/src/queries/usePatchDagRun.ts
index 25ea7fad646..a49fcba8592 100644
--- a/airflow-core/src/airflow/ui/src/queries/usePatchDagRun.ts
+++ b/airflow-core/src/airflow/ui/src/queries/usePatchDagRun.ts
@@ -25,7 +25,6 @@ import {
   useDagRunServicePatchDagRun,
   useTaskInstanceServiceGetTaskInstancesKey,
   UseGridServiceGetGridRunsKeyFn,
-  UseGridServiceGetGridTiSummariesKeyFn,
 } from "openapi/queries";
 import { toaster } from "src/components/ui";
 
@@ -60,7 +59,6 @@ export const usePatchDagRun = ({
       [useTaskInstanceServiceGetTaskInstancesKey, { dagId, dagRunId }],
       [useClearDagRunDryRunKey, dagId],
       UseGridServiceGetGridRunsKeyFn({ dagId }, [{ dagId }]),
-      UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId: dagRunId }, [{ 
dagId, runId: dagRunId }]),
     ];
 
     await Promise.all(queryKeys.map((key) => queryClient.invalidateQueries({ 
queryKey: key })));
diff --git a/airflow-core/src/airflow/ui/src/queries/usePatchTaskInstance.ts 
b/airflow-core/src/airflow/ui/src/queries/usePatchTaskInstance.ts
index c1ae88cacb1..b94fadf8e13 100644
--- a/airflow-core/src/airflow/ui/src/queries/usePatchTaskInstance.ts
+++ b/airflow-core/src/airflow/ui/src/queries/usePatchTaskInstance.ts
@@ -25,8 +25,6 @@ import {
   useTaskInstanceServiceGetTaskInstancesKey,
   useTaskInstanceServicePatchTaskInstance,
   UseGridServiceGetGridRunsKeyFn,
-  UseGridServiceGetGridTiSummariesKeyFn,
-  useGridServiceGetGridTiSummariesKey,
 } from "openapi/queries";
 import { toaster } from "src/components/ui";
 
@@ -59,19 +57,7 @@ export const usePatchTaskInstance = ({
     });
   };
 
-  const onSuccessFn = async (
-    _: unknown,
-    variables: {
-      dagId: string;
-      dagRunId: string;
-      requestBody: { include_future?: boolean; include_past?: boolean };
-      taskId: string;
-    },
-  ) => {
-    // Check if this patch operation affects multiple DAG runs
-    const { include_future: includeFuture, include_past: includePast } = 
variables.requestBody;
-    const affectsMultipleRuns = includeFuture === true || includePast === true;
-
+  const onSuccessFn = async () => {
     const queryKeys = [
       UseTaskInstanceServiceGetTaskInstanceKeyFn({ dagId, dagRunId, taskId }),
       UseTaskInstanceServiceGetMappedTaskInstanceKeyFn({ dagId, dagRunId, 
mapIndex, taskId }),
@@ -79,9 +65,6 @@ export const usePatchTaskInstance = ({
       [usePatchTaskInstanceDryRunKey, dagId, dagRunId, { mapIndex, taskId }],
       [useClearTaskInstancesDryRunKey, dagId],
       UseGridServiceGetGridRunsKeyFn({ dagId }, [{ dagId }]),
-      affectsMultipleRuns
-        ? [useGridServiceGetGridTiSummariesKey, { dagId }]
-        : UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId: dagRunId }),
     ];
 
     await Promise.all(queryKeys.map((key) => queryClient.invalidateQueries({ 
queryKey: key })));
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py
index c9642f7c492..dbcf87149e5 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py
@@ -17,6 +17,7 @@
 
 from __future__ import annotations
 
+import json
 from datetime import timedelta
 from operator import attrgetter
 
@@ -566,9 +567,9 @@ class TestGetGridDataEndpoint:
 
         # Also verify that TI summaries include a leaf entry for the removed 
task
         with assert_queries_count(4):
-            ti_resp = test_client.get(f"/grid/ti_summaries/{DAG_ID_3}/run_3")
+            ti_resp = 
test_client.get(f"/grid/ti_summaries/{DAG_ID_3}?run_ids=run_3")
         assert ti_resp.status_code == 200
-        ti_payload = ti_resp.json()
+        [ti_payload] = self._parse_ndjson(ti_resp)
         assert ti_payload["dag_id"] == DAG_ID_3
         assert ti_payload["run_id"] == "run_3"
         # Find the removed task summary; it should exist even if not in 
current serialized DAG structure
@@ -697,9 +698,9 @@ class TestGetGridDataEndpoint:
         session.commit()
 
         with assert_queries_count(4):
-            response = 
test_client.get(f"/grid/ti_summaries/{DAG_ID_4}/{run_id}")
+            response = 
test_client.get(f"/grid/ti_summaries/{DAG_ID_4}?run_ids={run_id}")
         assert response.status_code == 200
-        actual = response.json()
+        [actual] = self._parse_ndjson(response)
         expected = {
             "dag_id": "test_dag_4",
             "run_id": "run_4-1",
@@ -797,9 +798,9 @@ class TestGetGridDataEndpoint:
         session.commit()
 
         with assert_queries_count(4):
-            response = test_client.get(f"/grid/ti_summaries/{DAG_ID}/{run_id}")
+            response = 
test_client.get(f"/grid/ti_summaries/{DAG_ID}?run_ids={run_id}")
         assert response.status_code == 200
-        data = response.json()
+        [data] = self._parse_ndjson(response)
         actual = data["task_instances"]
 
         def sort_dict(in_dict):
@@ -1114,3 +1115,58 @@ class TestGetGridDataEndpoint:
         nodes = response.json()
         task_ids = sorted([node["id"] for node in nodes])
         assert task_ids == expected_task_ids, description
+
+    @staticmethod
+    def _parse_ndjson(response) -> list[dict]:
+        """Parse NDJSON streaming response into a list of dicts."""
+        return [json.loads(line) for line in response.text.splitlines() if 
line.strip()]
+
+    def test_grid_ti_summaries_stream_returns_all_runs(self, session, 
test_client):
+        """Streaming endpoint returns one NDJSON line per requested run_id."""
+        session.commit()
+
+        run_ids = ["run_1", "run_2"]
+        response = test_client.get(f"/grid/ti_summaries/{DAG_ID}", 
params={"run_ids": run_ids})
+        assert response.status_code == 200
+        assert "ndjson" in response.headers.get("content-type", "")
+
+        summaries = self._parse_ndjson(response)
+        assert len(summaries) == len(run_ids)
+        returned_run_ids = {s["run_id"] for s in summaries}
+        assert returned_run_ids == set(run_ids)
+
+        for summary in summaries:
+            assert summary["dag_id"] == DAG_ID
+            assert len(summary["task_instances"]) > 0
+
+    def test_grid_ti_summaries_stream_skips_missing_runs(self, session, 
test_client):
+        """Streaming endpoint silently skips run_ids that have no task 
instances."""
+        session.commit()
+
+        response = test_client.get(
+            f"/grid/ti_summaries/{DAG_ID}", params={"run_ids": ["run_1", 
"nonexistent_run"]}
+        )
+        assert response.status_code == 200
+        summaries = self._parse_ndjson(response)
+        assert len(summaries) == 1
+        assert summaries[0]["run_id"] == "run_1"
+
+    def test_grid_ti_summaries_stream_empty_run_ids(self, session, 
test_client):
+        """Streaming endpoint with no run_ids returns an empty body."""
+        session.commit()
+
+        response = test_client.get(f"/grid/ti_summaries/{DAG_ID}")
+        assert response.status_code == 200
+        assert self._parse_ndjson(response) == []
+
+    def test_grid_ti_summaries_stream_deduplicates_serdag_loads(self, session, 
test_client):
+        """Serialized Dag is loaded once even when multiple runs share the 
same version."""
+        session.commit()
+
+        run_ids = ["run_1", "run_2"]
+        # 2 auth queries + 1 serdag query shared across both runs
+        # + 1 TI query per run = 5 total (not 1 serdag per run which would be 
6+).
+        with assert_queries_count(5):
+            response = test_client.get(f"/grid/ti_summaries/{DAG_ID}", 
params={"run_ids": run_ids})
+        assert response.status_code == 200
+        assert len(self._parse_ndjson(response)) == len(run_ids)

Reply via email to