This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7651e4b9859 UI: Use bulk clearDagRuns endpoint for bulk Dag run clear
(#67846)
7651e4b9859 is described below
commit 7651e4b98594a5c93fc6d6959d600ab353e0ed21
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Mon Jun 8 18:45:09 2026 +0200
UI: Use bulk clearDagRuns endpoint for bulk Dag run clear (#67846)
* UI: Use bulk clearDagRuns endpoint instead of per-run fan-out
* Fix error handling
* Use generated clearDagRuns mutation hook for bulk clear dag runs
* Small adjustments
---
.../src/pages/DagRuns/BulkClearDagRunsButton.tsx | 6 +-
.../airflow/ui/src/queries/useBulkClearDagRuns.ts | 130 +++++++--------------
.../ui/src/queries/useBulkClearDagRunsDryRun.ts | 67 +++++------
.../airflow/ui/src/queries/useBulkMarkAsDryRun.ts | 2 +-
4 files changed, 74 insertions(+), 131 deletions(-)
diff --git
a/airflow-core/src/airflow/ui/src/pages/DagRuns/BulkClearDagRunsButton.tsx
b/airflow-core/src/airflow/ui/src/pages/DagRuns/BulkClearDagRunsButton.tsx
index 084a739e483..99751c525a0 100644
--- a/airflow-core/src/airflow/ui/src/pages/DagRuns/BulkClearDagRunsButton.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/DagRuns/BulkClearDagRunsButton.tsx
@@ -39,7 +39,7 @@ const BulkClearDagRunsButton = ({ deselectKeys,
selectedDagRuns }: Props) => {
const { onClose, onOpen, open } = useDisclosure();
const [selectedOptions, setSelectedOptions] =
useState<Array<string>>(["existingTasks"]);
const [note, setNote] = useState<string | null>(null);
- const { bulkClear, data, isPending } = useBulkClearDagRuns({
+ const { bulkClear, error, isPending } = useBulkClearDagRuns({
deselectKeys,
onSuccessConfirm: onClose,
});
@@ -97,13 +97,13 @@ const BulkClearDagRunsButton = ({ deselectKeys,
selectedDagRuns }: Props) => {
/>
</Flex>
<ActionAccordion affectedTasks={affectedTasks} groupByRunId
note={note} setNote={setNote} />
- <ActionErrors actionResponse={data?.clear} error={undefined} />
+ <ActionErrors error={error} />
<Flex justifyContent="end" mt={3}>
<Button
disabled={affectedTasks.total_entries === 0}
loading={isPending || isFetching}
onClick={() => {
- void bulkClear(selectedDagRuns, { note, onlyFailed, onlyNew
});
+ bulkClear(selectedDagRuns, { note, onlyFailed, onlyNew });
}}
>
<CgRedo />
diff --git a/airflow-core/src/airflow/ui/src/queries/useBulkClearDagRuns.ts
b/airflow-core/src/airflow/ui/src/queries/useBulkClearDagRuns.ts
index b3caa1a77ac..bcb13226fbb 100644
--- a/airflow-core/src/airflow/ui/src/queries/useBulkClearDagRuns.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useBulkClearDagRuns.ts
@@ -17,10 +17,10 @@
* under the License.
*/
import { useQueryClient } from "@tanstack/react-query";
-import { useState } from "react";
import { useTranslation } from "react-i18next";
import {
+ useDagRunServiceClearDagRuns,
UseDagRunServiceGetDagRunKeyFn,
useDagRunServiceGetDagRunsKey,
UseGanttServiceGetGanttDataKeyFn,
@@ -28,8 +28,7 @@ import {
useTaskInstanceServiceGetTaskInstanceKey,
useTaskInstanceServiceGetTaskInstancesKey,
} from "openapi/queries";
-import { DagRunService } from "openapi/requests/services.gen";
-import type { BulkActionResponse, DAGRunResponse } from
"openapi/requests/types.gen";
+import type { ClearDagRunsResponse, DAGRunResponse } from
"openapi/requests/types.gen";
import { toaster } from "src/components/ui";
import { gridQueryKeys, tiPerAttemptQueryKeys } from "./gridViewQueryKeys";
@@ -47,37 +46,14 @@ export type BulkClearDagRunsOptions = {
onlyNew: boolean;
};
-// Mirrors the bulk-endpoint success key (``{dag_id}.{run_id}``) so callers
can pass
-// the result straight into ``deselectKeys`` without an extra mapping.
-const getRowKey = (dagRun: DAGRunResponse) =>
`${dagRun.dag_id}.${dagRun.dag_run_id}`;
-
-const formatError = (reason: unknown): string => {
- if (reason instanceof Error) {
- return reason.message;
- }
- if (typeof reason === "object" && reason !== null && "body" in reason) {
- const { body } = reason as { body?: { detail?: unknown } };
-
- if (body?.detail !== undefined) {
- return typeof body.detail === "string" ? body.detail :
JSON.stringify(body.detail);
- }
- }
-
- return String(reason);
-};
-
export const useBulkClearDagRuns = ({ deselectKeys, onSuccessConfirm }: Props)
=> {
const queryClient = useQueryClient();
- const [data, setData] = useState<{ clear: BulkActionResponse } |
undefined>(undefined);
- const [isPending, setIsPending] = useState(false);
const { t: translate } = useTranslation(["common", "dags"]);
- const reset = () => {
- setData(undefined);
- };
+ const onSuccess = async (responseData: ClearDagRunsResponse) => {
+ const clearedRuns = "dag_runs" in responseData ?
[...responseData.dag_runs] : [];
+ const dagIds = new Set(clearedRuns.map((dagRun) => dagRun.dag_id));
- const invalidateQueries = async (dagRuns: ReadonlyArray<DAGRunResponse>) => {
- const dagIds = new Set(dagRuns.map((dagRun) => dagRun.dag_id));
const keys = [
[useDagRunServiceGetDagRunsKey],
[useTaskInstanceServiceGetTaskInstancesKey],
@@ -86,77 +62,49 @@ export const useBulkClearDagRuns = ({ deselectKeys,
onSuccessConfirm }: Props) =
[useBulkClearDagRunsDryRunKey],
...tiPerAttemptQueryKeys,
...[...dagIds].flatMap((dagId) => [...gridQueryKeys(dagId),
[useClearDagRunDryRunKey, dagId]]),
- ...dagRuns.flatMap((dagRun) => [
+ ...clearedRuns.flatMap((dagRun) => [
UseDagRunServiceGetDagRunKeyFn({ dagId: dagRun.dag_id, dagRunId:
dagRun.dag_run_id }),
UseGanttServiceGetGanttDataKeyFn({ dagId: dagRun.dag_id, runId:
dagRun.dag_run_id }),
]),
];
await Promise.all(keys.map((queryKey) => queryClient.invalidateQueries({
queryKey })));
- };
-
- const bulkClear = async (dagRuns: Array<DAGRunResponse>, options:
BulkClearDagRunsOptions) => {
- reset();
- setIsPending(true);
-
- const settled = await Promise.allSettled(
- dagRuns.map((dagRun) =>
- DagRunService.clearDagRun({
- dagId: dagRun.dag_id,
- dagRunId: dagRun.dag_run_id,
- requestBody: {
- dry_run: false,
- note: options.note ?? undefined,
- only_failed: options.onlyFailed,
- only_new: options.onlyNew,
- },
- }).then(() => dagRun),
- ),
- );
- const succeeded: Array<DAGRunResponse> = [];
- const errors: Array<Record<string, unknown>> = [];
-
- settled.forEach((outcome, index) => {
- if (outcome.status === "fulfilled") {
- succeeded.push(outcome.value);
- } else {
- const dagRun = dagRuns[index];
-
- errors.push({
- error: dagRun
- ? `${getRowKey(dagRun)}: ${formatError(outcome.reason)}`
- : formatError(outcome.reason),
- });
- }
+ toaster.create({
+ description: translate("toaster.bulkClear.success.description", {
+ count: clearedRuns.length,
+ keys: clearedRuns.map((dagRun) => dagRun.dag_run_id).join(", "),
+ resourceName: translate("dagRun_other"),
+ }),
+ title: translate("toaster.bulkClear.success.title", {
+ resourceName: translate("dagRun_other"),
+ }),
+ type: "success",
});
+ deselectKeys(clearedRuns.map((dagRun) =>
`${dagRun.dag_id}.${dagRun.dag_run_id}`));
+ onSuccessConfirm();
+ };
- await invalidateQueries(dagRuns);
-
- if (succeeded.length > 0) {
- toaster.create({
- description: translate("toaster.bulkClear.success.description", {
- count: succeeded.length,
- keys: succeeded.map((dagRun) => dagRun.dag_run_id).join(", "),
- resourceName: translate("dagRun_other"),
- }),
- title: translate("toaster.bulkClear.success.title", {
- resourceName: translate("dagRun_other"),
- }),
- type: "success",
- });
- deselectKeys(succeeded.map(getRowKey));
- }
-
- setData({ clear: { errors, success: succeeded.map(getRowKey) } });
- setIsPending(false);
-
- // Per-run failures keep the dialog open so the user can see what failed;
- // the consumer renders ``data.clear.errors``.
- if (errors.length === 0) {
- onSuccessConfirm();
- }
+ const clearDagRuns = useDagRunServiceClearDagRuns({ onSuccess });
+
+ const bulkClear = (dagRuns: Array<DAGRunResponse>, options:
BulkClearDagRunsOptions) => {
+ clearDagRuns.reset();
+ clearDagRuns.mutate({
+ dagId: "~",
+ requestBody: {
+ dag_runs: dagRuns.map((dagRun) => ({ dag_id: dagRun.dag_id,
dag_run_id: dagRun.dag_run_id })),
+ dry_run: false,
+ note: options.note ?? undefined,
+ only_failed: options.onlyFailed,
+ only_new: options.onlyNew,
+ },
+ });
};
- return { bulkClear, data, isPending, reset };
+ return {
+ bulkClear,
+ error: clearDagRuns.error,
+ isPending: clearDagRuns.isPending,
+ reset: clearDagRuns.reset,
+ };
};
diff --git
a/airflow-core/src/airflow/ui/src/queries/useBulkClearDagRunsDryRun.ts
b/airflow-core/src/airflow/ui/src/queries/useBulkClearDagRunsDryRun.ts
index fbdf858bcc5..949a1905017 100644
--- a/airflow-core/src/airflow/ui/src/queries/useBulkClearDagRunsDryRun.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useBulkClearDagRunsDryRun.ts
@@ -16,11 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
-import { useQueries } from "@tanstack/react-query";
+import { useQuery } from "@tanstack/react-query";
import { DagRunService } from "openapi/requests/services.gen";
import type {
- ClearTaskInstanceCollectionResponse,
DAGRunResponse,
TaskInstanceCollectionResponse,
TaskInstanceResponse,
@@ -40,43 +39,39 @@ export const useBulkClearDagRunsDryRun = (
selectedDagRuns: Array<DAGRunResponse>,
options: Options,
) => {
- const results = useQueries({
- queries: selectedDagRuns.map((dagRun) => ({
- enabled,
- queryFn: () =>
- DagRunService.clearDagRun({
- dagId: dagRun.dag_id,
- dagRunId: dagRun.dag_run_id,
- requestBody: {
- dry_run: true,
- only_failed: options.onlyFailed,
- only_new: options.onlyNew,
- },
- }) as Promise<ClearTaskInstanceCollectionResponse>,
- queryKey: [
- useBulkClearDagRunsDryRunKey,
- dagRun.dag_id,
- dagRun.dag_run_id,
- { only_failed: options.onlyFailed, only_new: options.onlyNew },
- ],
- refetchOnMount: "always" as const,
- })),
+ const { data: response, isFetching } = useQuery({
+ enabled: enabled && selectedDagRuns.length > 0,
+ queryFn: () =>
+ DagRunService.clearDagRuns({
+ dagId: "~",
+ requestBody: {
+ dag_runs: selectedDagRuns.map((dagRun) => ({
+ dag_id: dagRun.dag_id,
+ dag_run_id: dagRun.dag_run_id,
+ })),
+ dry_run: true,
+ only_failed: options.onlyFailed,
+ only_new: options.onlyNew,
+ },
+ }),
+ queryKey: [
+ useBulkClearDagRunsDryRunKey,
+ selectedDagRuns.map((dagRun) =>
`${dagRun.dag_id}.${dagRun.dag_run_id}`).sort(),
+ { only_failed: options.onlyFailed, only_new: options.onlyNew },
+ ],
+ refetchOnMount: "always",
});
- const isFetching = results.some((result) => result.isFetching);
- // Each per-run call is scoped to a distinct ``(dag_id, dag_run_id)`` so the
- // concatenated array can't contain duplicates; the response is also
- // homogeneous (``only_new=true`` yields ``NewTaskResponse`` placeholders,
- // ``false`` yields real ``TaskInstanceResponse``), so the cast is safe even
- // though the OpenAPI type widens to a union.
- const taskInstances = results.flatMap((result) =>
result.data?.task_instances ?? []);
+ // ``clearDagRuns`` returns a union; ``dry_run`` always yields the
task-instance
+ // collection arm, so narrow on its shape. ``only_new=true`` yields
+ // ``NewTaskResponse`` placeholders that render in the same affected-tasks
table.
const data: TaskInstanceCollectionResponse =
- taskInstances.length === 0
- ? EMPTY
- : {
- task_instances: taskInstances as Array<TaskInstanceResponse>,
- total_entries: taskInstances.length,
- };
+ response && "task_instances" in response
+ ? {
+ task_instances: response.task_instances as
Array<TaskInstanceResponse>,
+ total_entries: response.total_entries,
+ }
+ : EMPTY;
return { data, isFetching };
};
diff --git a/airflow-core/src/airflow/ui/src/queries/useBulkMarkAsDryRun.ts
b/airflow-core/src/airflow/ui/src/queries/useBulkMarkAsDryRun.ts
index b8d5eb06758..e7c2f218735 100644
--- a/airflow-core/src/airflow/ui/src/queries/useBulkMarkAsDryRun.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useBulkMarkAsDryRun.ts
@@ -85,7 +85,7 @@ export const useBulkMarkAsDryRun = (
new_state: targetState,
},
],
- refetchOnMount: "always" as const,
+ refetchOnMount: "always",
})),
});