This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new cb40ffb38d8 Add clear dag run to UI (#45039)
cb40ffb38d8 is described below
commit cb40ffb38d8706202776dc344744d874b9aba2fa
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Thu Dec 19 22:15:27 2024 +0800
Add clear dag run to UI (#45039)
* Add clear dag run to UI
More code
* Small adjustments
* Update following code reviews
---
airflow/api_fastapi/core_api/datamodels/dag_run.py | 2 +-
.../api_fastapi/core_api/openapi/v1-generated.yaml | 4 +-
airflow/ui/openapi-gen/requests/schemas.gen.ts | 9 +-
airflow/ui/openapi-gen/requests/types.gen.ts | 2 +-
.../ui/src/components/ClearRun/ClearRunButton.tsx | 84 +++++++++++++
.../ui/src/components/ClearRun/ClearRunDialog.tsx | 136 +++++++++++++++++++++
.../components/ClearRun/ClearRunTaskAccordion.tsx | 107 ++++++++++++++++
airflow/ui/src/components/ClearRun/index.tsx | 20 +++
airflow/ui/src/components/ui/SegmentedControl.tsx | 61 +++++++++
airflow/ui/src/pages/Run/Header.tsx | 4 +
airflow/ui/src/queries/useClearRun.ts | 85 +++++++++++++
11 files changed, 501 insertions(+), 13 deletions(-)
diff --git a/airflow/api_fastapi/core_api/datamodels/dag_run.py
b/airflow/api_fastapi/core_api/datamodels/dag_run.py
index c24b42b109d..48c92d2a83c 100644
--- a/airflow/api_fastapi/core_api/datamodels/dag_run.py
+++ b/airflow/api_fastapi/core_api/datamodels/dag_run.py
@@ -54,7 +54,7 @@ class DAGRunClearBody(BaseModel):
class DAGRunResponse(BaseModel):
"""DAG Run serializer for responses."""
- dag_run_id: str | None = Field(validation_alias="run_id")
+ dag_run_id: str = Field(validation_alias="run_id")
dag_id: str
logical_date: datetime | None
queued_at: datetime | None
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index 980b045e34d..cb1b88de54f 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -7097,9 +7097,7 @@ components:
DAGRunResponse:
properties:
dag_run_id:
- anyOf:
- - type: string
- - type: 'null'
+ type: string
title: Dag Run Id
dag_id:
type: string
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index defc76ec734..501f76d0dea 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -1723,14 +1723,7 @@ export const $DAGRunPatchStates = {
export const $DAGRunResponse = {
properties: {
dag_run_id: {
- anyOf: [
- {
- type: "string",
- },
- {
- type: "null",
- },
- ],
+ type: "string",
title: "Dag Run Id",
},
dag_id: {
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow/ui/openapi-gen/requests/types.gen.ts
index 9aeda695c35..f7f0c32a2bc 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -402,7 +402,7 @@ export type DAGRunPatchStates = "queued" | "success" |
"failed";
* DAG Run serializer for responses.
*/
export type DAGRunResponse = {
- dag_run_id: string | null;
+ dag_run_id: string;
dag_id: string;
logical_date: string | null;
queued_at: string | null;
diff --git a/airflow/ui/src/components/ClearRun/ClearRunButton.tsx
b/airflow/ui/src/components/ClearRun/ClearRunButton.tsx
new file mode 100644
index 00000000000..7c3060765cc
--- /dev/null
+++ b/airflow/ui/src/components/ClearRun/ClearRunButton.tsx
@@ -0,0 +1,84 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { Box, useDisclosure } from "@chakra-ui/react";
+import { useState } from "react";
+import { FiRefreshCw } from "react-icons/fi";
+
+import type { TaskInstanceCollectionResponse } from
"openapi/requests/types.gen";
+import { Button } from "src/components/ui";
+import { useClearDagRun } from "src/queries/useClearRun";
+
+import ClearRunDialog from "./ClearRunDialog";
+
+type Props = {
+ readonly dagId: string;
+ readonly dagRunId: string;
+};
+
+const ClearRunButton = ({ dagId, dagRunId }: Props) => {
+ const { onClose, onOpen, open } = useDisclosure();
+
+ const [onlyFailed, setOnlyFailed] = useState(false);
+
+ const [affectedTasks, setAffectedTasks] =
+ useState<TaskInstanceCollectionResponse>({
+ task_instances: [],
+ total_entries: 0,
+ });
+
+ const { isPending, mutate } = useClearDagRun({
+ dagId,
+ dagRunId,
+ onSuccessConfirm: onClose,
+ onSuccessDryRun: setAffectedTasks,
+ });
+
+ return (
+ <Box>
+ <Button
+ onClick={() => {
+ onOpen();
+ mutate({
+ dagId,
+ dagRunId,
+ requestBody: { dry_run: true, only_failed: onlyFailed },
+ });
+ }}
+ variant="outline"
+ >
+ <FiRefreshCw height={5} width={5} />
+ Clear Run
+ </Button>
+
+ <ClearRunDialog
+ affectedTasks={affectedTasks}
+ dagId={dagId}
+ dagRunId={dagRunId}
+ isPending={isPending}
+ mutate={mutate}
+ onClose={onClose}
+ onlyFailed={onlyFailed}
+ open={open}
+ setOnlyFailed={setOnlyFailed}
+ />
+ </Box>
+ );
+};
+
+export default ClearRunButton;
diff --git a/airflow/ui/src/components/ClearRun/ClearRunDialog.tsx
b/airflow/ui/src/components/ClearRun/ClearRunDialog.tsx
new file mode 100644
index 00000000000..1bab6372749
--- /dev/null
+++ b/airflow/ui/src/components/ClearRun/ClearRunDialog.tsx
@@ -0,0 +1,136 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { Flex, Heading, VStack } from "@chakra-ui/react";
+import { FiRefreshCw } from "react-icons/fi";
+
+import type {
+ DAGRunClearBody,
+ TaskInstanceCollectionResponse,
+} from "openapi/requests/types.gen";
+import { Button, Dialog } from "src/components/ui";
+
+import SegmentedControl from "../ui/SegmentedControl";
+import ClearRunTasksAccordion from "./ClearRunTaskAccordion";
+
+type Props = {
+ readonly affectedTasks: TaskInstanceCollectionResponse;
+ readonly dagId: string;
+ readonly dagRunId: string;
+ readonly isPending: boolean;
+ readonly mutate: ({
+ dagId,
+ dagRunId,
+ requestBody,
+ }: {
+ dagId: string;
+ dagRunId: string;
+ requestBody: DAGRunClearBody;
+ }) => void;
+ readonly onClose: () => void;
+ readonly onlyFailed: boolean;
+ readonly open: boolean;
+ readonly setOnlyFailed: (value: boolean) => void;
+};
+
+const ClearRunDialog = ({
+ affectedTasks,
+ dagId,
+ dagRunId,
+ isPending,
+ mutate,
+ onClose,
+ onlyFailed,
+ open,
+ setOnlyFailed,
+}: Props) => {
+ const onChange = (value: string) => {
+ switch (value) {
+ case "existing_tasks":
+ setOnlyFailed(false);
+ mutate({
+ dagId,
+ dagRunId,
+ requestBody: { dry_run: true, only_failed: false },
+ });
+ break;
+ case "only_failed":
+ setOnlyFailed(true);
+ mutate({
+ dagId,
+ dagRunId,
+ requestBody: { dry_run: true, only_failed: true },
+ });
+ break;
+ default:
+ // TODO: Handle this `new_tasks` case
+ break;
+ }
+ };
+
+ return (
+ <Dialog.Root onOpenChange={onClose} open={open} size="xl">
+ <Dialog.Content backdrop>
+ <Dialog.Header>
+ <VStack align="start" gap={4}>
+ <Heading size="xl">Clear DagRun - {dagRunId} </Heading>
+ </VStack>
+ </Dialog.Header>
+
+ <Dialog.CloseTrigger />
+
+ <Dialog.Body width="full">
+ <Flex justifyContent="center">
+ <SegmentedControl
+ mb={3}
+ onValueChange={onChange}
+ options={[
+ { label: "Clear existing tasks", value: "existing_tasks" },
+ { label: "Clear only failed tasks", value: "only_failed" },
+ {
+ disabled: true,
+ label: "Queued up new tasks",
+ value: "new_tasks",
+ },
+ ]}
+ value={onlyFailed ? "only_failed" : "existing_tasks"}
+ />
+ </Flex>
+ <ClearRunTasksAccordion affectedTasks={affectedTasks} />
+ <Flex justifyContent="end" mt={3}>
+ <Button
+ colorPalette="blue"
+ loading={isPending}
+ onClick={() => {
+ mutate({
+ dagId,
+ dagRunId,
+ requestBody: { dry_run: false, only_failed: onlyFailed },
+ });
+ }}
+ >
+ <FiRefreshCw /> Confirm
+ </Button>
+ </Flex>
+ </Dialog.Body>
+ </Dialog.Content>
+ </Dialog.Root>
+ );
+};
+
+export default ClearRunDialog;
diff --git a/airflow/ui/src/components/ClearRun/ClearRunTaskAccordion.tsx
b/airflow/ui/src/components/ClearRun/ClearRunTaskAccordion.tsx
new file mode 100644
index 00000000000..c579fa4d6fa
--- /dev/null
+++ b/airflow/ui/src/components/ClearRun/ClearRunTaskAccordion.tsx
@@ -0,0 +1,107 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { Box, Text } from "@chakra-ui/react";
+import { Link } from "@chakra-ui/react";
+import type { ColumnDef } from "@tanstack/react-table";
+import { Link as RouterLink } from "react-router-dom";
+
+import type {
+ TaskInstanceCollectionResponse,
+ TaskInstanceResponse,
+} from "openapi/requests/types.gen";
+import { DataTable } from "src/components/DataTable";
+import { Status } from "src/components/ui";
+
+import { Accordion } from "../ui";
+
+const columns: Array<ColumnDef<TaskInstanceResponse>> = [
+ {
+ accessorKey: "task_display_name",
+ cell: ({ row: { original } }) => (
+ <Link asChild color="fg.info" fontWeight="bold">
+ <RouterLink
+
to={`/dags/${original.dag_id}/runs/${original.dag_run_id}/tasks/${original.task_id}${original.map_index
> -1 ? `?map_index=${original.map_index}` : ""}`}
+ >
+ {original.task_display_name}
+ </RouterLink>
+ </Link>
+ ),
+ enableSorting: false,
+ header: "Task ID",
+ },
+ {
+ accessorKey: "state",
+ cell: ({
+ row: {
+ original: { state },
+ },
+ }) => <Status state={state}>{state}</Status>,
+ enableSorting: false,
+ header: () => "State",
+ },
+ {
+ accessorKey: "map_index",
+ enableSorting: false,
+ header: "Map Index",
+ },
+
+ {
+ accessorKey: "dag_run_id",
+ enableSorting: false,
+ header: "Run Id",
+ },
+];
+
+type Props = {
+ readonly affectedTasks?: TaskInstanceCollectionResponse;
+};
+
+// Table is in memory, pagination and sorting are disabled.
+// TODO: Make a front-end only unconnected table component with client side
ordering and pagination
+const ClearRunTasksAccordion = ({ affectedTasks }: Props) => (
+ <Accordion.Root collapsible variant="enclosed">
+ <Accordion.Item key="tasks" value="tasks">
+ <Accordion.ItemTrigger>
+ <Text fontWeight="bold">
+ Affected Tasks: {affectedTasks?.total_entries ?? 0}
+ </Text>
+ </Accordion.ItemTrigger>
+ <Accordion.ItemContent>
+ <Box maxH="400px" overflowY="scroll">
+ <DataTable
+ columns={columns}
+ data={affectedTasks?.task_instances ?? []}
+ displayMode="table"
+ initialState={{
+ pagination: {
+ pageIndex: 0,
+ pageSize: affectedTasks?.total_entries ?? 0,
+ },
+ sorting: [],
+ }}
+ modelName="Task Instance"
+ total={affectedTasks?.total_entries}
+ />
+ </Box>
+ </Accordion.ItemContent>
+ </Accordion.Item>
+ </Accordion.Root>
+);
+
+export default ClearRunTasksAccordion;
diff --git a/airflow/ui/src/components/ClearRun/index.tsx
b/airflow/ui/src/components/ClearRun/index.tsx
new file mode 100644
index 00000000000..1e9362a6264
--- /dev/null
+++ b/airflow/ui/src/components/ClearRun/index.tsx
@@ -0,0 +1,20 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+export { default } from "./ClearRunButton";
diff --git a/airflow/ui/src/components/ui/SegmentedControl.tsx
b/airflow/ui/src/components/ui/SegmentedControl.tsx
new file mode 100644
index 00000000000..8c972eb35ed
--- /dev/null
+++ b/airflow/ui/src/components/ui/SegmentedControl.tsx
@@ -0,0 +1,61 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { Tabs, For, type TabsRootProps } from "@chakra-ui/react";
+
+type Option = {
+ disabled?: boolean;
+ label: string;
+ value: string;
+};
+
+type SegmentedControlProps = {
+ readonly onValueChange: (value: string) => void;
+ readonly options: Array<Option>;
+ readonly value: string;
+} & Omit<TabsRootProps, "onValueChange">;
+
+const SegmentedControl = ({
+ onValueChange,
+ options,
+ value,
+ ...rest
+}: SegmentedControlProps) => (
+ <Tabs.Root
+ defaultValue={value}
+ onValueChange={(option) => onValueChange(option.value)}
+ variant="enclosed"
+ {...rest}
+ >
+ <Tabs.List>
+ <For each={options}>
+ {(option) => (
+ <Tabs.Trigger
+ disabled={option.disabled}
+ key={option.value}
+ value={option.value}
+ >
+ {option.label}
+ </Tabs.Trigger>
+ )}
+ </For>
+ </Tabs.List>
+ </Tabs.Root>
+);
+
+export default SegmentedControl;
diff --git a/airflow/ui/src/pages/Run/Header.tsx
b/airflow/ui/src/pages/Run/Header.tsx
index eeab7e73efa..6d439d2f8fc 100644
--- a/airflow/ui/src/pages/Run/Header.tsx
+++ b/airflow/ui/src/pages/Run/Header.tsx
@@ -22,6 +22,7 @@ import { FiBarChart } from "react-icons/fi";
import { MdOutlineModeComment } from "react-icons/md";
import type { DAGRunResponse } from "openapi/requests/types.gen";
+import ClearRunButton from "src/components/ClearRun";
import { RunTypeIcon } from "src/components/RunTypeIcon";
import { Stat } from "src/components/Stat";
import Time from "src/components/Time";
@@ -41,6 +42,9 @@ export const Header = ({ dagRun }: { readonly dagRun:
DAGRunResponse }) => (
<div />
</Flex>
</HStack>
+ <HStack>
+ <ClearRunButton dagId={dagRun.dag_id} dagRunId={dagRun.dag_run_id} />
+ </HStack>
</Flex>
{dagRun.note === null || dagRun.note.length === 0 ? undefined : (
<Flex alignItems="flex-start" justifyContent="space-between" mr={16}>
diff --git a/airflow/ui/src/queries/useClearRun.ts
b/airflow/ui/src/queries/useClearRun.ts
new file mode 100644
index 00000000000..9efcb7ae037
--- /dev/null
+++ b/airflow/ui/src/queries/useClearRun.ts
@@ -0,0 +1,85 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { useQueryClient } from "@tanstack/react-query";
+
+import {
+ useDagRunServiceClearDagRun,
+ UseDagRunServiceGetDagRunKeyFn,
+ UseDagServiceGetDagDetailsKeyFn,
+ useTaskInstanceServiceGetTaskInstancesKey,
+} from "openapi/queries";
+import type {
+ DAGRunClearBody,
+ TaskInstanceCollectionResponse,
+} from "openapi/requests/types.gen";
+import { toaster } from "src/components/ui";
+
+const onError = () => {
+ toaster.create({
+ description: "Clear Dag Run request failed.",
+ title: "Failed to clear the Dag Run",
+ type: "error",
+ });
+};
+
+export const useClearDagRun = ({
+ dagId,
+ dagRunId,
+ onSuccessConfirm,
+ onSuccessDryRun,
+}: {
+ dagId: string;
+ dagRunId: string;
+ onSuccessConfirm: () => void;
+ onSuccessDryRun: (date: TaskInstanceCollectionResponse) => void;
+}) => {
+ const queryClient = useQueryClient();
+
+ const onSuccess = async (
+ data: TaskInstanceCollectionResponse,
+ variables: {
+ dagId: string;
+ dagRunId: string;
+ requestBody: DAGRunClearBody;
+ },
+ ) => {
+ if (variables.requestBody.dry_run) {
+ onSuccessDryRun(data);
+ } else {
+ const queryKeys = [
+ [useTaskInstanceServiceGetTaskInstancesKey],
+ UseDagServiceGetDagDetailsKeyFn({ dagId }),
+ UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }),
+ ];
+
+ await Promise.all(
+ queryKeys.map((key) =>
+ queryClient.invalidateQueries({ queryKey: key }),
+ ),
+ );
+
+ onSuccessConfirm();
+ }
+ };
+
+ return useDagRunServiceClearDagRun({
+ onError,
+ onSuccess,
+ });
+};