This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new cb40ffb38d8 Add clear dag run to UI (#45039)
cb40ffb38d8 is described below

commit cb40ffb38d8706202776dc344744d874b9aba2fa
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Thu Dec 19 22:15:27 2024 +0800

    Add clear dag run to UI (#45039)
    
    * Add clear dag run to UI
    
    More code
    
    * Small adjustments
    
    * Update following code reviews
---
 airflow/api_fastapi/core_api/datamodels/dag_run.py |   2 +-
 .../api_fastapi/core_api/openapi/v1-generated.yaml |   4 +-
 airflow/ui/openapi-gen/requests/schemas.gen.ts     |   9 +-
 airflow/ui/openapi-gen/requests/types.gen.ts       |   2 +-
 .../ui/src/components/ClearRun/ClearRunButton.tsx  |  84 +++++++++++++
 .../ui/src/components/ClearRun/ClearRunDialog.tsx  | 136 +++++++++++++++++++++
 .../components/ClearRun/ClearRunTaskAccordion.tsx  | 107 ++++++++++++++++
 airflow/ui/src/components/ClearRun/index.tsx       |  20 +++
 airflow/ui/src/components/ui/SegmentedControl.tsx  |  61 +++++++++
 airflow/ui/src/pages/Run/Header.tsx                |   4 +
 airflow/ui/src/queries/useClearRun.ts              |  85 +++++++++++++
 11 files changed, 501 insertions(+), 13 deletions(-)

diff --git a/airflow/api_fastapi/core_api/datamodels/dag_run.py b/airflow/api_fastapi/core_api/datamodels/dag_run.py
index c24b42b109d..48c92d2a83c 100644
--- a/airflow/api_fastapi/core_api/datamodels/dag_run.py
+++ b/airflow/api_fastapi/core_api/datamodels/dag_run.py
@@ -54,7 +54,7 @@ class DAGRunClearBody(BaseModel):
 class DAGRunResponse(BaseModel):
     """DAG Run serializer for responses."""
 
-    dag_run_id: str | None = Field(validation_alias="run_id")
+    dag_run_id: str = Field(validation_alias="run_id")
     dag_id: str
     logical_date: datetime | None
     queued_at: datetime | None
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index 980b045e34d..cb1b88de54f 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -7097,9 +7097,7 @@ components:
     DAGRunResponse:
       properties:
         dag_run_id:
-          anyOf:
-          - type: string
-          - type: 'null'
+          type: string
           title: Dag Run Id
         dag_id:
           type: string
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index defc76ec734..501f76d0dea 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -1723,14 +1723,7 @@ export const $DAGRunPatchStates = {
 export const $DAGRunResponse = {
   properties: {
     dag_run_id: {
-      anyOf: [
-        {
-          type: "string",
-        },
-        {
-          type: "null",
-        },
-      ],
+      type: "string",
       title: "Dag Run Id",
     },
     dag_id: {
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts
index 9aeda695c35..f7f0c32a2bc 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -402,7 +402,7 @@ export type DAGRunPatchStates = "queued" | "success" | "failed";
  * DAG Run serializer for responses.
  */
 export type DAGRunResponse = {
-  dag_run_id: string | null;
+  dag_run_id: string;
   dag_id: string;
   logical_date: string | null;
   queued_at: string | null;
diff --git a/airflow/ui/src/components/ClearRun/ClearRunButton.tsx b/airflow/ui/src/components/ClearRun/ClearRunButton.tsx
new file mode 100644
index 00000000000..7c3060765cc
--- /dev/null
+++ b/airflow/ui/src/components/ClearRun/ClearRunButton.tsx
@@ -0,0 +1,84 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { Box, useDisclosure } from "@chakra-ui/react";
+import { useState } from "react";
+import { FiRefreshCw } from "react-icons/fi";
+
+import type { TaskInstanceCollectionResponse } from 
"openapi/requests/types.gen";
+import { Button } from "src/components/ui";
+import { useClearDagRun } from "src/queries/useClearRun";
+
+import ClearRunDialog from "./ClearRunDialog";
+
+type Props = {
+  readonly dagId: string;
+  readonly dagRunId: string;
+};
+
+const ClearRunButton = ({ dagId, dagRunId }: Props) => {
+  const { onClose, onOpen, open } = useDisclosure();
+
+  const [onlyFailed, setOnlyFailed] = useState(false);
+
+  const [affectedTasks, setAffectedTasks] =
+    useState<TaskInstanceCollectionResponse>({
+      task_instances: [],
+      total_entries: 0,
+    });
+
+  const { isPending, mutate } = useClearDagRun({
+    dagId,
+    dagRunId,
+    onSuccessConfirm: onClose,
+    onSuccessDryRun: setAffectedTasks,
+  });
+
+  return (
+    <Box>
+      <Button
+        onClick={() => {
+          onOpen();
+          mutate({
+            dagId,
+            dagRunId,
+            requestBody: { dry_run: true, only_failed: onlyFailed },
+          });
+        }}
+        variant="outline"
+      >
+        <FiRefreshCw height={5} width={5} />
+        Clear Run
+      </Button>
+
+      <ClearRunDialog
+        affectedTasks={affectedTasks}
+        dagId={dagId}
+        dagRunId={dagRunId}
+        isPending={isPending}
+        mutate={mutate}
+        onClose={onClose}
+        onlyFailed={onlyFailed}
+        open={open}
+        setOnlyFailed={setOnlyFailed}
+      />
+    </Box>
+  );
+};
+
+export default ClearRunButton;
diff --git a/airflow/ui/src/components/ClearRun/ClearRunDialog.tsx b/airflow/ui/src/components/ClearRun/ClearRunDialog.tsx
new file mode 100644
index 00000000000..1bab6372749
--- /dev/null
+++ b/airflow/ui/src/components/ClearRun/ClearRunDialog.tsx
@@ -0,0 +1,136 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { Flex, Heading, VStack } from "@chakra-ui/react";
+import { FiRefreshCw } from "react-icons/fi";
+
+import type {
+  DAGRunClearBody,
+  TaskInstanceCollectionResponse,
+} from "openapi/requests/types.gen";
+import { Button, Dialog } from "src/components/ui";
+
+import SegmentedControl from "../ui/SegmentedControl";
+import ClearRunTasksAccordion from "./ClearRunTaskAccordion";
+
+type Props = {
+  readonly affectedTasks: TaskInstanceCollectionResponse;
+  readonly dagId: string;
+  readonly dagRunId: string;
+  readonly isPending: boolean;
+  readonly mutate: ({
+    dagId,
+    dagRunId,
+    requestBody,
+  }: {
+    dagId: string;
+    dagRunId: string;
+    requestBody: DAGRunClearBody;
+  }) => void;
+  readonly onClose: () => void;
+  readonly onlyFailed: boolean;
+  readonly open: boolean;
+  readonly setOnlyFailed: (value: boolean) => void;
+};
+
+const ClearRunDialog = ({
+  affectedTasks,
+  dagId,
+  dagRunId,
+  isPending,
+  mutate,
+  onClose,
+  onlyFailed,
+  open,
+  setOnlyFailed,
+}: Props) => {
+  const onChange = (value: string) => {
+    switch (value) {
+      case "existing_tasks":
+        setOnlyFailed(false);
+        mutate({
+          dagId,
+          dagRunId,
+          requestBody: { dry_run: true, only_failed: false },
+        });
+        break;
+      case "only_failed":
+        setOnlyFailed(true);
+        mutate({
+          dagId,
+          dagRunId,
+          requestBody: { dry_run: true, only_failed: true },
+        });
+        break;
+      default:
+        // TODO: Handle this `new_tasks` case
+        break;
+    }
+  };
+
+  return (
+    <Dialog.Root onOpenChange={onClose} open={open} size="xl">
+      <Dialog.Content backdrop>
+        <Dialog.Header>
+          <VStack align="start" gap={4}>
+            <Heading size="xl">Clear DagRun - {dagRunId} </Heading>
+          </VStack>
+        </Dialog.Header>
+
+        <Dialog.CloseTrigger />
+
+        <Dialog.Body width="full">
+          <Flex justifyContent="center">
+            <SegmentedControl
+              mb={3}
+              onValueChange={onChange}
+              options={[
+                { label: "Clear existing tasks", value: "existing_tasks" },
+                { label: "Clear only failed tasks", value: "only_failed" },
+                {
+                  disabled: true,
+                  label: "Queued up new tasks",
+                  value: "new_tasks",
+                },
+              ]}
+              value={onlyFailed ? "only_failed" : "existing_tasks"}
+            />
+          </Flex>
+          <ClearRunTasksAccordion affectedTasks={affectedTasks} />
+          <Flex justifyContent="end" mt={3}>
+            <Button
+              colorPalette="blue"
+              loading={isPending}
+              onClick={() => {
+                mutate({
+                  dagId,
+                  dagRunId,
+                  requestBody: { dry_run: false, only_failed: onlyFailed },
+                });
+              }}
+            >
+              <FiRefreshCw /> Confirm
+            </Button>
+          </Flex>
+        </Dialog.Body>
+      </Dialog.Content>
+    </Dialog.Root>
+  );
+};
+
+export default ClearRunDialog;
diff --git a/airflow/ui/src/components/ClearRun/ClearRunTaskAccordion.tsx b/airflow/ui/src/components/ClearRun/ClearRunTaskAccordion.tsx
new file mode 100644
index 00000000000..c579fa4d6fa
--- /dev/null
+++ b/airflow/ui/src/components/ClearRun/ClearRunTaskAccordion.tsx
@@ -0,0 +1,107 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { Box, Text } from "@chakra-ui/react";
+import { Link } from "@chakra-ui/react";
+import type { ColumnDef } from "@tanstack/react-table";
+import { Link as RouterLink } from "react-router-dom";
+
+import type {
+  TaskInstanceCollectionResponse,
+  TaskInstanceResponse,
+} from "openapi/requests/types.gen";
+import { DataTable } from "src/components/DataTable";
+import { Status } from "src/components/ui";
+
+import { Accordion } from "../ui";
+
+const columns: Array<ColumnDef<TaskInstanceResponse>> = [
+  {
+    accessorKey: "task_display_name",
+    cell: ({ row: { original } }) => (
+      <Link asChild color="fg.info" fontWeight="bold">
+        <RouterLink
+          
to={`/dags/${original.dag_id}/runs/${original.dag_run_id}/tasks/${original.task_id}${original.map_index
 > -1 ? `?map_index=${original.map_index}` : ""}`}
+        >
+          {original.task_display_name}
+        </RouterLink>
+      </Link>
+    ),
+    enableSorting: false,
+    header: "Task ID",
+  },
+  {
+    accessorKey: "state",
+    cell: ({
+      row: {
+        original: { state },
+      },
+    }) => <Status state={state}>{state}</Status>,
+    enableSorting: false,
+    header: () => "State",
+  },
+  {
+    accessorKey: "map_index",
+    enableSorting: false,
+    header: "Map Index",
+  },
+
+  {
+    accessorKey: "dag_run_id",
+    enableSorting: false,
+    header: "Run Id",
+  },
+];
+
+type Props = {
+  readonly affectedTasks?: TaskInstanceCollectionResponse;
+};
+
+// Table is in memory, pagination and sorting are disabled.
+// TODO: Make a front-end only unconnected table component with client side ordering and pagination
+const ClearRunTasksAccordion = ({ affectedTasks }: Props) => (
+  <Accordion.Root collapsible variant="enclosed">
+    <Accordion.Item key="tasks" value="tasks">
+      <Accordion.ItemTrigger>
+        <Text fontWeight="bold">
+          Affected Tasks: {affectedTasks?.total_entries ?? 0}
+        </Text>
+      </Accordion.ItemTrigger>
+      <Accordion.ItemContent>
+        <Box maxH="400px" overflowY="scroll">
+          <DataTable
+            columns={columns}
+            data={affectedTasks?.task_instances ?? []}
+            displayMode="table"
+            initialState={{
+              pagination: {
+                pageIndex: 0,
+                pageSize: affectedTasks?.total_entries ?? 0,
+              },
+              sorting: [],
+            }}
+            modelName="Task Instance"
+            total={affectedTasks?.total_entries}
+          />
+        </Box>
+      </Accordion.ItemContent>
+    </Accordion.Item>
+  </Accordion.Root>
+);
+
+export default ClearRunTasksAccordion;
diff --git a/airflow/ui/src/components/ClearRun/index.tsx b/airflow/ui/src/components/ClearRun/index.tsx
new file mode 100644
index 00000000000..1e9362a6264
--- /dev/null
+++ b/airflow/ui/src/components/ClearRun/index.tsx
@@ -0,0 +1,20 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+export { default } from "./ClearRunButton";
diff --git a/airflow/ui/src/components/ui/SegmentedControl.tsx b/airflow/ui/src/components/ui/SegmentedControl.tsx
new file mode 100644
index 00000000000..8c972eb35ed
--- /dev/null
+++ b/airflow/ui/src/components/ui/SegmentedControl.tsx
@@ -0,0 +1,61 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { Tabs, For, type TabsRootProps } from "@chakra-ui/react";
+
+type Option = {
+  disabled?: boolean;
+  label: string;
+  value: string;
+};
+
+type SegmentedControlProps = {
+  readonly onValueChange: (value: string) => void;
+  readonly options: Array<Option>;
+  readonly value: string;
+} & Omit<TabsRootProps, "onValueChange">;
+
+const SegmentedControl = ({
+  onValueChange,
+  options,
+  value,
+  ...rest
+}: SegmentedControlProps) => (
+  <Tabs.Root
+    defaultValue={value}
+    onValueChange={(option) => onValueChange(option.value)}
+    variant="enclosed"
+    {...rest}
+  >
+    <Tabs.List>
+      <For each={options}>
+        {(option) => (
+          <Tabs.Trigger
+            disabled={option.disabled}
+            key={option.value}
+            value={option.value}
+          >
+            {option.label}
+          </Tabs.Trigger>
+        )}
+      </For>
+    </Tabs.List>
+  </Tabs.Root>
+);
+
+export default SegmentedControl;
diff --git a/airflow/ui/src/pages/Run/Header.tsx b/airflow/ui/src/pages/Run/Header.tsx
index eeab7e73efa..6d439d2f8fc 100644
--- a/airflow/ui/src/pages/Run/Header.tsx
+++ b/airflow/ui/src/pages/Run/Header.tsx
@@ -22,6 +22,7 @@ import { FiBarChart } from "react-icons/fi";
 import { MdOutlineModeComment } from "react-icons/md";
 
 import type { DAGRunResponse } from "openapi/requests/types.gen";
+import ClearRunButton from "src/components/ClearRun";
 import { RunTypeIcon } from "src/components/RunTypeIcon";
 import { Stat } from "src/components/Stat";
 import Time from "src/components/Time";
@@ -41,6 +42,9 @@ export const Header = ({ dagRun }: { readonly dagRun: DAGRunResponse }) => (
           <div />
         </Flex>
       </HStack>
+      <HStack>
+        <ClearRunButton dagId={dagRun.dag_id} dagRunId={dagRun.dag_run_id} />
+      </HStack>
     </Flex>
     {dagRun.note === null || dagRun.note.length === 0 ? undefined : (
       <Flex alignItems="flex-start" justifyContent="space-between" mr={16}>
diff --git a/airflow/ui/src/queries/useClearRun.ts b/airflow/ui/src/queries/useClearRun.ts
new file mode 100644
index 00000000000..9efcb7ae037
--- /dev/null
+++ b/airflow/ui/src/queries/useClearRun.ts
@@ -0,0 +1,85 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { useQueryClient } from "@tanstack/react-query";
+
+import {
+  useDagRunServiceClearDagRun,
+  UseDagRunServiceGetDagRunKeyFn,
+  UseDagServiceGetDagDetailsKeyFn,
+  useTaskInstanceServiceGetTaskInstancesKey,
+} from "openapi/queries";
+import type {
+  DAGRunClearBody,
+  TaskInstanceCollectionResponse,
+} from "openapi/requests/types.gen";
+import { toaster } from "src/components/ui";
+
+const onError = () => {
+  toaster.create({
+    description: "Clear Dag Run request failed.",
+    title: "Failed to clear the Dag Run",
+    type: "error",
+  });
+};
+
+export const useClearDagRun = ({
+  dagId,
+  dagRunId,
+  onSuccessConfirm,
+  onSuccessDryRun,
+}: {
+  dagId: string;
+  dagRunId: string;
+  onSuccessConfirm: () => void;
+  onSuccessDryRun: (date: TaskInstanceCollectionResponse) => void;
+}) => {
+  const queryClient = useQueryClient();
+
+  const onSuccess = async (
+    data: TaskInstanceCollectionResponse,
+    variables: {
+      dagId: string;
+      dagRunId: string;
+      requestBody: DAGRunClearBody;
+    },
+  ) => {
+    if (variables.requestBody.dry_run) {
+      onSuccessDryRun(data);
+    } else {
+      const queryKeys = [
+        [useTaskInstanceServiceGetTaskInstancesKey],
+        UseDagServiceGetDagDetailsKeyFn({ dagId }),
+        UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }),
+      ];
+
+      await Promise.all(
+        queryKeys.map((key) =>
+          queryClient.invalidateQueries({ queryKey: key }),
+        ),
+      );
+
+      onSuccessConfirm();
+    }
+  };
+
+  return useDagRunServiceClearDagRun({
+    onError,
+    onSuccess,
+  });
+};

Reply via email to