This is an automated email from the ASF dual-hosted git repository.
choo121600 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 41ec7a09398 Improve query validation, including for streaming (#67212)
41ec7a09398 is described below
commit 41ec7a09398337b433745e5b3f9af9fe1371d1b8
Author: Brent Bovenzi <[email protected]>
AuthorDate: Wed May 20 10:22:26 2026 -0400
Improve query validation, including for streaming (#67212)
---
.../airflow/ui/src/queries/gridViewQueryKeys.ts | 10 ++++++
.../ui/src/queries/useBulkClearTaskInstances.ts | 17 +++++++--
.../airflow/ui/src/queries/useBulkTaskInstances.ts | 3 ++
.../src/airflow/ui/src/queries/useClearRun.ts | 3 +-
.../ui/src/queries/useClearTaskInstances.ts | 3 +-
.../src/airflow/ui/src/queries/useDeleteDagRun.ts | 9 +++--
.../ui/src/queries/useDeleteTaskInstance.ts | 12 ++++++-
.../airflow/ui/src/queries/useGridTISummaries.ts | 42 ++++++++++++++++++++++
.../src/airflow/ui/src/queries/usePatchDagRun.ts | 9 ++++-
.../airflow/ui/src/queries/usePatchTaskGroup.ts | 3 +-
.../airflow/ui/src/queries/usePatchTaskInstance.ts | 3 +-
.../airflow/ui/src/queries/useUpdateHITLDetail.ts | 10 +++++-
12 files changed, 112 insertions(+), 12 deletions(-)
diff --git a/airflow-core/src/airflow/ui/src/queries/gridViewQueryKeys.ts
b/airflow-core/src/airflow/ui/src/queries/gridViewQueryKeys.ts
index 42060cac610..2c43ff47eae 100644
--- a/airflow-core/src/airflow/ui/src/queries/gridViewQueryKeys.ts
+++ b/airflow-core/src/airflow/ui/src/queries/gridViewQueryKeys.ts
@@ -21,7 +21,10 @@ import {
UseDagServiceGetDagDetailsKeyFn,
UseDagServiceGetLatestRunInfoKeyFn,
UseGridServiceGetGridRunsKeyFn,
+ useTaskInstanceServiceGetExtraLinksKey,
+ useTaskInstanceServiceGetLogKey,
UseTaskInstanceServiceGetTaskInstancesKeyFn,
+ useTaskInstanceServiceGetTaskInstanceTryDetailsKey,
} from "openapi/queries";
export const gridQueryKeys = (dagId: string) =>
@@ -32,3 +35,10 @@ export const gridQueryKeys = (dagId: string) =>
UseDagRunServiceGetDagRunsKeyFn({ dagId }, [{ dagId }]),
UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagRunId: "~" }, [{
dagId, dagRunId: "~" }]),
] as const;
+
+/** Prefix keys for per-attempt TI caches that become stale after any
mutation. */
+export const tiPerAttemptQueryKeys = [
+ [useTaskInstanceServiceGetLogKey],
+ [useTaskInstanceServiceGetExtraLinksKey],
+ [useTaskInstanceServiceGetTaskInstanceTryDetailsKey],
+] as const;
diff --git
a/airflow-core/src/airflow/ui/src/queries/useBulkClearTaskInstances.ts
b/airflow-core/src/airflow/ui/src/queries/useBulkClearTaskInstances.ts
index 063b1c6d77d..8c14e36664c 100644
--- a/airflow-core/src/airflow/ui/src/queries/useBulkClearTaskInstances.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useBulkClearTaskInstances.ts
@@ -20,11 +20,17 @@ import { useQueryClient } from "@tanstack/react-query";
import { useState } from "react";
import { useTranslation } from "react-i18next";
-import { useDagRunServiceGetDagRunsKey,
useTaskInstanceServiceGetTaskInstancesKey } from "openapi/queries";
+import {
+ useDagRunServiceGetDagRunsKey,
+ useTaskInstanceServiceGetMappedTaskInstanceKey,
+ useTaskInstanceServiceGetTaskInstancesKey,
+} from "openapi/queries";
import { TaskInstanceService } from "openapi/requests/services.gen";
import type { TaskInstanceResponse } from "openapi/requests/types.gen";
import { toaster } from "src/components/ui";
+import { gridQueryKeys, tiPerAttemptQueryKeys } from "./gridViewQueryKeys";
+
type Props = {
readonly clearSelections: VoidFunction;
readonly onSuccessConfirm: VoidFunction;
@@ -46,10 +52,15 @@ export const useBulkClearTaskInstances = ({
clearSelections, onSuccessConfirm }:
const [isPending, setIsPending] = useState(false);
const { t: translate } = useTranslation(["common", "dags"]);
- const invalidateQueries = async () => {
+ const invalidateQueries = async (dagIds: ReadonlySet<string>) => {
await Promise.all([
queryClient.invalidateQueries({ queryKey:
[useTaskInstanceServiceGetTaskInstancesKey] }),
queryClient.invalidateQueries({ queryKey:
[useDagRunServiceGetDagRunsKey] }),
+ queryClient.invalidateQueries({ queryKey:
[useTaskInstanceServiceGetMappedTaskInstanceKey] }),
+ ...tiPerAttemptQueryKeys.map((key) => queryClient.invalidateQueries({
queryKey: key })),
+ ...[...dagIds].flatMap((dagId) =>
+ gridQueryKeys(dagId).map((key) => queryClient.invalidateQueries({
queryKey: key })),
+ ),
]);
};
@@ -92,7 +103,7 @@ export const useBulkClearTaskInstances = ({ clearSelections,
onSuccessConfirm }:
),
);
- await invalidateQueries();
+ await invalidateQueries(new Set([...byDagRun.values()].map(({ dagId })
=> dagId)));
toaster.create({
description: translate("toaster.bulkClear.success.description", {
diff --git a/airflow-core/src/airflow/ui/src/queries/useBulkTaskInstances.ts
b/airflow-core/src/airflow/ui/src/queries/useBulkTaskInstances.ts
index 8ad4d616197..ac47c550b37 100644
--- a/airflow-core/src/airflow/ui/src/queries/useBulkTaskInstances.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useBulkTaskInstances.ts
@@ -32,6 +32,8 @@ import type {
} from "openapi/requests/types.gen";
import { toaster } from "src/components/ui";
+import { tiPerAttemptQueryKeys } from "./gridViewQueryKeys";
+
type Props = {
readonly clearSelections: VoidFunction;
readonly onSuccessConfirm: VoidFunction;
@@ -62,6 +64,7 @@ export const useBulkTaskInstances = ({ clearSelections,
onSuccessConfirm }: Prop
await Promise.all([
queryClient.invalidateQueries({ queryKey:
[useTaskInstanceServiceGetTaskInstancesKey] }),
queryClient.invalidateQueries({ queryKey:
[useDagRunServiceGetDagRunsKey] }),
+ ...tiPerAttemptQueryKeys.map((key) => queryClient.invalidateQueries({
queryKey: key })),
]);
const isDelete = Boolean(responseData.delete);
diff --git a/airflow-core/src/airflow/ui/src/queries/useClearRun.ts
b/airflow-core/src/airflow/ui/src/queries/useClearRun.ts
index f4b23c90bd4..18629915a75 100644
--- a/airflow-core/src/airflow/ui/src/queries/useClearRun.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useClearRun.ts
@@ -28,7 +28,7 @@ import {
} from "openapi/queries";
import { createErrorToaster } from "src/utils";
-import { gridQueryKeys } from "./gridViewQueryKeys";
+import { gridQueryKeys, tiPerAttemptQueryKeys } from "./gridViewQueryKeys";
import { useClearDagRunDryRunKey } from "./useClearDagRunDryRun";
export const useClearDagRun = ({
@@ -61,6 +61,7 @@ export const useClearDagRun = ({
[useDagRunServiceGetDagRunsKey],
[useClearDagRunDryRunKey, dagId],
UseGanttServiceGetGanttDataKeyFn({ dagId, runId: dagRunId }),
+ ...tiPerAttemptQueryKeys,
];
await Promise.all([
diff --git a/airflow-core/src/airflow/ui/src/queries/useClearTaskInstances.ts
b/airflow-core/src/airflow/ui/src/queries/useClearTaskInstances.ts
index 2d70c49bc5b..33103445472 100644
--- a/airflow-core/src/airflow/ui/src/queries/useClearTaskInstances.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useClearTaskInstances.ts
@@ -31,7 +31,7 @@ import type { ApiError } from "openapi/requests";
import type { ClearTaskInstancesBody, TaskInstanceCollectionResponse } from
"openapi/requests/types.gen";
import { toaster } from "src/components/ui";
-import { gridQueryKeys } from "./gridViewQueryKeys";
+import { gridQueryKeys, tiPerAttemptQueryKeys } from "./gridViewQueryKeys";
import { useClearTaskInstancesDryRunKey } from "./useClearTaskInstancesDryRun";
import { usePatchTaskInstanceDryRunKey } from "./usePatchTaskInstanceDryRun";
@@ -119,6 +119,7 @@ export const useClearTaskInstances = ({
[useClearTaskInstancesDryRunKey, dagId],
[usePatchTaskInstanceDryRunKey, dagId, dagRunId],
UseGanttServiceGetGanttDataKeyFn({ dagId, runId: dagRunId }),
+ ...tiPerAttemptQueryKeys,
];
await Promise.all([
diff --git a/airflow-core/src/airflow/ui/src/queries/useDeleteDagRun.ts
b/airflow-core/src/airflow/ui/src/queries/useDeleteDagRun.ts
index fc45986ac26..c92dcdd3a4e 100644
--- a/airflow-core/src/airflow/ui/src/queries/useDeleteDagRun.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useDeleteDagRun.ts
@@ -23,11 +23,13 @@ import {
useDagRunServiceDeleteDagRun,
useDagRunServiceGetDagRunsKey,
UseDagRunServiceGetDagRunKeyFn,
- useTaskInstanceServiceGetTaskInstancesKey,
+ UseGanttServiceGetGanttDataKeyFn,
useTaskInstanceServiceGetHitlDetailsKey,
+ useTaskInstanceServiceGetMappedTaskInstanceKey,
+ useTaskInstanceServiceGetTaskInstancesKey,
} from "openapi/queries";
import { toaster } from "src/components/ui";
-import { gridQueryKeys } from "src/queries/gridViewQueryKeys";
+import { gridQueryKeys, tiPerAttemptQueryKeys } from
"src/queries/gridViewQueryKeys";
import { createErrorToaster } from "src/utils";
type DeleteDagRunParams = {
@@ -57,6 +59,9 @@ export const useDeleteDagRun = ({ dagId, dagRunId,
onSuccessConfirm }: DeleteDag
[useDagRunServiceGetDagRunsKey],
[useTaskInstanceServiceGetTaskInstancesKey],
[useTaskInstanceServiceGetHitlDetailsKey],
+ UseGanttServiceGetGanttDataKeyFn({ dagId, runId: dagRunId }),
+ [useTaskInstanceServiceGetMappedTaskInstanceKey],
+ ...tiPerAttemptQueryKeys,
];
await Promise.all([
diff --git a/airflow-core/src/airflow/ui/src/queries/useDeleteTaskInstance.ts
b/airflow-core/src/airflow/ui/src/queries/useDeleteTaskInstance.ts
index cdf07e49056..4a387d86cd7 100644
--- a/airflow-core/src/airflow/ui/src/queries/useDeleteTaskInstance.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useDeleteTaskInstance.ts
@@ -25,11 +25,15 @@ import {
useTaskInstanceServiceGetTaskInstancesKey,
useDagRunServiceGetDagRunsKey,
UseDagRunServiceGetDagRunKeyFn,
+ UseGanttServiceGetGanttDataKeyFn,
useTaskInstanceServiceGetHitlDetailsKey,
+ useTaskInstanceServiceGetMappedTaskInstanceKey,
} from "openapi/queries";
import { toaster } from "src/components/ui";
import { createErrorToaster } from "src/utils";
+import { gridQueryKeys, tiPerAttemptQueryKeys } from "./gridViewQueryKeys";
+
type DeleteTaskInstanceParams = {
dagId: string;
dagRunId: string;
@@ -66,9 +70,15 @@ export const useDeleteTaskInstance = ({
[useTaskInstanceServiceGetTaskInstancesKey],
[useTaskInstanceServiceGetTaskInstanceKey, { dagId, dagRunId, mapIndex,
taskId }],
[useTaskInstanceServiceGetHitlDetailsKey],
+ UseGanttServiceGetGanttDataKeyFn({ dagId, runId: dagRunId }),
+ [useTaskInstanceServiceGetMappedTaskInstanceKey],
+ ...tiPerAttemptQueryKeys,
];
- await Promise.all(queryKeys.map((key) => queryClient.invalidateQueries({
queryKey: key })));
+ await Promise.all([
+ ...queryKeys.map((key) => queryClient.invalidateQueries({ queryKey: key
})),
+ ...gridQueryKeys(dagId).map((key) => queryClient.invalidateQueries({
queryKey: key })),
+ ]);
toaster.create({
description:
translate("dags:runAndTaskActions.delete.success.description", {
diff --git a/airflow-core/src/airflow/ui/src/queries/useGridTISummaries.ts
b/airflow-core/src/airflow/ui/src/queries/useGridTISummaries.ts
index 35ff56988fc..953f1d14933 100644
--- a/airflow-core/src/airflow/ui/src/queries/useGridTISummaries.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useGridTISummaries.ts
@@ -16,12 +16,24 @@
* specific language governing permissions and limitations
* under the License.
*/
+import { useQueryClient } from "@tanstack/react-query";
import { useEffect, useState } from "react";
+import {
+ useDagRunServiceGetDagRunsKey,
+ useGridServiceGetGridRunsKey,
+ useTaskInstanceServiceGetTaskInstancesKey,
+} from "openapi/queries";
import type { GridTISummaries, TaskInstanceState } from "openapi/requests";
import { OpenAPI } from "openapi/requests/core/OpenAPI";
import { isStatePending, useAutoRefresh } from "src/utils";
+const GRID_MUTATION_WATCHED_KEYS = new Set([
+ useTaskInstanceServiceGetTaskInstancesKey,
+ useGridServiceGetGridRunsKey,
+ useDagRunServiceGetDagRunsKey,
+]);
+
/**
* Streams TI summaries for all grid runs over a single HTTP connection
(NDJSON).
*
@@ -41,6 +53,7 @@ export const useGridTiSummariesStream = ({
runIds: Array<string>;
states?: Array<TaskInstanceState | null | undefined>;
}) => {
+ const queryClient = useQueryClient();
const [summariesByRunId, setSummariesByRunId] = useState<Map<string,
GridTISummaries>>(new Map());
const [refreshTick, setRefreshTick] = useState(0);
@@ -124,5 +137,34 @@ export const useGridTiSummariesStream = ({
return () => clearInterval(timer);
}, [hasActiveRuns, baseRefetchInterval]);
+ // Re-stream whenever a mutation invalidates a grid-related query (TI states,
+ // run states, or grid structure). Invalidation events only fire from
explicit
+ // invalidateQueries() calls — never from polling intervals — so this never
+ // double-fires with the interval-based refresh above.
+ useEffect(() => {
+ let timeoutId: ReturnType<typeof setTimeout> | undefined;
+
+ const unsubscribe = queryClient.getQueryCache().subscribe((event) => {
+ const [firstKey] = event.query.queryKey as Array<unknown>;
+
+ if (
+ event.type === "updated" &&
+ event.action.type === "invalidate" &&
+ typeof firstKey === "string" &&
+ GRID_MUTATION_WATCHED_KEYS.has(firstKey)
+ ) {
+ // Debounce: a single mutation invalidates several matching queries in
one tick.
+ clearTimeout(timeoutId);
+ // eslint-disable-next-line max-nested-callbacks
+ timeoutId = setTimeout(() => setRefreshTick((tick) => tick + 1), 0);
+ }
+ });
+
+ return () => {
+ unsubscribe();
+ clearTimeout(timeoutId);
+ };
+ }, [queryClient]);
+
return { summariesByRunId };
};
diff --git a/airflow-core/src/airflow/ui/src/queries/usePatchDagRun.ts
b/airflow-core/src/airflow/ui/src/queries/usePatchDagRun.ts
index 6b78396f891..c032926dc95 100644
--- a/airflow-core/src/airflow/ui/src/queries/usePatchDagRun.ts
+++ b/airflow-core/src/airflow/ui/src/queries/usePatchDagRun.ts
@@ -23,11 +23,14 @@ import {
UseDagRunServiceGetDagRunKeyFn,
useDagRunServiceGetDagRunsKey,
useDagRunServicePatchDagRun,
+ UseGanttServiceGetGanttDataKeyFn,
+ useTaskInstanceServiceGetMappedTaskInstanceKey,
+ useTaskInstanceServiceGetTaskInstanceKey,
useTaskInstanceServiceGetTaskInstancesKey,
} from "openapi/queries";
import { createErrorToaster } from "src/utils";
-import { gridQueryKeys } from "./gridViewQueryKeys";
+import { gridQueryKeys, tiPerAttemptQueryKeys } from "./gridViewQueryKeys";
import { useClearDagRunDryRunKey } from "./useClearDagRunDryRun";
export const usePatchDagRun = ({
@@ -58,7 +61,11 @@ export const usePatchDagRun = ({
UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }),
[useDagRunServiceGetDagRunsKey],
[useTaskInstanceServiceGetTaskInstancesKey, { dagId, dagRunId }],
+ [useTaskInstanceServiceGetTaskInstanceKey, { dagId, dagRunId }],
+ [useTaskInstanceServiceGetMappedTaskInstanceKey, { dagId, dagRunId }],
[useClearDagRunDryRunKey, dagId],
+ UseGanttServiceGetGanttDataKeyFn({ dagId, runId: dagRunId }),
+ ...tiPerAttemptQueryKeys,
];
await Promise.all([
diff --git a/airflow-core/src/airflow/ui/src/queries/usePatchTaskGroup.ts
b/airflow-core/src/airflow/ui/src/queries/usePatchTaskGroup.ts
index f3e95c5dd8d..dcdcaca8c52 100644
--- a/airflow-core/src/airflow/ui/src/queries/usePatchTaskGroup.ts
+++ b/airflow-core/src/airflow/ui/src/queries/usePatchTaskGroup.ts
@@ -25,7 +25,7 @@ import {
} from "openapi/queries";
import { createErrorToaster } from "src/utils";
-import { gridQueryKeys } from "./gridViewQueryKeys";
+import { gridQueryKeys, tiPerAttemptQueryKeys } from "./gridViewQueryKeys";
import { useClearTaskInstancesDryRunKey } from "./useClearTaskInstancesDryRun";
import { usePatchTaskGroupDryRunKey } from "./usePatchTaskGroupDryRun";
@@ -59,6 +59,7 @@ export const usePatchTaskGroup = ({
[useTaskInstanceServiceGetTaskInstancesKey],
[usePatchTaskGroupDryRunKey, dagId, dagRunId, groupId],
[useClearTaskInstancesDryRunKey, dagId],
+ ...tiPerAttemptQueryKeys,
];
await Promise.all([
diff --git a/airflow-core/src/airflow/ui/src/queries/usePatchTaskInstance.ts
b/airflow-core/src/airflow/ui/src/queries/usePatchTaskInstance.ts
index 33e1f689e4b..0f0a5988ed8 100644
--- a/airflow-core/src/airflow/ui/src/queries/usePatchTaskInstance.ts
+++ b/airflow-core/src/airflow/ui/src/queries/usePatchTaskInstance.ts
@@ -28,7 +28,7 @@ import {
} from "openapi/queries";
import { createErrorToaster } from "src/utils";
-import { gridQueryKeys } from "./gridViewQueryKeys";
+import { gridQueryKeys, tiPerAttemptQueryKeys } from "./gridViewQueryKeys";
import { useClearTaskInstancesDryRunKey } from "./useClearTaskInstancesDryRun";
import { usePatchTaskInstanceDryRunKey } from "./usePatchTaskInstanceDryRun";
@@ -65,6 +65,7 @@ export const usePatchTaskInstance = ({
[useTaskInstanceServiceGetTaskInstancesKey],
[usePatchTaskInstanceDryRunKey, dagId, dagRunId, { mapIndex, taskId }],
[useClearTaskInstancesDryRunKey, dagId],
+ ...tiPerAttemptQueryKeys,
];
if (mapIndex !== undefined) {
diff --git a/airflow-core/src/airflow/ui/src/queries/useUpdateHITLDetail.ts
b/airflow-core/src/airflow/ui/src/queries/useUpdateHITLDetail.ts
index 656ba03f82e..358b8b87bbc 100644
--- a/airflow-core/src/airflow/ui/src/queries/useUpdateHITLDetail.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useUpdateHITLDetail.ts
@@ -23,6 +23,7 @@ import { useTranslation } from "react-i18next";
import {
UseDagRunServiceGetDagRunKeyFn,
useDagRunServiceGetDagRunsKey,
+ UseGanttServiceGetGanttDataKeyFn,
useTaskInstanceServiceGetHitlDetailsKey,
useTaskInstanceServiceGetHitlDetailKey,
useTaskInstanceServiceUpdateHitlDetail,
@@ -33,6 +34,8 @@ import { toaster } from "src/components/ui/Toaster";
import { createErrorToaster } from "src/utils";
import type { HITLResponseParams } from "src/utils/hitl";
+import { gridQueryKeys, tiPerAttemptQueryKeys } from "./gridViewQueryKeys";
+
export const useUpdateHITLDetail = ({
dagId,
dagRunId,
@@ -55,9 +58,14 @@ export const useUpdateHITLDetail = ({
[useTaskInstanceServiceGetTaskInstanceKey, { dagId, dagRunId, mapIndex,
taskId }],
[useTaskInstanceServiceGetHitlDetailsKey, { dagIdPrefixPattern: dagId,
dagRunId }],
[useTaskInstanceServiceGetHitlDetailKey, { dagId, dagRunId }],
+ UseGanttServiceGetGanttDataKeyFn({ dagId, runId: dagRunId }),
+ ...tiPerAttemptQueryKeys,
];
- await Promise.all(queryKeys.map((key) => queryClient.invalidateQueries({
queryKey: key })));
+ await Promise.all([
+ ...queryKeys.map((key) => queryClient.invalidateQueries({ queryKey: key
})),
+ ...gridQueryKeys(dagId).map((key) => queryClient.invalidateQueries({
queryKey: key })),
+ ]);
toaster.create({
title: translate("response.success", { taskId }),