pierrejeambrun commented on code in PR #62369:
URL: https://github.com/apache/airflow/pull/62369#discussion_r2925040655
##########
airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py:
##########
@@ -1114,3 +1115,62 @@ def test_structure_with_depth(self, test_client, dag_id,
params, expected_task_i
nodes = response.json()
task_ids = sorted([node["id"] for node in nodes])
assert task_ids == expected_task_ids, description
+
+ # ------------------------------------------------------------------
helpers
+
+ @staticmethod
+ def _parse_ndjson(response) -> list[dict]:
+ """Parse NDJSON streaming response into a list of dicts."""
+ return [json.loads(line) for line in response.text.splitlines() if
line.strip()]
+
+ # ------------------------------------------------------------------ tests
+
+ def test_grid_ti_summaries_stream_returns_all_runs(self, session,
test_client):
+ """Streaming endpoint returns one NDJSON line per requested run_id."""
+ session.commit()
+
+ run_ids = ["run_1", "run_2"]
+ query = "&".join(f"run_ids={r}" for r in run_ids)
+ response = test_client.get(f"/grid/ti_summaries/{DAG_ID}?{query}")
Review Comment:
No need to do that manually. use (params argument give your list directly
there, it will be correctly passed down by the query client)
##########
airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py:
##########
@@ -1114,3 +1115,62 @@ def test_structure_with_depth(self, test_client, dag_id,
params, expected_task_i
nodes = response.json()
task_ids = sorted([node["id"] for node in nodes])
assert task_ids == expected_task_ids, description
+
+ # ------------------------------------------------------------------
helpers
Review Comment:
To remove (comment)
##########
airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py:
##########
@@ -1114,3 +1115,62 @@ def test_structure_with_depth(self, test_client, dag_id,
params, expected_task_i
nodes = response.json()
task_ids = sorted([node["id"] for node in nodes])
assert task_ids == expected_task_ids, description
+
+ # ------------------------------------------------------------------
helpers
+
+ @staticmethod
+ def _parse_ndjson(response) -> list[dict]:
+ """Parse NDJSON streaming response into a list of dicts."""
+ return [json.loads(line) for line in response.text.splitlines() if
line.strip()]
+
+ # ------------------------------------------------------------------ tests
+
+ def test_grid_ti_summaries_stream_returns_all_runs(self, session,
test_client):
+ """Streaming endpoint returns one NDJSON line per requested run_id."""
+ session.commit()
+
+ run_ids = ["run_1", "run_2"]
+ query = "&".join(f"run_ids={r}" for r in run_ids)
+ response = test_client.get(f"/grid/ti_summaries/{DAG_ID}?{query}")
+ assert response.status_code == 200
+ assert "ndjson" in response.headers.get("content-type", "")
+
+ summaries = self._parse_ndjson(response)
+ assert len(summaries) == len(run_ids)
+ returned_run_ids = {s["run_id"] for s in summaries}
+ assert returned_run_ids == set(run_ids)
+
+ for summary in summaries:
+ assert summary["dag_id"] == DAG_ID
+ assert len(summary["task_instances"]) > 0
+
+ def test_grid_ti_summaries_stream_skips_missing_runs(self, session,
test_client):
+ """Streaming endpoint silently skips run_ids that have no task
instances."""
+ session.commit()
+
+ response =
test_client.get(f"/grid/ti_summaries/{DAG_ID}?run_ids=run_1&run_ids=nonexistent_run")
+ assert response.status_code == 200
+ summaries = self._parse_ndjson(response)
+ assert len(summaries) == 1
+ assert summaries[0]["run_id"] == "run_1"
+
+ def test_grid_ti_summaries_stream_empty_run_ids(self, session,
test_client):
+ """Streaming endpoint with no run_ids returns an empty body."""
+ session.commit()
+
+ response = test_client.get(f"/grid/ti_summaries/{DAG_ID}")
+ assert response.status_code == 200
+ assert self._parse_ndjson(response) == []
+
+ def test_grid_ti_summaries_stream_deduplicates_serdag_loads(self, session,
test_client):
+ """Serialized Dag is loaded once even when multiple runs share the
same version."""
+ session.commit()
+
+ run_ids = ["run_1", "run_2"]
+ query = "&".join(f"run_ids={r}" for r in run_ids)
+ # 2 auth queries + 1 serdag query shared across both runs
+ # + 1 TI query per run = 5 total (not 1 serdag per run which would be
6+).
+ with assert_queries_count(5):
+ response = test_client.get(f"/grid/ti_summaries/{DAG_ID}?{query}")
Review Comment:
same
##########
airflow-core/src/airflow/ui/src/queries/usePatchTaskInstance.ts:
##########
@@ -79,9 +77,6 @@ export const usePatchTaskInstance = ({
[usePatchTaskInstanceDryRunKey, dagId, dagRunId, { mapIndex, taskId }],
[useClearTaskInstancesDryRunKey, dagId],
UseGridServiceGetGridRunsKeyFn({ dagId }, [{ dagId }]),
- affectsMultipleRuns
- ? [useGridServiceGetGridTiSummariesKey, { dagId }]
- : UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId: dagRunId }),
Review Comment:
At multiple places we are not invalidating the streaming response ? We are
relying on the 'hasActiveRuns` to refresh the stream right? Since
`UseGridServiceGetGridRunsKeyFn` I guess it's enough.
##########
airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py:
##########
@@ -1114,3 +1115,62 @@ def test_structure_with_depth(self, test_client, dag_id,
params, expected_task_i
nodes = response.json()
task_ids = sorted([node["id"] for node in nodes])
assert task_ids == expected_task_ids, description
+
+ # ------------------------------------------------------------------
helpers
+
+ @staticmethod
+ def _parse_ndjson(response) -> list[dict]:
+ """Parse NDJSON streaming response into a list of dicts."""
+ return [json.loads(line) for line in response.text.splitlines() if
line.strip()]
+
+ # ------------------------------------------------------------------ tests
Review Comment:
To remove (comment)
##########
airflow-core/src/airflow/ui/src/queries/useGridTISummaries.ts:
##########
@@ -16,45 +16,110 @@
* specific language governing permissions and limitations
* under the License.
*/
-import { useGridServiceGetGridTiSummaries } from "openapi/queries";
-import type { TaskInstanceState } from "openapi/requests";
+import { useEffect, useState } from "react";
+
+import { OpenAPI } from "openapi/requests/core/OpenAPI";
+import type { GridTISummaries, TaskInstanceState } from "openapi/requests";
import { isStatePending, useAutoRefresh } from "src/utils";
-export const useGridTiSummaries = ({
+/**
+ * Streams TI summaries for all grid runs over a single HTTP connection
(NDJSON).
+ *
+ * The server emits one JSON line per Dag run as soon as that run's task
+ * instances have been computed, so the grid renders each column progressively
+ * rather than waiting for the entire payload. This eliminates the N+1 request
+ * pattern without loading all runs into one large query.
+ *
+ * Auto-refreshes while any run is still in a pending state.
+ */
+export const useGridTiSummariesStream = ({
dagId,
- enabled,
- isSelected,
- runId,
- state,
+ runIds,
+ states,
}: {
dagId: string;
- enabled?: boolean;
- isSelected?: boolean;
- runId: string;
- state?: TaskInstanceState | null | undefined;
+ runIds: Array<string>;
+ states?: Array<TaskInstanceState | null | undefined>;
}) => {
+ const [summariesByRunId, setSummariesByRunId] = useState<Map<string,
GridTISummaries>>(new Map());
+ const [isStreaming, setIsStreaming] = useState(false);
+ const [refreshTick, setRefreshTick] = useState(0);
+
const baseRefetchInterval = useAutoRefresh({ dagId });
- const slowRefreshMultiplier = 5;
- const refetchInterval =
- typeof baseRefetchInterval === "number"
- ? baseRefetchInterval * (isSelected ? 1 : slowRefreshMultiplier)
- : baseRefetchInterval;
-
- const { data: gridTiSummaries, ...rest } = useGridServiceGetGridTiSummaries(
- {
- dagId,
- runId,
- },
- undefined,
- {
- enabled: Boolean(runId) && Boolean(dagId) && enabled,
- placeholderData: (prev) => prev,
- refetchInterval: (query) =>
- ((state !== undefined && isStatePending(state)) ||
- query.state.data?.task_instances.some((ti) =>
isStatePending(ti.state))) &&
- refetchInterval,
- },
- );
-
- return { data: gridTiSummaries, ...rest };
+ const hasActiveRuns = states?.some((s) => s !== undefined &&
isStatePending(s)) ?? false;
+
+ // Stable key so the effect only re-fires when the run list actually changes.
+ const runIdsKey = runIds.join(",");
+
+ // Stream (or re-stream) whenever the run list or refresh tick changes.
+ useEffect(() => {
Review Comment:
useEffect are usually not necessary and a bad practice in most cases. Is
there a work around to get rid of this?
##########
airflow-core/src/airflow/ui/src/queries/useGridTISummaries.ts:
##########
@@ -16,45 +16,110 @@
* specific language governing permissions and limitations
* under the License.
*/
-import { useGridServiceGetGridTiSummaries } from "openapi/queries";
-import type { TaskInstanceState } from "openapi/requests";
+import { useEffect, useState } from "react";
+
+import { OpenAPI } from "openapi/requests/core/OpenAPI";
+import type { GridTISummaries, TaskInstanceState } from "openapi/requests";
import { isStatePending, useAutoRefresh } from "src/utils";
-export const useGridTiSummaries = ({
+/**
+ * Streams TI summaries for all grid runs over a single HTTP connection
(NDJSON).
+ *
+ * The server emits one JSON line per Dag run as soon as that run's task
+ * instances have been computed, so the grid renders each column progressively
+ * rather than waiting for the entire payload. This eliminates the N+1 request
+ * pattern without loading all runs into one large query.
+ *
+ * Auto-refreshes while any run is still in a pending state.
+ */
+export const useGridTiSummariesStream = ({
dagId,
- enabled,
- isSelected,
- runId,
- state,
+ runIds,
+ states,
}: {
dagId: string;
- enabled?: boolean;
- isSelected?: boolean;
- runId: string;
- state?: TaskInstanceState | null | undefined;
+ runIds: Array<string>;
+ states?: Array<TaskInstanceState | null | undefined>;
}) => {
+ const [summariesByRunId, setSummariesByRunId] = useState<Map<string,
GridTISummaries>>(new Map());
+ const [isStreaming, setIsStreaming] = useState(false);
+ const [refreshTick, setRefreshTick] = useState(0);
+
const baseRefetchInterval = useAutoRefresh({ dagId });
- const slowRefreshMultiplier = 5;
- const refetchInterval =
- typeof baseRefetchInterval === "number"
- ? baseRefetchInterval * (isSelected ? 1 : slowRefreshMultiplier)
- : baseRefetchInterval;
-
- const { data: gridTiSummaries, ...rest } = useGridServiceGetGridTiSummaries(
- {
- dagId,
- runId,
- },
- undefined,
- {
- enabled: Boolean(runId) && Boolean(dagId) && enabled,
- placeholderData: (prev) => prev,
- refetchInterval: (query) =>
- ((state !== undefined && isStatePending(state)) ||
- query.state.data?.task_instances.some((ti) =>
isStatePending(ti.state))) &&
- refetchInterval,
- },
- );
-
- return { data: gridTiSummaries, ...rest };
+ const hasActiveRuns = states?.some((s) => s !== undefined &&
isStatePending(s)) ?? false;
+
+ // Stable key so the effect only re-fires when the run list actually changes.
+ const runIdsKey = runIds.join(",");
+
+ // Stream (or re-stream) whenever the run list or refresh tick changes.
+ useEffect(() => {
+ if (!dagId || runIds.length === 0) return;
+
+ const abortController = new AbortController();
+ let reader: ReadableStreamDefaultReader<Uint8Array> | null = null;
+
+ const fetchStream = async () => {
+ setIsStreaming(true);
+ // Keep stale data visible while the new stream loads — columns update in
+ // place as fresh lines arrive rather than flashing blank.
+ try {
+ const queryString = runIds.map((id) =>
`run_ids=${encodeURIComponent(id)}`).join("&");
+ const response = await
fetch(`${OpenAPI.BASE}/ui/grid/ti_summaries/${dagId}?${queryString}`, {
+ signal: abortController.signal,
+ });
+
+ if (!response.ok || !response.body) return;
+
+ reader = response.body.getReader();
+ const decoder = new TextDecoder();
+ let buffer = "";
+
+ while (true) {
+ const { done, value } = await reader.read();
+
+ if (done) break;
+ buffer += decoder.decode(value, { stream: true });
+
+ // Each complete line is one serialised GridTISummaries object.
+ const lines = buffer.split("\n");
+ buffer = lines.pop() ?? "";
+
+ for (const line of lines) {
+ if (line.trim()) {
+ const summary = JSON.parse(line) as GridTISummaries;
+
+ setSummariesByRunId((prev) => new Map(prev).set(summary.run_id,
summary));
+ }
+ }
+ }
+ } catch (error) {
+ if ((error as Error).name !== "AbortError") {
+ console.error("TI summaries stream error:", error);
+ }
+ } finally {
+ setIsStreaming(false);
+ }
+ };
+
+ fetchStream();
+
+ return () => {
+ abortController.abort();
+ reader?.cancel();
+ };
+ // eslint-disable-next-line react-hooks/exhaustive-deps
Review Comment:
Why ignoreing deps?
##########
airflow-core/src/airflow/ui/src/queries/useGridTISummaries.ts:
##########
@@ -16,45 +16,110 @@
* specific language governing permissions and limitations
* under the License.
*/
-import { useGridServiceGetGridTiSummaries } from "openapi/queries";
-import type { TaskInstanceState } from "openapi/requests";
+import { useEffect, useState } from "react";
+
+import { OpenAPI } from "openapi/requests/core/OpenAPI";
+import type { GridTISummaries, TaskInstanceState } from "openapi/requests";
import { isStatePending, useAutoRefresh } from "src/utils";
-export const useGridTiSummaries = ({
+/**
+ * Streams TI summaries for all grid runs over a single HTTP connection
(NDJSON).
+ *
+ * The server emits one JSON line per Dag run as soon as that run's task
+ * instances have been computed, so the grid renders each column progressively
+ * rather than waiting for the entire payload. This eliminates the N+1 request
+ * pattern without loading all runs into one large query.
+ *
+ * Auto-refreshes while any run is still in a pending state.
+ */
+export const useGridTiSummariesStream = ({
dagId,
- enabled,
- isSelected,
- runId,
- state,
+ runIds,
+ states,
}: {
dagId: string;
- enabled?: boolean;
- isSelected?: boolean;
- runId: string;
- state?: TaskInstanceState | null | undefined;
+ runIds: Array<string>;
+ states?: Array<TaskInstanceState | null | undefined>;
}) => {
+ const [summariesByRunId, setSummariesByRunId] = useState<Map<string,
GridTISummaries>>(new Map());
+ const [isStreaming, setIsStreaming] = useState(false);
+ const [refreshTick, setRefreshTick] = useState(0);
+
const baseRefetchInterval = useAutoRefresh({ dagId });
- const slowRefreshMultiplier = 5;
- const refetchInterval =
- typeof baseRefetchInterval === "number"
- ? baseRefetchInterval * (isSelected ? 1 : slowRefreshMultiplier)
- : baseRefetchInterval;
-
- const { data: gridTiSummaries, ...rest } = useGridServiceGetGridTiSummaries(
- {
- dagId,
- runId,
- },
- undefined,
- {
- enabled: Boolean(runId) && Boolean(dagId) && enabled,
- placeholderData: (prev) => prev,
- refetchInterval: (query) =>
- ((state !== undefined && isStatePending(state)) ||
- query.state.data?.task_instances.some((ti) =>
isStatePending(ti.state))) &&
- refetchInterval,
- },
- );
-
- return { data: gridTiSummaries, ...rest };
+ const hasActiveRuns = states?.some((s) => s !== undefined &&
isStatePending(s)) ?? false;
+
+ // Stable key so the effect only re-fires when the run list actually changes.
+ const runIdsKey = runIds.join(",");
+
+ // Stream (or re-stream) whenever the run list or refresh tick changes.
+ useEffect(() => {
+ if (!dagId || runIds.length === 0) return;
+
+ const abortController = new AbortController();
+ let reader: ReadableStreamDefaultReader<Uint8Array> | null = null;
+
+ const fetchStream = async () => {
+ setIsStreaming(true);
+ // Keep stale data visible while the new stream loads — columns update in
+ // place as fresh lines arrive rather than flashing blank.
+ try {
+ const queryString = runIds.map((id) =>
`run_ids=${encodeURIComponent(id)}`).join("&");
+ const response = await
fetch(`${OpenAPI.BASE}/ui/grid/ti_summaries/${dagId}?${queryString}`, {
+ signal: abortController.signal,
+ });
+
+ if (!response.ok || !response.body) return;
+
Review Comment:
Can we use react query and the generated hook? To achieve this? Manual
encoding of parameters and fetch is really inconsistent with the rest of the
code base.
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py:
##########
@@ -437,31 +374,93 @@ def get_node_sumaries():
if node["type"] == "task":
node["child_states"] = None
yield node
-
- # For good history: add synthetic leaf nodes for task_ids that have
TIs in this run
- # but are not present in the current DAG structure (e.g. removed tasks)
missing_task_ids = set(ti_details.keys()) - yielded_task_ids
for task_id in sorted(missing_task_ids):
detail = ti_details[task_id]
- # Create a leaf task node with aggregated state from its TIs
agg = _get_aggs_for_node(detail)
yield {
"task_id": task_id,
"task_display_name": task_id,
"type": "task",
"parent_id": None,
**agg,
- # Leaf tasks have no children
"child_states": None,
}
- task_instances = list(get_node_sumaries())
- # If a group id and a task id collide, prefer the group record
- group_ids = {n.get("task_id") for n in task_instances if n.get("type") ==
"group"}
- filtered = [n for n in task_instances if not (n.get("type") == "task" and
n.get("task_id") in group_ids)]
+ nodes = list(get_node_summaries())
+ group_ids = {n.get("task_id") for n in nodes if n.get("type") == "group"}
Review Comment:
Nit: I think we should keep the comment.
```
# If a group id and a task id collide, prefer the group record
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]