This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 5e64571ca95 Add Bulk operations for edge workers page (#64033)
5e64571ca95 is described below
commit 5e64571ca955e09d19dec6cfc361c61780e2a9c3
Author: Shubham Raj <[email protected]>
AuthorDate: Sat Mar 21 23:41:19 2026 +0530
Add Bulk operations for edge workers page (#64033)
* initial checkbox addition
* refactor to use hooks
---
.../www/src/components/BulkWorkerOperations.tsx | 181 ++++++++++++++++++++
.../providers/edge3/plugins/www/src/constants.ts | 16 ++
.../plugins/www/src/hooks/useBulkWorkerActions.ts | 185 +++++++++++++++++++++
.../edge3/plugins/www/src/pages/WorkerPage.tsx | 120 ++++++++++++-
4 files changed, 496 insertions(+), 6 deletions(-)
diff --git
a/providers/edge3/src/airflow/providers/edge3/plugins/www/src/components/BulkWorkerOperations.tsx
b/providers/edge3/src/airflow/providers/edge3/plugins/www/src/components/BulkWorkerOperations.tsx
new file mode 100644
index 00000000000..a65ad262609
--- /dev/null
+++
b/providers/edge3/src/airflow/providers/edge3/plugins/www/src/components/BulkWorkerOperations.tsx
@@ -0,0 +1,181 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { Button, Dialog, HStack, List, Portal, Text, useDisclosure } from
"@chakra-ui/react";
+import type { Worker } from "openapi/requests/types.gen";
+import { FaPowerOff } from "react-icons/fa";
+import { FaRegTrashCan } from "react-icons/fa6";
+
+import { useBulkWorkerActions } from "src/hooks/useBulkWorkerActions";
+
+type BulkWorkerOperationsProps = {
+ readonly onClearSelection: VoidFunction;
+ readonly onOperations: () => void;
+ readonly selectedWorkers: Array<Worker>;
+};
+
+export const BulkWorkerOperations = ({
+ onClearSelection,
+ onOperations,
+ selectedWorkers,
+}: BulkWorkerOperationsProps) => {
+ const {
+ onClose: onCloseShutdownDialog,
+ onOpen: onOpenShutdownDialog,
+ open: isShutdownDialogOpen,
+ } = useDisclosure();
+ const {
+ onClose: onCloseDeleteDialog,
+ onOpen: onOpenDeleteDialog,
+ open: isDeleteDialogOpen,
+ } = useDisclosure();
+ const {
+ deleteWorkers,
+ handleBulkDelete,
+ handleBulkShutdown,
+ isBulkDeletePending,
+ isBulkShutdownPending,
+ shutdownWorkers,
+ } = useBulkWorkerActions({
+ onClearSelection,
+ onOperations,
+ selectedWorkers,
+ });
+
+ const onBulkShutdown = async () => {
+ try {
+ await handleBulkShutdown();
+ } finally {
+ onCloseShutdownDialog();
+ }
+ };
+
+ const onBulkDelete = async () => {
+ try {
+ await handleBulkDelete();
+ } finally {
+ onCloseDeleteDialog();
+ }
+ };
+
+ return (
+ <>
+ <HStack>
+ <Button
+ colorPalette="danger"
+ disabled={shutdownWorkers.length === 0}
+ onClick={onOpenShutdownDialog}
+ size="sm"
+ variant="outline"
+ >
+ <FaPowerOff />
+ Shutdown ({shutdownWorkers.length})
+ </Button>
+ <Button
+ colorPalette="danger"
+ disabled={deleteWorkers.length === 0}
+ onClick={onOpenDeleteDialog}
+ size="sm"
+ variant="outline"
+ >
+ <FaRegTrashCan />
+ Delete ({deleteWorkers.length})
+ </Button>
+ </HStack>
+
+ <Dialog.Root onOpenChange={onCloseShutdownDialog}
open={isShutdownDialogOpen} size="lg">
+ <Portal>
+ <Dialog.Backdrop />
+ <Dialog.Positioner>
+ <Dialog.Content>
+ <Dialog.Header>
+ <Dialog.Title>
+ Shutdown {shutdownWorkers.length} selected worker(s)
+ </Dialog.Title>
+ </Dialog.Header>
+ <Dialog.Body>
+ <Text mb={3}>
+ Shutdown can be requested only for workers in states: idle,
running, maintenance pending,
+ maintenance mode, or maintenance request.
+ </Text>
+ <List.Root ps={5}>
+ {shutdownWorkers.map((worker) => (
+ <List.Item
key={worker.worker_name}>{worker.worker_name}</List.Item>
+ ))}
+ </List.Root>
+ </Dialog.Body>
+ <Dialog.Footer>
+ <Dialog.ActionTrigger asChild>
+ <Button variant="outline">Cancel</Button>
+ </Dialog.ActionTrigger>
+ <Button
+ colorPalette="danger"
+ loading={isBulkShutdownPending}
+ loadingText="Shutting down..."
+ onClick={onBulkShutdown}
+ >
+ <FaPowerOff />
+ Request Shutdown
+ </Button>
+ </Dialog.Footer>
+ </Dialog.Content>
+ </Dialog.Positioner>
+ </Portal>
+ </Dialog.Root>
+
+ <Dialog.Root onOpenChange={onCloseDeleteDialog}
open={isDeleteDialogOpen} size="lg">
+ <Portal>
+ <Dialog.Backdrop />
+ <Dialog.Positioner>
+ <Dialog.Content>
+ <Dialog.Header>
+ <Dialog.Title>
+ Delete {deleteWorkers.length} selected worker(s)
+ </Dialog.Title>
+ </Dialog.Header>
+ <Dialog.Body>
+ <Text mb={3}>
+ Delete is available only for workers in states: offline,
unknown, or offline maintenance.
+ </Text>
+ <List.Root ps={5}>
+ {deleteWorkers.map((worker) => (
+ <List.Item
key={worker.worker_name}>{worker.worker_name}</List.Item>
+ ))}
+ </List.Root>
+ </Dialog.Body>
+ <Dialog.Footer>
+ <Dialog.ActionTrigger asChild>
+ <Button variant="outline">Cancel</Button>
+ </Dialog.ActionTrigger>
+ <Button
+ colorPalette="danger"
+ loading={isBulkDeletePending}
+ loadingText="Deleting..."
+ onClick={onBulkDelete}
+ >
+ <FaRegTrashCan />
+ Delete Workers
+ </Button>
+ </Dialog.Footer>
+ </Dialog.Content>
+ </Dialog.Positioner>
+ </Portal>
+ </Dialog.Root>
+ </>
+ );
+};
diff --git
a/providers/edge3/src/airflow/providers/edge3/plugins/www/src/constants.ts
b/providers/edge3/src/airflow/providers/edge3/plugins/www/src/constants.ts
index 6e82544f7be..1b23645de86 100644
--- a/providers/edge3/src/airflow/providers/edge3/plugins/www/src/constants.ts
+++ b/providers/edge3/src/airflow/providers/edge3/plugins/www/src/constants.ts
@@ -54,3 +54,19 @@ export const jobStateOptions = createListCollection<{
})),
],
});
+
+export const bulkWorkerShutdownEligibleStates = new Set<EdgeWorkerState>([
+ "idle",
+ "running",
+ "maintenance pending",
+ "maintenance mode",
+ "maintenance request",
+]);
+
+export const bulkWorkerDeleteEligibleStates = new Set<EdgeWorkerState>([
+ "offline",
+ "unknown",
+ "offline maintenance",
+]);
+
+export const bulkWorkerActionBatchSize = 10;
diff --git
a/providers/edge3/src/airflow/providers/edge3/plugins/www/src/hooks/useBulkWorkerActions.ts
b/providers/edge3/src/airflow/providers/edge3/plugins/www/src/hooks/useBulkWorkerActions.ts
new file mode 100644
index 00000000000..c9bc1eb8bca
--- /dev/null
+++
b/providers/edge3/src/airflow/providers/edge3/plugins/www/src/hooks/useBulkWorkerActions.ts
@@ -0,0 +1,185 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { useUiServiceDeleteWorker, useUiServiceRequestWorkerShutdown } from
"openapi/queries";
+import type { Worker } from "openapi/requests/types.gen";
+import { useMemo, useState } from "react";
+
+import { toaster } from "src/components/ui";
+import {
+ bulkWorkerActionBatchSize,
+ bulkWorkerDeleteEligibleStates,
+ bulkWorkerShutdownEligibleStates,
+} from "src/constants";
+
+type UseBulkWorkerActionsProps = {
+ readonly onClearSelection: VoidFunction;
+ readonly onOperations: () => void;
+ readonly selectedWorkers: Array<Worker>;
+};
+
+type BulkActionResult = {
+ failedWorkers: Array<string>;
+ successCount: number;
+};
+
+type BulkActionConfig = {
+ readonly failureTitle: string;
+ readonly setPending: (pending: boolean) => void;
+ readonly successToast: (successCount: number) => {
+ description: string;
+ title: string;
+ type: "success";
+ };
+ readonly workers: Array<Worker>;
+ readonly workerMutation: (worker: Worker) => Promise<unknown>;
+};
+
+const getFailureDescription = (
+ failedWorkers: Array<string>,
+): string => {
+ const maxFailuresToDisplay = 5;
+ const displayedFailedWorkers = failedWorkers.slice(0, maxFailuresToDisplay);
+ const additionalFailedWorkersCount = failedWorkers.length -
displayedFailedWorkers.length;
+ const additionalFailuresMessage =
+ additionalFailedWorkersCount > 0 ? ` and ${additionalFailedWorkersCount}
more` : "";
+
+ return `Failed for ${failedWorkers.length} worker(s):
${displayedFailedWorkers.join(", ")}${additionalFailuresMessage}.`;
+};
+
+const getBulkActionResult = (
+ workers: Array<Worker>,
+ results: Array<PromiseSettledResult<unknown>>,
+): BulkActionResult => {
+ const failedWorkers = results.flatMap((result, index) =>
+ result.status === "rejected" ? [workers[index]?.worker_name ?? "unknown"]
: [],
+ );
+
+ return {
+ failedWorkers,
+ successCount: workers.length - failedWorkers.length,
+ };
+};
+
+const runMutationsInBatches = async (
+ workers: Array<Worker>,
+ workerMutation: (worker: Worker) => Promise<unknown>,
+): Promise<Array<PromiseSettledResult<unknown>>> => {
+ const results: Array<PromiseSettledResult<unknown>> = [];
+
+ for (let index = 0; index < workers.length; index +=
bulkWorkerActionBatchSize) {
+ const batchWorkers = workers.slice(index, index +
bulkWorkerActionBatchSize);
+ const batchResults = await Promise.allSettled(batchWorkers.map((worker) =>
workerMutation(worker)));
+
+ results.push(...batchResults);
+ }
+
+ return results;
+};
+
+export const useBulkWorkerActions = ({
+ onClearSelection,
+ onOperations,
+ selectedWorkers,
+}: UseBulkWorkerActionsProps) => {
+ const [isBulkDeletePending, setIsBulkDeletePending] = useState(false);
+ const [isBulkShutdownPending, setIsBulkShutdownPending] = useState(false);
+
+ const shutdownMutation = useUiServiceRequestWorkerShutdown();
+ const deleteMutation = useUiServiceDeleteWorker();
+
+ const shutdownWorkers = useMemo(
+ () => selectedWorkers.filter((worker) =>
bulkWorkerShutdownEligibleStates.has(worker.state)),
+ [selectedWorkers],
+ );
+ const deleteWorkers = useMemo(
+ () => selectedWorkers.filter((worker) =>
bulkWorkerDeleteEligibleStates.has(worker.state)),
+ [selectedWorkers],
+ );
+
+ const handleBulkAction = async ({
+ failureTitle,
+ setPending,
+ successToast,
+ workers,
+ workerMutation,
+ }: BulkActionConfig): Promise<void> => {
+ setPending(true);
+
+ try {
+ const results = await runMutationsInBatches(workers, workerMutation);
+ const { failedWorkers, successCount } = getBulkActionResult(workers,
results);
+
+ if (successCount > 0) {
+ toaster.create(successToast(successCount));
+ }
+
+ if (failedWorkers.length > 0) {
+ toaster.create({
+ description: getFailureDescription(failedWorkers),
+ title: failureTitle,
+ type: "error",
+ });
+ }
+
+ if (successCount > 0) {
+ onOperations();
+ onClearSelection();
+ }
+ } finally {
+ setPending(false);
+ }
+ };
+
+ const handleBulkShutdown = async (): Promise<void> => {
+ await handleBulkAction({
+ failureTitle: "Bulk Shutdown Partially Failed",
+ setPending: setIsBulkShutdownPending,
+ successToast: (successCount) => ({
+ description: `Shutdown requested for ${successCount} worker(s).`,
+ title: "Bulk Shutdown Requested",
+ type: "success",
+ }),
+ workers: shutdownWorkers,
+ workerMutation: (worker) => shutdownMutation.mutateAsync({ workerName:
worker.worker_name }),
+ });
+ };
+
+ const handleBulkDelete = async (): Promise<void> => {
+ await handleBulkAction({
+ failureTitle: "Bulk Delete Partially Failed",
+ setPending: setIsBulkDeletePending,
+ successToast: (successCount) => ({
+ description: `${successCount} worker(s) deleted.`,
+ title: "Bulk Delete Completed",
+ type: "success",
+ }),
+ workers: deleteWorkers,
+ workerMutation: (worker) => deleteMutation.mutateAsync({ workerName:
worker.worker_name }),
+ });
+ };
+
+ return {
+ deleteWorkers,
+ handleBulkDelete,
+ handleBulkShutdown,
+ isBulkDeletePending,
+ isBulkShutdownPending,
+ shutdownWorkers,
+ };
+};
diff --git
a/providers/edge3/src/airflow/providers/edge3/plugins/www/src/pages/WorkerPage.tsx
b/providers/edge3/src/airflow/providers/edge3/plugins/www/src/pages/WorkerPage.tsx
index 6f1758f2ef3..23da0d86d82 100644
---
a/providers/edge3/src/airflow/providers/edge3/plugins/www/src/pages/WorkerPage.tsx
+++
b/providers/edge3/src/airflow/providers/edge3/plugins/www/src/pages/WorkerPage.tsx
@@ -16,13 +16,28 @@
* specific language governing permissions and limitations
* under the License.
*/
-import { Box, Code, HStack, Link as ChakraLink, List, Table, Text, type
SelectValueChangeDetails } from "@chakra-ui/react";
-import { useState, useCallback } from "react";
+import {
+ ActionBar,
+ Box,
+ Checkbox as ChakraCheckbox,
+ CloseButton,
+ Code,
+ HStack,
+ Link as ChakraLink,
+ List,
+ Portal,
+ Table,
+ Text,
+ type SelectValueChangeDetails,
+} from "@chakra-ui/react";
+import { useCallback, useEffect, useMemo, useState } from "react";
import { useUiServiceWorker } from "openapi/queries";
+import type { EdgeWorkerState, Worker } from "openapi/requests/types.gen";
import { Link } from "react-router-dom";
import { LuExternalLink } from "react-icons/lu";
import TimeAgo from "react-timeago";
+import { BulkWorkerOperations } from "src/components/BulkWorkerOperations";
import { ErrorAlert } from "src/components/ErrorAlert";
import { SearchBar } from "src/components/SearchBar";
import { WorkerOperations } from "src/components/WorkerOperations";
@@ -30,12 +45,12 @@ import { WorkerStateBadge } from
"src/components/WorkerStateBadge";
import { ScrollToAnchor, Select } from "src/components/ui";
import { workerStateOptions } from "src/constants";
import { autoRefreshInterval } from "src/utils";
-import type { EdgeWorkerState } from "openapi/requests/types.gen";
export const WorkerPage = () => {
const [workerNamePattern, setWorkerNamePattern] = useState("");
const [queueNamePattern, setQueueNamePattern] = useState("");
const [filteredState, setFilteredState] = useState<string[]>([]);
+ const [selectedWorkerNames, setSelectedWorkerNames] =
useState<Set<string>>(new Set());
const hasFilteredState = filteredState.length > 0;
@@ -51,6 +66,22 @@ export const WorkerPage = () => {
refetchInterval: autoRefreshInterval,
},
);
+ const workers = useMemo(() => data?.workers ?? [], [data?.workers]);
+
+ useEffect(() => {
+ setSelectedWorkerNames((previousSelectedWorkers) => {
+ const availableWorkerNames = new Set(workers.map((worker) =>
worker.worker_name));
+ const nextSelectedWorkers = new Set(
+ [...previousSelectedWorkers].filter((workerName) =>
availableWorkerNames.has(workerName)),
+ );
+
+ if (nextSelectedWorkers.size === previousSelectedWorkers.size) {
+ return previousSelectedWorkers;
+ }
+
+ return nextSelectedWorkers;
+ });
+ }, [workers]);
const handleWorkerSearchChange = (value: string) => {
setWorkerNamePattern(value);
@@ -69,6 +100,41 @@ export const WorkerPage = () => {
setFilteredState(value.filter((state) => state !== "all"));
}
}, []);
+ const selectedWorkers = useMemo<Array<Worker>>(
+ () => workers.filter((worker) =>
selectedWorkerNames.has(worker.worker_name)),
+ [selectedWorkerNames, workers],
+ );
+ const selectedWorkersCount = selectedWorkers.length;
+ const allWorkersSelected = workers.length > 0 && selectedWorkersCount ===
workers.length;
+ const someWorkersSelected = selectedWorkersCount > 0 && !allWorkersSelected;
+
+ const handleWorkerSelect = useCallback((workerName: string, selected:
boolean) => {
+ setSelectedWorkerNames((previousSelectedWorkers) => {
+ const nextSelectedWorkers = new Set(previousSelectedWorkers);
+ if (selected) {
+ nextSelectedWorkers.add(workerName);
+ } else {
+ nextSelectedWorkers.delete(workerName);
+ }
+
+ return nextSelectedWorkers;
+ });
+ }, []);
+
+ const handleSelectAllWorkers = useCallback(
+ (selected: boolean) => {
+ if (selected) {
+ setSelectedWorkerNames(new Set(workers.map((worker) =>
worker.worker_name)));
+ } else {
+ setSelectedWorkerNames(new Set());
+ }
+ },
+ [workers],
+ );
+
+ const clearSelections = useCallback(() => {
+ setSelectedWorkerNames(new Set());
+ }, []);
return (
<Box p={2}>
@@ -99,7 +165,7 @@ export const WorkerPage = () => {
<Select.Trigger
{...(hasFilteredState ? { clearable: true } : {})}
colorPalette="brand"
- isActive={Boolean(filteredState)}
+ isActive={hasFilteredState}
>
<Select.ValueText>
{() =>
@@ -136,11 +202,21 @@ export const WorkerPage = () => {
<Text as="div" pl={2} pt={1}>
Loading...
</Text>
- ) : data.workers && data.workers.length > 0 ? (
+ ) : workers.length > 0 ? (
<>
<Table.Root size="sm" interactive stickyHeader striped>
<Table.Header>
<Table.Row>
+ <Table.ColumnHeader width="44px">
+ <ChakraCheckbox.Root
+ checked={allWorkersSelected ? true : someWorkersSelected ?
"indeterminate" : false}
+ colorPalette="brand"
+ onCheckedChange={(event) =>
handleSelectAllWorkers(event.checked === true)}
+ >
+ <ChakraCheckbox.HiddenInput />
+ <ChakraCheckbox.Control borderWidth={1} />
+ </ChakraCheckbox.Root>
+ </Table.ColumnHeader>
<Table.ColumnHeader>Worker Name</Table.ColumnHeader>
<Table.ColumnHeader>State</Table.ColumnHeader>
<Table.ColumnHeader>Queues</Table.ColumnHeader>
@@ -152,8 +228,20 @@ export const WorkerPage = () => {
</Table.Row>
</Table.Header>
<Table.Body>
- {data.workers.map((worker) => (
+ {workers.map((worker) => (
<Table.Row key={worker.worker_name} id={worker.worker_name}>
+ <Table.Cell>
+ <ChakraCheckbox.Root
+ checked={selectedWorkerNames.has(worker.worker_name)}
+ colorPalette="brand"
+ onCheckedChange={(event) =>
+ handleWorkerSelect(worker.worker_name, event.checked
=== true)
+ }
+ >
+ <ChakraCheckbox.HiddenInput />
+ <ChakraCheckbox.Control borderWidth={1} />
+ </ChakraCheckbox.Root>
+ </Table.Cell>
<Table.Cell>{worker.worker_name}</Table.Cell>
<Table.Cell>
<WorkerStateBadge
state={worker.state}>{worker.state}</WorkerStateBadge>
@@ -222,6 +310,26 @@ export const WorkerPage = () => {
how to deploy a new worker.
</Text>
)}
+ <ActionBar.Root closeOnInteractOutside={false}
open={Boolean(selectedWorkersCount)}>
+ <Portal>
+ <ActionBar.Positioner>
+ <ActionBar.Content>
+ <ActionBar.SelectionTrigger>
+ {selectedWorkersCount} selected
+ </ActionBar.SelectionTrigger>
+ <ActionBar.Separator />
+ <BulkWorkerOperations
+ onClearSelection={clearSelections}
+ onOperations={refetch}
+ selectedWorkers={selectedWorkers}
+ />
+ <ActionBar.CloseTrigger asChild onClick={clearSelections}>
+ <CloseButton size="sm" />
+ </ActionBar.CloseTrigger>
+ </ActionBar.Content>
+ </ActionBar.Positioner>
+ </Portal>
+ </ActionBar.Root>
</Box>
);
};