This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7651e4b9859 UI: Use bulk clearDagRuns endpoint for bulk Dag run clear 
(#67846)
7651e4b9859 is described below

commit 7651e4b98594a5c93fc6d6959d600ab353e0ed21
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Mon Jun 8 18:45:09 2026 +0200

    UI: Use bulk clearDagRuns endpoint for bulk Dag run clear (#67846)
    
    * UI: Use bulk clearDagRuns endpoint instead of per-run fan-out
    
    * Fix error handling
    
    * Use generated clearDagRuns mutation hook for bulk clear dag runs
    
    * Small adjustments
---
 .../src/pages/DagRuns/BulkClearDagRunsButton.tsx   |   6 +-
 .../airflow/ui/src/queries/useBulkClearDagRuns.ts  | 130 +++++++--------------
 .../ui/src/queries/useBulkClearDagRunsDryRun.ts    |  67 +++++------
 .../airflow/ui/src/queries/useBulkMarkAsDryRun.ts  |   2 +-
 4 files changed, 74 insertions(+), 131 deletions(-)

diff --git 
a/airflow-core/src/airflow/ui/src/pages/DagRuns/BulkClearDagRunsButton.tsx 
b/airflow-core/src/airflow/ui/src/pages/DagRuns/BulkClearDagRunsButton.tsx
index 084a739e483..99751c525a0 100644
--- a/airflow-core/src/airflow/ui/src/pages/DagRuns/BulkClearDagRunsButton.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/DagRuns/BulkClearDagRunsButton.tsx
@@ -39,7 +39,7 @@ const BulkClearDagRunsButton = ({ deselectKeys, 
selectedDagRuns }: Props) => {
   const { onClose, onOpen, open } = useDisclosure();
   const [selectedOptions, setSelectedOptions] = 
useState<Array<string>>(["existingTasks"]);
   const [note, setNote] = useState<string | null>(null);
-  const { bulkClear, data, isPending } = useBulkClearDagRuns({
+  const { bulkClear, error, isPending } = useBulkClearDagRuns({
     deselectKeys,
     onSuccessConfirm: onClose,
   });
@@ -97,13 +97,13 @@ const BulkClearDagRunsButton = ({ deselectKeys, 
selectedDagRuns }: Props) => {
               />
             </Flex>
             <ActionAccordion affectedTasks={affectedTasks} groupByRunId 
note={note} setNote={setNote} />
-            <ActionErrors actionResponse={data?.clear} error={undefined} />
+            <ActionErrors error={error} />
             <Flex justifyContent="end" mt={3}>
               <Button
                 disabled={affectedTasks.total_entries === 0}
                 loading={isPending || isFetching}
                 onClick={() => {
-                  void bulkClear(selectedDagRuns, { note, onlyFailed, onlyNew 
});
+                  bulkClear(selectedDagRuns, { note, onlyFailed, onlyNew });
                 }}
               >
                 <CgRedo />
diff --git a/airflow-core/src/airflow/ui/src/queries/useBulkClearDagRuns.ts 
b/airflow-core/src/airflow/ui/src/queries/useBulkClearDagRuns.ts
index b3caa1a77ac..bcb13226fbb 100644
--- a/airflow-core/src/airflow/ui/src/queries/useBulkClearDagRuns.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useBulkClearDagRuns.ts
@@ -17,10 +17,10 @@
  * under the License.
  */
 import { useQueryClient } from "@tanstack/react-query";
-import { useState } from "react";
 import { useTranslation } from "react-i18next";
 
 import {
+  useDagRunServiceClearDagRuns,
   UseDagRunServiceGetDagRunKeyFn,
   useDagRunServiceGetDagRunsKey,
   UseGanttServiceGetGanttDataKeyFn,
@@ -28,8 +28,7 @@ import {
   useTaskInstanceServiceGetTaskInstanceKey,
   useTaskInstanceServiceGetTaskInstancesKey,
 } from "openapi/queries";
-import { DagRunService } from "openapi/requests/services.gen";
-import type { BulkActionResponse, DAGRunResponse } from 
"openapi/requests/types.gen";
+import type { ClearDagRunsResponse, DAGRunResponse } from 
"openapi/requests/types.gen";
 import { toaster } from "src/components/ui";
 
 import { gridQueryKeys, tiPerAttemptQueryKeys } from "./gridViewQueryKeys";
@@ -47,37 +46,14 @@ export type BulkClearDagRunsOptions = {
   onlyNew: boolean;
 };
 
-// Mirrors the bulk-endpoint success key (``{dag_id}.{run_id}``) so callers 
can pass
-// the result straight into ``deselectKeys`` without an extra mapping.
-const getRowKey = (dagRun: DAGRunResponse) => 
`${dagRun.dag_id}.${dagRun.dag_run_id}`;
-
-const formatError = (reason: unknown): string => {
-  if (reason instanceof Error) {
-    return reason.message;
-  }
-  if (typeof reason === "object" && reason !== null && "body" in reason) {
-    const { body } = reason as { body?: { detail?: unknown } };
-
-    if (body?.detail !== undefined) {
-      return typeof body.detail === "string" ? body.detail : 
JSON.stringify(body.detail);
-    }
-  }
-
-  return String(reason);
-};
-
 export const useBulkClearDagRuns = ({ deselectKeys, onSuccessConfirm }: Props) 
=> {
   const queryClient = useQueryClient();
-  const [data, setData] = useState<{ clear: BulkActionResponse } | 
undefined>(undefined);
-  const [isPending, setIsPending] = useState(false);
   const { t: translate } = useTranslation(["common", "dags"]);
 
-  const reset = () => {
-    setData(undefined);
-  };
+  const onSuccess = async (responseData: ClearDagRunsResponse) => {
+    const clearedRuns = "dag_runs" in responseData ? 
[...responseData.dag_runs] : [];
+    const dagIds = new Set(clearedRuns.map((dagRun) => dagRun.dag_id));
 
-  const invalidateQueries = async (dagRuns: ReadonlyArray<DAGRunResponse>) => {
-    const dagIds = new Set(dagRuns.map((dagRun) => dagRun.dag_id));
     const keys = [
       [useDagRunServiceGetDagRunsKey],
       [useTaskInstanceServiceGetTaskInstancesKey],
@@ -86,77 +62,49 @@ export const useBulkClearDagRuns = ({ deselectKeys, 
onSuccessConfirm }: Props) =
       [useBulkClearDagRunsDryRunKey],
       ...tiPerAttemptQueryKeys,
       ...[...dagIds].flatMap((dagId) => [...gridQueryKeys(dagId), 
[useClearDagRunDryRunKey, dagId]]),
-      ...dagRuns.flatMap((dagRun) => [
+      ...clearedRuns.flatMap((dagRun) => [
         UseDagRunServiceGetDagRunKeyFn({ dagId: dagRun.dag_id, dagRunId: 
dagRun.dag_run_id }),
         UseGanttServiceGetGanttDataKeyFn({ dagId: dagRun.dag_id, runId: 
dagRun.dag_run_id }),
       ]),
     ];
 
     await Promise.all(keys.map((queryKey) => queryClient.invalidateQueries({ 
queryKey })));
-  };
-
-  const bulkClear = async (dagRuns: Array<DAGRunResponse>, options: 
BulkClearDagRunsOptions) => {
-    reset();
-    setIsPending(true);
-
-    const settled = await Promise.allSettled(
-      dagRuns.map((dagRun) =>
-        DagRunService.clearDagRun({
-          dagId: dagRun.dag_id,
-          dagRunId: dagRun.dag_run_id,
-          requestBody: {
-            dry_run: false,
-            note: options.note ?? undefined,
-            only_failed: options.onlyFailed,
-            only_new: options.onlyNew,
-          },
-        }).then(() => dagRun),
-      ),
-    );
 
-    const succeeded: Array<DAGRunResponse> = [];
-    const errors: Array<Record<string, unknown>> = [];
-
-    settled.forEach((outcome, index) => {
-      if (outcome.status === "fulfilled") {
-        succeeded.push(outcome.value);
-      } else {
-        const dagRun = dagRuns[index];
-
-        errors.push({
-          error: dagRun
-            ? `${getRowKey(dagRun)}: ${formatError(outcome.reason)}`
-            : formatError(outcome.reason),
-        });
-      }
+    toaster.create({
+      description: translate("toaster.bulkClear.success.description", {
+        count: clearedRuns.length,
+        keys: clearedRuns.map((dagRun) => dagRun.dag_run_id).join(", "),
+        resourceName: translate("dagRun_other"),
+      }),
+      title: translate("toaster.bulkClear.success.title", {
+        resourceName: translate("dagRun_other"),
+      }),
+      type: "success",
     });
+    deselectKeys(clearedRuns.map((dagRun) => 
`${dagRun.dag_id}.${dagRun.dag_run_id}`));
+    onSuccessConfirm();
+  };
 
-    await invalidateQueries(dagRuns);
-
-    if (succeeded.length > 0) {
-      toaster.create({
-        description: translate("toaster.bulkClear.success.description", {
-          count: succeeded.length,
-          keys: succeeded.map((dagRun) => dagRun.dag_run_id).join(", "),
-          resourceName: translate("dagRun_other"),
-        }),
-        title: translate("toaster.bulkClear.success.title", {
-          resourceName: translate("dagRun_other"),
-        }),
-        type: "success",
-      });
-      deselectKeys(succeeded.map(getRowKey));
-    }
-
-    setData({ clear: { errors, success: succeeded.map(getRowKey) } });
-    setIsPending(false);
-
-    // Per-run failures keep the dialog open so the user can see what failed;
-    // the consumer renders ``data.clear.errors``.
-    if (errors.length === 0) {
-      onSuccessConfirm();
-    }
+  const clearDagRuns = useDagRunServiceClearDagRuns({ onSuccess });
+
+  const bulkClear = (dagRuns: Array<DAGRunResponse>, options: 
BulkClearDagRunsOptions) => {
+    clearDagRuns.reset();
+    clearDagRuns.mutate({
+      dagId: "~",
+      requestBody: {
+        dag_runs: dagRuns.map((dagRun) => ({ dag_id: dagRun.dag_id, 
dag_run_id: dagRun.dag_run_id })),
+        dry_run: false,
+        note: options.note ?? undefined,
+        only_failed: options.onlyFailed,
+        only_new: options.onlyNew,
+      },
+    });
   };
 
-  return { bulkClear, data, isPending, reset };
+  return {
+    bulkClear,
+    error: clearDagRuns.error,
+    isPending: clearDagRuns.isPending,
+    reset: clearDagRuns.reset,
+  };
 };
diff --git 
a/airflow-core/src/airflow/ui/src/queries/useBulkClearDagRunsDryRun.ts 
b/airflow-core/src/airflow/ui/src/queries/useBulkClearDagRunsDryRun.ts
index fbdf858bcc5..949a1905017 100644
--- a/airflow-core/src/airflow/ui/src/queries/useBulkClearDagRunsDryRun.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useBulkClearDagRunsDryRun.ts
@@ -16,11 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-import { useQueries } from "@tanstack/react-query";
+import { useQuery } from "@tanstack/react-query";
 
 import { DagRunService } from "openapi/requests/services.gen";
 import type {
-  ClearTaskInstanceCollectionResponse,
   DAGRunResponse,
   TaskInstanceCollectionResponse,
   TaskInstanceResponse,
@@ -40,43 +39,39 @@ export const useBulkClearDagRunsDryRun = (
   selectedDagRuns: Array<DAGRunResponse>,
   options: Options,
 ) => {
-  const results = useQueries({
-    queries: selectedDagRuns.map((dagRun) => ({
-      enabled,
-      queryFn: () =>
-        DagRunService.clearDagRun({
-          dagId: dagRun.dag_id,
-          dagRunId: dagRun.dag_run_id,
-          requestBody: {
-            dry_run: true,
-            only_failed: options.onlyFailed,
-            only_new: options.onlyNew,
-          },
-        }) as Promise<ClearTaskInstanceCollectionResponse>,
-      queryKey: [
-        useBulkClearDagRunsDryRunKey,
-        dagRun.dag_id,
-        dagRun.dag_run_id,
-        { only_failed: options.onlyFailed, only_new: options.onlyNew },
-      ],
-      refetchOnMount: "always" as const,
-    })),
+  const { data: response, isFetching } = useQuery({
+    enabled: enabled && selectedDagRuns.length > 0,
+    queryFn: () =>
+      DagRunService.clearDagRuns({
+        dagId: "~",
+        requestBody: {
+          dag_runs: selectedDagRuns.map((dagRun) => ({
+            dag_id: dagRun.dag_id,
+            dag_run_id: dagRun.dag_run_id,
+          })),
+          dry_run: true,
+          only_failed: options.onlyFailed,
+          only_new: options.onlyNew,
+        },
+      }),
+    queryKey: [
+      useBulkClearDagRunsDryRunKey,
+      selectedDagRuns.map((dagRun) => 
`${dagRun.dag_id}.${dagRun.dag_run_id}`).sort(),
+      { only_failed: options.onlyFailed, only_new: options.onlyNew },
+    ],
+    refetchOnMount: "always",
   });
 
-  const isFetching = results.some((result) => result.isFetching);
-  // Each per-run call is scoped to a distinct ``(dag_id, dag_run_id)`` so the
-  // concatenated array can't contain duplicates; the response is also
-  // homogeneous (``only_new=true`` yields ``NewTaskResponse`` placeholders,
-  // ``false`` yields real ``TaskInstanceResponse``), so the cast is safe even
-  // though the OpenAPI type widens to a union.
-  const taskInstances = results.flatMap((result) => 
result.data?.task_instances ?? []);
+  // ``clearDagRuns`` returns a union; ``dry_run`` always yields the 
task-instance
+  // collection arm, so narrow on its shape. ``only_new=true`` yields
+  // ``NewTaskResponse`` placeholders that render in the same affected-tasks 
table.
   const data: TaskInstanceCollectionResponse =
-    taskInstances.length === 0
-      ? EMPTY
-      : {
-          task_instances: taskInstances as Array<TaskInstanceResponse>,
-          total_entries: taskInstances.length,
-        };
+    response && "task_instances" in response
+      ? {
+          task_instances: response.task_instances as 
Array<TaskInstanceResponse>,
+          total_entries: response.total_entries,
+        }
+      : EMPTY;
 
   return { data, isFetching };
 };
diff --git a/airflow-core/src/airflow/ui/src/queries/useBulkMarkAsDryRun.ts 
b/airflow-core/src/airflow/ui/src/queries/useBulkMarkAsDryRun.ts
index b8d5eb06758..e7c2f218735 100644
--- a/airflow-core/src/airflow/ui/src/queries/useBulkMarkAsDryRun.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useBulkMarkAsDryRun.ts
@@ -85,7 +85,7 @@ export const useBulkMarkAsDryRun = (
           new_state: targetState,
         },
       ],
-      refetchOnMount: "always" as const,
+      refetchOnMount: "always",
     })),
   });
 

Reply via email to