This is an automated email from the ASF dual-hosted git repository.

shubhamraj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 43945831cef Clear, Mark Success/Fail and delete multiple Task Instances (#64141)
43945831cef is described below

commit 43945831cef8091a7cf63be1d16a98a499554359
Author: Shubham Raj <[email protected]>
AuthorDate: Fri Mar 27 00:45:04 2026 +0530

    Clear, Mark Success/Fail and delete multiple Task Instances (#64141)
---
 .../core_api/datamodels/task_instances.py          |   1 +
 .../core_api/openapi/v2-rest-api-generated.yaml    |   6 +
 .../core_api/routes/public/task_instances.py       |   8 +
 .../core_api/services/public/task_instances.py     |  13 +-
 .../airflow/ui/openapi-gen/requests/schemas.gen.ts |  12 ++
 .../airflow/ui/openapi-gen/requests/types.gen.ts   |   1 +
 .../airflow/ui/public/i18n/locales/en/common.json  |  15 ++
 .../components/ActionAccordion/ActionAccordion.tsx |  87 +++++++--
 .../TaskInstance/ClearGroupTaskInstanceDialog.tsx  |   3 +-
 .../TaskInstances/BulkClearTaskInstancesButton.tsx | 157 ++++++++++++++++
 .../BulkDeleteTaskInstancesButton.tsx              | 153 +++++++++++++++
 .../BulkMarkTaskInstancesAsButton.tsx              | 208 +++++++++++++++++++++
 .../ui/src/pages/TaskInstances/TaskInstances.tsx   |  85 ++++++++-
 .../airflow/ui/src/queries/useBulkClearDryRun.ts   | 113 +++++++++++
 .../ui/src/queries/useBulkClearTaskInstances.ts    | 118 ++++++++++++
 .../airflow/ui/src/queries/useBulkMarkAsDryRun.ts  | 113 +++++++++++
 .../airflow/ui/src/queries/useBulkTaskInstances.ts | 105 +++++++++++
 .../core_api/routes/public/test_task_instances.py  | 140 ++++++++++++++
 .../src/airflowctl/api/datamodels/generated.py     |   1 +
 19 files changed, 1316 insertions(+), 23 deletions(-)

diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py 
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
index 397389994a0..c8e7ab2378d 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
@@ -168,6 +168,7 @@ class ClearTaskInstancesBody(StrictBaseModel):
         "clearing the task instances.",
     )
     prevent_running_task: bool = False
+    note: Annotated[str, StringConstraints(max_length=1000)] | None = None
 
     @model_validator(mode="before")
     @classmethod
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index 17c4f87a6e4..128d0f2bd05 100644
--- 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++ 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -9904,6 +9904,12 @@ components:
           type: boolean
           title: Prevent Running Task
           default: false
+        note:
+          anyOf:
+          - type: string
+            maxLength: 1000
+          - type: 'null'
+          title: Note
       additionalProperties: false
       type: object
       title: ClearTaskInstancesBody
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
index 97fa930b1c1..9488c0b7591 100644
--- 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
+++ 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
@@ -721,6 +721,7 @@ def post_clear_task_instances(
     dag_bag: DagBagDep,
     body: ClearTaskInstancesBody,
     session: SessionDep,
+    user: GetUserDep,
 ) -> TaskInstanceCollectionResponse:
     """Clear task instances."""
     dag = get_latest_version_of_dag(dag_bag, dag_id, session)
@@ -837,6 +838,13 @@ def post_clear_task_instances(
         except AirflowClearRunningTaskException as e:
             raise HTTPException(status.HTTP_409_CONFLICT, str(e)) from e
 
+        if body.note is not None:
+            _patch_task_instance_note(
+                task_instance_body=body,
+                tis=task_instances,
+                user=user,
+            )
+
     return TaskInstanceCollectionResponse(
         task_instances=[TaskInstanceResponse.model_validate(ti) for ti in 
task_instances],
         total_entries=len(task_instances),
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py
 
b/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py
index 1c3d7c62913..8ab426bbef6 100644
--- 
a/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py
+++ 
b/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py
@@ -37,7 +37,11 @@ from airflow.api_fastapi.core_api.datamodels.common import (
     BulkDeleteAction,
     BulkUpdateAction,
 )
-from airflow.api_fastapi.core_api.datamodels.task_instances import 
BulkTaskInstanceBody, PatchTaskInstanceBody
+from airflow.api_fastapi.core_api.datamodels.task_instances import (
+    BulkTaskInstanceBody,
+    ClearTaskInstancesBody,
+    PatchTaskInstanceBody,
+)
 from airflow.api_fastapi.core_api.security import GetUserDep
 from airflow.api_fastapi.core_api.services.public.common import BulkService
 from airflow.listeners.listener import get_listener_manager
@@ -139,7 +143,7 @@ def _patch_task_instance_state(
 
 
 def _patch_task_instance_note(
-    task_instance_body: BulkTaskInstanceBody | PatchTaskInstanceBody,
+    task_instance_body: BulkTaskInstanceBody | ClearTaskInstancesBody | 
PatchTaskInstanceBody,
     tis: list[TI],
     user: GetUserDep,
     update_mask: list[str] | None = Query(None),
@@ -275,6 +279,7 @@ class 
BulkTaskInstanceService(BulkService[BulkTaskInstanceBody]):
             dag_bag=self.dag_bag,
             body=entity,
             session=self.session,
+            map_index=map_index,
             update_mask=update_mask,
         )
 
@@ -318,12 +323,12 @@ class 
BulkTaskInstanceService(BulkService[BulkTaskInstanceBody]):
 
         try:
             specific_entity_map = {
-                (entity.dag_id, entity.dag_run_id, entity.task_id, 
entity.map_index): entity
+                self._extract_task_identifiers(entity): entity
                 for entity in action.entities
                 if entity.map_index is not None
             }
             all_map_entity_map = {
-                (entity.dag_id, entity.dag_run_id, entity.task_id): entity
+                self._extract_task_identifiers(entity)[:3]: entity
                 for entity in action.entities
                 if entity.map_index is None
             }
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index f47aa107aea..0195c0b564a 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -1383,6 +1383,18 @@ export const $ClearTaskInstancesBody = {
             type: 'boolean',
             title: 'Prevent Running Task',
             default: false
+        },
+        note: {
+            anyOf: [
+                {
+                    type: 'string',
+                    maxLength: 1000
+                },
+                {
+                    type: 'null'
+                }
+            ],
+            title: 'Note'
         }
     },
     additionalProperties: false,
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 7423c08d42c..9c61f27e71e 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -427,6 +427,7 @@ export type ClearTaskInstancesBody = {
      */
     run_on_latest_version?: boolean;
     prevent_running_task?: boolean;
+    note?: string | null;
 };
 
 /**
diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json 
b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json
index 49037f58815..f0ccd787a26 100644
--- a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json
+++ b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json
@@ -199,6 +199,7 @@
     "users": "Users"
   },
   "selectLanguage": "Select Language",
+  "selected": "Selected",
   "showDetailsPanel": "Show Details Panel",
   "signedInAs": "Signed in as",
   "source": {
@@ -299,6 +300,13 @@
     "utc": "UTC (Coordinated Universal Time)"
   },
   "toaster": {
+    "bulkClear": {
+      "error": "Bulk Clear {{resourceName}} Request Failed",
+      "success": {
+        "description": "{{count}} {{resourceName}} have been successfully 
cleared. Keys: {{keys}}",
+        "title": "Bulk Clear {{resourceName}} Request Submitted"
+      }
+    },
     "bulkDelete": {
       "error": "Bulk Delete {{resourceName}} Request Failed",
       "success": {
@@ -306,6 +314,13 @@
         "title": "Bulk Delete {{resourceName}} Request Submitted"
       }
     },
+    "bulkUpdate": {
+      "error": "Bulk Update {{resourceName}} Request Failed",
+      "success": {
+        "description": "{{count}} {{resourceName}} have been successfully 
updated. Keys: {{keys}}",
+        "title": "Bulk Update {{resourceName}} Request Submitted"
+      }
+    },
     "create": {
       "error": "Create {{resourceName}} Request Failed",
       "success": {
diff --git 
a/airflow-core/src/airflow/ui/src/components/ActionAccordion/ActionAccordion.tsx
 
b/airflow-core/src/airflow/ui/src/components/ActionAccordion/ActionAccordion.tsx
index e2f5b274a4c..514d93799d5 100644
--- 
a/airflow-core/src/airflow/ui/src/components/ActionAccordion/ActionAccordion.tsx
+++ 
b/airflow-core/src/airflow/ui/src/components/ActionAccordion/ActionAccordion.tsx
@@ -20,7 +20,11 @@ import { Box, Editable, Text, VStack } from 
"@chakra-ui/react";
 import type { ChangeEvent } from "react";
 import { useTranslation } from "react-i18next";
 
-import type { DAGRunResponse, TaskInstanceCollectionResponse } from 
"openapi/requests/types.gen";
+import type {
+  DAGRunResponse,
+  TaskInstanceCollectionResponse,
+  TaskInstanceResponse,
+} from "openapi/requests/types.gen";
 import ReactMarkdown from "src/components/ReactMarkdown";
 import { Accordion } from "src/components/ui";
 
@@ -29,17 +33,59 @@ import { getColumns } from "./columns";
 
 type Props = {
   readonly affectedTasks?: TaskInstanceCollectionResponse;
+  readonly groupByRunId?: boolean;
   readonly note: DAGRunResponse["note"];
   readonly setNote: (value: string) => void;
 };
 
+const TasksTable = ({
+  noRowsMessage,
+  tasks,
+}: {
+  readonly noRowsMessage: string;
+  readonly tasks: Array<TaskInstanceResponse>;
+}) => {
+  const { t: translate } = useTranslation();
+  const columns = getColumns(translate);
+
+  return (
+    <DataTable
+      columns={columns}
+      data={tasks}
+      displayMode="table"
+      modelName="common:taskInstance"
+      noRowsMessage={noRowsMessage}
+      total={tasks.length}
+    />
+  );
+};
+
 // Table is in memory, pagination and sorting are disabled.
 // TODO: Make a front-end only unconnected table component with client side 
ordering and pagination
-const ActionAccordion = ({ affectedTasks, note, setNote }: Props) => {
+const ActionAccordion = ({ affectedTasks, groupByRunId = false, note, setNote 
}: Props) => {
   const showTaskSection = affectedTasks !== undefined;
   const { t: translate } = useTranslation();
 
-  const columns = getColumns(translate);
+  // Group task instances by dag_run_id when requested
+  const runGroups = (() => {
+    if (!groupByRunId || !affectedTasks) {
+      return undefined;
+    }
+
+    const map = new Map<string, Array<TaskInstanceResponse>>();
+
+    for (const ti of affectedTasks.task_instances) {
+      const group = map.get(ti.dag_run_id) ?? [];
+
+      group.push(ti);
+      map.set(ti.dag_run_id, group);
+    }
+
+    return map;
+  })();
+
+  // Only group when there are actually multiple run IDs
+  const shouldGroup = groupByRunId && runGroups !== undefined && 
runGroups.size > 1;
 
   return (
     <Accordion.Root
@@ -59,14 +105,33 @@ const ActionAccordion = ({ affectedTasks, note, setNote }: 
Props) => {
           </Accordion.ItemTrigger>
           <Accordion.ItemContent>
             <Box maxH="400px" overflowY="scroll">
-              <DataTable
-                columns={columns}
-                data={affectedTasks.task_instances}
-                displayMode="table"
-                modelName="common:taskInstance"
-                
noRowsMessage={translate("dags:runAndTaskActions.affectedTasks.noItemsFound")}
-                total={affectedTasks.total_entries}
-              />
+              {shouldGroup ? (
+                <Accordion.Root collapsible multiple variant="plain">
+                  {[...runGroups.entries()].map(([runId, tis]) => (
+                    <Accordion.Item key={runId} value={runId}>
+                      <Accordion.ItemTrigger px={2} py={1}>
+                        <Text fontSize="sm" fontWeight="semibold">
+                          {translate("runId")}: {runId}{" "}
+                          <Text as="span" color="fg.subtle" 
fontWeight="normal">
+                            ({tis.length})
+                          </Text>
+                        </Text>
+                      </Accordion.ItemTrigger>
+                      <Accordion.ItemContent>
+                        <TasksTable
+                          
noRowsMessage={translate("dags:runAndTaskActions.affectedTasks.noItemsFound")}
+                          tasks={tis}
+                        />
+                      </Accordion.ItemContent>
+                    </Accordion.Item>
+                  ))}
+                </Accordion.Root>
+              ) : (
+                <TasksTable
+                  
noRowsMessage={translate("dags:runAndTaskActions.affectedTasks.noItemsFound")}
+                  tasks={affectedTasks.task_instances}
+                />
+              )}
             </Box>
           </Accordion.ItemContent>
         </Accordion.Item>
diff --git 
a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx
 
b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx
index 63e1df78526..5dc09c1f421 100644
--- 
a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx
+++ 
b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx
@@ -57,7 +57,7 @@ export const ClearGroupTaskInstanceDialog = ({ onClose, open, 
taskInstance }: Pr
   const downstream = selectedOptions.includes("downstream");
   const [runOnLatestVersion, setRunOnLatestVersion] = useState(false);
 
-  const [note, setNote] = useState<string>("");
+  const [note, setNote] = useState<string | null>(null);
 
   const { data: dagDetails } = useDagServiceGetDagDetails({
     dagId,
@@ -186,6 +186,7 @@ export const ClearGroupTaskInstanceDialog = ({ onClose, 
open, taskInstance }: Pr
                     include_future: future,
                     include_past: past,
                     include_upstream: upstream,
+                    ...(note === null ? {} : { note }),
                     only_failed: onlyFailed,
                     run_on_latest_version: runOnLatestVersion,
                     task_ids: groupTaskIds,
diff --git 
a/airflow-core/src/airflow/ui/src/pages/TaskInstances/BulkClearTaskInstancesButton.tsx
 
b/airflow-core/src/airflow/ui/src/pages/TaskInstances/BulkClearTaskInstancesButton.tsx
new file mode 100644
index 00000000000..13bc6df292b
--- /dev/null
+++ 
b/airflow-core/src/airflow/ui/src/pages/TaskInstances/BulkClearTaskInstancesButton.tsx
@@ -0,0 +1,157 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { Button, Flex, Heading, VStack, useDisclosure } from 
"@chakra-ui/react";
+import { useState } from "react";
+import { useTranslation } from "react-i18next";
+import { CgRedo } from "react-icons/cg";
+
+import type { TaskInstanceResponse } from "openapi/requests/types.gen";
+import { ActionAccordion } from "src/components/ActionAccordion";
+import { ErrorAlert } from "src/components/ErrorAlert";
+import { Checkbox, Dialog } from "src/components/ui";
+import SegmentedControl from "src/components/ui/SegmentedControl";
+import { useBulkClearDryRun } from "src/queries/useBulkClearDryRun";
+import { useBulkClearTaskInstances } from 
"src/queries/useBulkClearTaskInstances";
+
+type Props = {
+  readonly clearSelections: VoidFunction;
+  readonly selectedTaskInstances: Array<TaskInstanceResponse>;
+};
+
+const BulkClearTaskInstancesButton = ({ clearSelections, selectedTaskInstances 
}: Props) => {
+  const { t: translate } = useTranslation();
+  const { onClose, onOpen, open } = useDisclosure();
+  const [selectedOptions, setSelectedOptions] = 
useState<Array<string>>(["downstream"]);
+  const [note, setNote] = useState<string | null>(null);
+  const [preventRunningTask, setPreventRunningTask] = useState(true);
+  const { bulkClear, error, isPending } = useBulkClearTaskInstances({
+    clearSelections,
+    onSuccessConfirm: onClose,
+  });
+
+  const handleClose = () => {
+    setNote(null);
+    onClose();
+  };
+
+  const past = selectedOptions.includes("past");
+  const future = selectedOptions.includes("future");
+  const upstream = selectedOptions.includes("upstream");
+  const downstream = selectedOptions.includes("downstream");
+  const onlyFailed = selectedOptions.includes("onlyFailed");
+
+  const hasLogicalDate = selectedTaskInstances.some((ti) => ti.logical_date 
!== null);
+
+  const { data: affectedTasks, isFetching } = useBulkClearDryRun(open, 
selectedTaskInstances, {
+    includeDownstream: downstream,
+    includeFuture: future,
+    includeOnlyFailed: onlyFailed,
+    includePast: past,
+    includeUpstream: upstream,
+  });
+
+  return (
+    <>
+      <Button colorPalette="brand" onClick={onOpen} size="sm" 
variant="outline">
+        <CgRedo />
+        {translate("dags:runAndTaskActions.clear.button", { type: 
translate("taskInstance_other") })}
+      </Button>
+
+      <Dialog.Root onOpenChange={handleClose} open={open} size="xl">
+        <Dialog.Content backdrop>
+          <Dialog.Header>
+            <VStack align="start" gap={4}>
+              <Heading size="xl">
+                {translate("dags:runAndTaskActions.clear.title", {
+                  type: translate("taskInstance_other"),
+                })}
+              </Heading>
+            </VStack>
+          </Dialog.Header>
+
+          <Dialog.CloseTrigger />
+          <Dialog.Body width="full">
+            <Flex justifyContent="center" mb={4}>
+              <SegmentedControl
+                defaultValues={["downstream"]}
+                multiple
+                onChange={setSelectedOptions}
+                options={[
+                  {
+                    disabled: !hasLogicalDate,
+                    label: translate("dags:runAndTaskActions.options.past"),
+                    value: "past",
+                  },
+                  {
+                    disabled: !hasLogicalDate,
+                    label: translate("dags:runAndTaskActions.options.future"),
+                    value: "future",
+                  },
+                  {
+                    label: 
translate("dags:runAndTaskActions.options.upstream"),
+                    value: "upstream",
+                  },
+                  {
+                    label: 
translate("dags:runAndTaskActions.options.downstream"),
+                    value: "downstream",
+                  },
+                  {
+                    label: 
translate("dags:runAndTaskActions.options.onlyFailed"),
+                    value: "onlyFailed",
+                  },
+                ]}
+              />
+            </Flex>
+            <ActionAccordion affectedTasks={affectedTasks} groupByRunId 
note={note} setNote={setNote} />
+            <ErrorAlert error={error} />
+            <Flex alignItems="center" justifyContent="space-between" mt={3}>
+              <Checkbox
+                checked={preventRunningTask}
+                onCheckedChange={(event) => 
setPreventRunningTask(Boolean(event.checked))}
+              >
+                
{translate("dags:runAndTaskActions.options.preventRunningTasks")}
+              </Checkbox>
+              <Button
+                colorPalette="brand"
+                disabled={affectedTasks.total_entries === 0}
+                loading={isPending || isFetching}
+                onClick={() => {
+                  void bulkClear(selectedTaskInstances, {
+                    includeDownstream: downstream,
+                    includeFuture: future,
+                    includeOnlyFailed: onlyFailed,
+                    includePast: past,
+                    includeUpstream: upstream,
+                    note,
+                    preventRunningTask,
+                  });
+                }}
+              >
+                <CgRedo />
+                {translate("modal.confirm")}
+              </Button>
+            </Flex>
+          </Dialog.Body>
+        </Dialog.Content>
+      </Dialog.Root>
+    </>
+  );
+};
+
+export default BulkClearTaskInstancesButton;
diff --git 
a/airflow-core/src/airflow/ui/src/pages/TaskInstances/BulkDeleteTaskInstancesButton.tsx
 
b/airflow-core/src/airflow/ui/src/pages/TaskInstances/BulkDeleteTaskInstancesButton.tsx
new file mode 100644
index 00000000000..c3395d3be03
--- /dev/null
+++ 
b/airflow-core/src/airflow/ui/src/pages/TaskInstances/BulkDeleteTaskInstancesButton.tsx
@@ -0,0 +1,153 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { Box, Button, Flex, Heading, Text, useDisclosure, VStack } from 
"@chakra-ui/react";
+import { useTranslation } from "react-i18next";
+import { FiTrash2 } from "react-icons/fi";
+
+import type { TaskInstanceResponse } from "openapi/requests/types.gen";
+import { getColumns } from "src/components/ActionAccordion/columns";
+import { DataTable } from "src/components/DataTable";
+import { ErrorAlert } from "src/components/ErrorAlert";
+import { Accordion, Dialog } from "src/components/ui";
+import { useBulkTaskInstances } from "src/queries/useBulkTaskInstances";
+
+type Props = {
+  readonly clearSelections: VoidFunction;
+  readonly selectedTaskInstances: Array<TaskInstanceResponse>;
+};
+
+const BulkDeleteTaskInstancesButton = ({ clearSelections, 
selectedTaskInstances }: Props) => {
+  const { t: translate } = useTranslation();
+  const { onClose, onOpen, open } = useDisclosure();
+  const { bulkAction, error, isPending } = useBulkTaskInstances({
+    clearSelections,
+    onSuccessConfirm: onClose,
+  });
+
+  const columns = getColumns(translate);
+
+  // Group by dag_run_id for display
+  const byRunId = new Map<string, Array<TaskInstanceResponse>>();
+
+  for (const ti of selectedTaskInstances) {
+    const group = byRunId.get(ti.dag_run_id) ?? [];
+
+    group.push(ti);
+    byRunId.set(ti.dag_run_id, group);
+  }
+
+  const isGrouped = byRunId.size > 1;
+
+  return (
+    <>
+      <Button colorPalette="danger" onClick={onOpen} size="sm" 
variant="outline">
+        <FiTrash2 />
+        {translate("dags:runAndTaskActions.delete.button", { type: 
translate("taskInstance_other") })}
+      </Button>
+
+      <Dialog.Root onOpenChange={onClose} open={open} size="xl">
+        <Dialog.Content backdrop>
+          <Dialog.Header>
+            <VStack align="start" gap={4}>
+              <Heading size="xl">
+                {translate("dags:runAndTaskActions.delete.dialog.title", {
+                  type: translate("taskInstance_other"),
+                })}
+              </Heading>
+            </VStack>
+          </Dialog.Header>
+
+          <Dialog.CloseTrigger />
+          <Dialog.Body width="full">
+            <Text color="fg.subtle" fontSize="sm" mb={4}>
+              {translate("dags:runAndTaskActions.delete.dialog.warning", {
+                type: translate("taskInstance_other"),
+              })}
+            </Text>
+
+            <Box maxH="400px" overflowY="auto">
+              {isGrouped ? (
+                <Accordion.Root collapsible multiple variant="enclosed">
+                  {[...byRunId.entries()].map(([runId, tis]) => (
+                    <Accordion.Item key={runId} value={runId}>
+                      <Accordion.ItemTrigger>
+                        <Text fontSize="sm" fontWeight="semibold">
+                          {translate("runId")}: {runId}{" "}
+                          <Text as="span" color="fg.subtle" 
fontWeight="normal">
+                            ({tis.length})
+                          </Text>
+                        </Text>
+                      </Accordion.ItemTrigger>
+                      <Accordion.ItemContent>
+                        <DataTable
+                          columns={columns}
+                          data={tis}
+                          displayMode="table"
+                          modelName="common:taskInstance"
+                          total={tis.length}
+                        />
+                      </Accordion.ItemContent>
+                    </Accordion.Item>
+                  ))}
+                </Accordion.Root>
+              ) : (
+                <DataTable
+                  columns={columns}
+                  data={selectedTaskInstances}
+                  displayMode="table"
+                  modelName="common:taskInstance"
+                  total={selectedTaskInstances.length}
+                />
+              )}
+            </Box>
+
+            <ErrorAlert error={error} />
+            <Flex justifyContent="end" mt={3}>
+              <Button
+                colorPalette="danger"
+                loading={isPending}
+                onClick={() => {
+                  bulkAction({
+                    actions: [
+                      {
+                        action: "delete" as const,
+                        action_on_non_existence: "skip",
+                        entities: selectedTaskInstances.map((ti) => ({
+                          dag_id: ti.dag_id,
+                          dag_run_id: ti.dag_run_id,
+                          map_index: ti.map_index,
+                          task_id: ti.task_id,
+                        })),
+                      },
+                    ],
+                  });
+                }}
+              >
+                <FiTrash2 />
+                <Text fontWeight="bold">{translate("modal.confirm")}</Text>
+              </Button>
+            </Flex>
+          </Dialog.Body>
+        </Dialog.Content>
+      </Dialog.Root>
+    </>
+  );
+};
+
+export default BulkDeleteTaskInstancesButton;
diff --git 
a/airflow-core/src/airflow/ui/src/pages/TaskInstances/BulkMarkTaskInstancesAsButton.tsx
 
b/airflow-core/src/airflow/ui/src/pages/TaskInstances/BulkMarkTaskInstancesAsButton.tsx
new file mode 100644
index 00000000000..e30b777ad9c
--- /dev/null
+++ 
b/airflow-core/src/airflow/ui/src/pages/TaskInstances/BulkMarkTaskInstancesAsButton.tsx
@@ -0,0 +1,208 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { Badge, Box, Button, Flex, Heading, HStack, VStack, useDisclosure } 
from "@chakra-ui/react";
+import { useState } from "react";
+import { useTranslation } from "react-i18next";
+import { FiX } from "react-icons/fi";
+import { LuCheck } from "react-icons/lu";
+
+import type { TaskInstanceResponse, TaskInstanceState } from 
"openapi/requests/types.gen";
+import { ActionAccordion } from "src/components/ActionAccordion";
+import { ErrorAlert } from "src/components/ErrorAlert";
+import { allowedStates } from "src/components/MarkAs/utils";
+import { StateBadge } from "src/components/StateBadge";
+import { Dialog, Menu } from "src/components/ui";
+import SegmentedControl from "src/components/ui/SegmentedControl";
+import { useBulkMarkAsDryRun } from "src/queries/useBulkMarkAsDryRun";
+import { useBulkTaskInstances } from "src/queries/useBulkTaskInstances";
+
+type Props = {
+  readonly clearSelections: VoidFunction;
+  readonly selectedTaskInstances: Array<TaskInstanceResponse>;
+};
+
+/**
+ * Action-bar button for marking several selected task instances with a new state.
+ *
+ * Renders a menu with one entry per `allowedStates` value; each entry shows how many of the
+ * selected task instances are not already in that state and is disabled when that count is zero.
+ * Choosing an entry opens a dialog that previews the affected task instances through
+ * `useBulkMarkAsDryRun` and, on confirm, submits a single bulk "update" action through
+ * `useBulkTaskInstances`.
+ */
+const BulkMarkTaskInstancesAsButton = ({ clearSelections, 
selectedTaskInstances }: Props) => {
+  const { t: translate } = useTranslation();
+  const { onClose, onOpen, open } = useDisclosure();
+  const [state, setState] = useState<TaskInstanceState>("success");
+  const [selectedOptions, setSelectedOptions] = useState<Array<string>>([]);
+  const [note, setNote] = useState<string | null>(null);
+  const { bulkAction, error, isPending, setError } = useBulkTaskInstances({
+    clearSelections,
+    onSuccessConfirm: onClose,
+  });
+
+  // Derive the include_* flags from the values picked in the segmented control.
+  const past = selectedOptions.includes("past");
+  const future = selectedOptions.includes("future");
+  const upstream = selectedOptions.includes("upstream");
+  const downstream = selectedOptions.includes("downstream");
+
+  // The past/future options are disabled unless at least one selected task instance has a logical date.
+  const hasLogicalDate = selectedTaskInstances.some((ti) => ti.logical_date 
!== null);
+
+  // Number of selected task instances whose current state differs from targetState.
+  const affectedCount = (targetState: TaskInstanceState) =>
+    selectedTaskInstances.filter((ti) => ti.state !== targetState).length;
+
+  // `open` is passed as the hook's `enabled` argument, so the dry run only runs while the dialog is open.
+  const { data: affectedTasks, isFetching } = useBulkMarkAsDryRun(open, {
+    options: {
+      includeDownstream: downstream,
+      includeFuture: future,
+      includePast: past,
+      includeUpstream: upstream,
+    },
+    selectedTaskInstances,
+    targetState: state,
+  });
+
+  // Reset options, note and any previous error before opening the dialog for the chosen state.
+  // NOTE(review): only this component's state is reset here. SegmentedControl receives
+  // `defaultValues`, not the current selection, so whether its displayed selection also resets
+  // presumably depends on the dialog content being remounted on open; confirm.
+  const handleOpen = (newState: TaskInstanceState) => {
+    setState(newState);
+    setSelectedOptions([]);
+    setNote(null);
+    setError(undefined);
+    onOpen();
+  };
+
+  // Only these instances are sent in the update request; those already in the target state are left out.
+  const directlyAffected = selectedTaskInstances.filter((ti) => ti.state !== 
state);
+
+  return (
+    <Box>
+      <Menu.Root positioning={{ gutter: 0, placement: "top" }}>
+        <Menu.Trigger asChild>
+          <div>
+            <Button colorPalette="brand" size="sm" variant="outline">
+              <HStack gap={1} mx={1}>
+                <LuCheck />
+                <span>/</span>
+                <FiX />
+              </HStack>
+              {translate("dags:runAndTaskActions.markAs.button", { type: 
translate("taskInstance_other") })}
+            </Button>
+          </div>
+        </Menu.Trigger>
+        <Menu.Content>
+          {allowedStates.map((menuState) => {
+            const count = affectedCount(menuState);
+
+            return (
+              <Menu.Item
+                disabled={count === 0}
+                key={menuState}
+                onClick={() => {
+                  if (count > 0) {
+                    handleOpen(menuState);
+                  }
+                }}
+                value={menuState}
+              >
+                <HStack justify="space-between" width="full">
+                  <StateBadge 
state={menuState}>{translate(`common:states.${menuState}`)}</StateBadge>
+                  <Badge colorPalette="gray" variant="subtle">
+                    {count}
+                  </Badge>
+                </HStack>
+              </Menu.Item>
+            );
+          })}
+        </Menu.Content>
+      </Menu.Root>
+
+      <Dialog.Root onOpenChange={onClose} open={open} size="xl">
+        <Dialog.Content backdrop>
+          <Dialog.Header>
+            <VStack align="start" gap={4}>
+              <Heading size="xl">
+                {translate("dags:runAndTaskActions.markAs.title", {
+                  state,
+                  type: translate("taskInstance_other"),
+                })}{" "}
+                <StateBadge state={state} />
+              </Heading>
+            </VStack>
+          </Dialog.Header>
+
+          <Dialog.CloseTrigger />
+          <Dialog.Body width="full">
+            <Flex justifyContent="center" mb={4}>
+              <SegmentedControl
+                defaultValues={[]}
+                multiple
+                onChange={setSelectedOptions}
+                options={[
+                  {
+                    disabled: !hasLogicalDate,
+                    label: translate("dags:runAndTaskActions.options.past"),
+                    value: "past",
+                  },
+                  {
+                    disabled: !hasLogicalDate,
+                    label: translate("dags:runAndTaskActions.options.future"),
+                    value: "future",
+                  },
+                  {
+                    label: 
translate("dags:runAndTaskActions.options.upstream"),
+                    value: "upstream",
+                  },
+                  {
+                    label: 
translate("dags:runAndTaskActions.options.downstream"),
+                    value: "downstream",
+                  },
+                ]}
+              />
+            </Flex>
+            <ActionAccordion affectedTasks={affectedTasks} groupByRunId 
note={note} setNote={setNote} />
+            <ErrorAlert error={error} />
+            <Flex justifyContent="end" mt={3}>
+              <Button
+                colorPalette="brand"
+                disabled={affectedTasks.total_entries === 0}
+                loading={isPending || isFetching}
+                onClick={() => {
+                  bulkAction({
+                    actions: [
+                      {
+                        action: "update" as const,
+                        action_on_non_existence: "skip",
+                        entities: directlyAffected.map((ti) => ({
+                          dag_id: ti.dag_id,
+                          dag_run_id: ti.dag_run_id,
+                          include_downstream: downstream,
+                          include_future: future,
+                          include_past: past,
+                          include_upstream: upstream,
+                          map_index: ti.map_index,
+                          new_state: state,
+                          note,
+                          task_id: ti.task_id,
+                        })),
+                        update_mask: note === null ? ["new_state"] : 
["new_state", "note"],
+                      },
+                    ],
+                  });
+                }}
+              >
+                {translate("modal.confirm")}
+              </Button>
+            </Flex>
+          </Dialog.Body>
+        </Dialog.Content>
+      </Dialog.Root>
+    </Box>
+  );
+};
+
+export default BulkMarkTaskInstancesAsButton;
diff --git 
a/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx 
b/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx
index 678f386f116..e066436afd0 100644
--- a/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx
@@ -29,21 +29,29 @@ import type { TaskInstanceResponse } from 
"openapi/requests/types.gen";
 import { ClearTaskInstanceButton } from "src/components/Clear";
 import { DagVersion } from "src/components/DagVersion";
 import { DataTable } from "src/components/DataTable";
+import { useRowSelection, type GetColumnsParams } from 
"src/components/DataTable/useRowSelection";
 import { useTableURLState } from "src/components/DataTable/useTableUrlState";
 import { ErrorAlert } from "src/components/ErrorAlert";
 import { MarkTaskInstanceAsButton } from "src/components/MarkAs";
 import { StateBadge } from "src/components/StateBadge";
 import Time from "src/components/Time";
 import { TruncatedText } from "src/components/TruncatedText";
+import { ActionBar } from "src/components/ui/ActionBar";
+import { Checkbox } from "src/components/ui/Checkbox";
 import { SearchParamsKeys, type SearchParamsKeysType } from 
"src/constants/searchParams";
 import { useAutoRefresh, isStatePending, renderDuration } from "src/utils";
 import { getTaskInstanceLink } from "src/utils/links";
 
+import BulkClearTaskInstancesButton from "./BulkClearTaskInstancesButton";
+import BulkDeleteTaskInstancesButton from "./BulkDeleteTaskInstancesButton";
+import BulkMarkTaskInstancesAsButton from "./BulkMarkTaskInstancesAsButton";
 import DeleteTaskInstanceButton from "./DeleteTaskInstanceButton";
 import { TaskInstancesFilter } from "./TaskInstancesFilter";
 
 type TaskInstanceRow = { row: { original: TaskInstanceResponse } };
 
+// Builds the row-selection key for a task instance from its dag id, run id, task id and map index.
+const getRowKey = (ti: TaskInstanceResponse) => 
`${ti.dag_id}:${ti.dag_run_id}:${ti.task_id}:${ti.map_index}`;
+
 const {
   DAG_ID_PATTERN: DAG_ID_PATTERN_PARAM,
   DAG_VERSION: DAG_VERSION_PARAM,
@@ -63,17 +71,47 @@ const {
   TRY_NUMBER: TRY_NUMBER_PARAM,
 }: SearchParamsKeysType = SearchParamsKeys;
 
+type ColumnProps = {
+  readonly dagId?: string;
+  readonly runId?: string;
+  readonly taskId?: string;
+  readonly translate: TFunction;
+};
+
 const taskInstanceColumns = ({
+  allRowsSelected,
   dagId,
+  onRowSelect,
+  onSelectAll,
   runId,
+  selectedRows,
   taskId,
   translate,
-}: {
-  dagId?: string;
-  runId?: string;
-  taskId?: string;
-  translate: TFunction;
-}): Array<ColumnDef<TaskInstanceResponse>> => [
+}: ColumnProps & GetColumnsParams): Array<ColumnDef<TaskInstanceResponse>> => [
+  {
+    accessorKey: "select",
+    cell: ({ row }) => (
+      <Checkbox
+        borderWidth={1}
+        checked={selectedRows.get(getRowKey(row.original))}
+        colorPalette="brand"
+        onCheckedChange={(event) => onRowSelect(getRowKey(row.original), 
Boolean(event.checked))}
+      />
+    ),
+    enableHiding: false,
+    enableSorting: false,
+    header: () => (
+      <Checkbox
+        borderWidth={1}
+        checked={allRowsSelected}
+        colorPalette="brand"
+        onCheckedChange={(event) => onSelectAll(Boolean(event.checked))}
+      />
+    ),
+    meta: {
+      skeletonWidth: 10,
+    },
+  },
   ...(Boolean(dagId)
     ? []
     : [
@@ -95,7 +133,6 @@ const taskInstanceColumns = ({
     : [
         {
           accessorKey: "run_after",
-          // If we don't show the taskId column, make the dag run a link to 
the task instance
           cell: ({ row: { original } }: TaskInstanceRow) =>
             Boolean(taskId) ? (
               <Link asChild color="fg.info" fontWeight="bold">
@@ -292,9 +329,22 @@ export const TaskInstances = () => {
     },
   );
 
+  const { allRowsSelected, clearSelections, handleRowSelect, handleSelectAll, 
selectedRows } =
+    useRowSelection({
+      data: data?.task_instances,
+      getKey: getRowKey,
+    });
+
+  const selectedTaskInstances = (data?.task_instances ?? []).filter((ti) => 
selectedRows.has(getRowKey(ti)));
+
   const columns = taskInstanceColumns({
+    allRowsSelected,
     dagId,
+    multiTeam: false,
+    onRowSelect: handleRowSelect,
+    onSelectAll: handleSelectAll,
     runId,
+    selectedRows,
     taskId: Boolean(groupId) ? undefined : taskId,
     translate,
   });
@@ -312,6 +362,27 @@ export const TaskInstances = () => {
         onStateChange={setTableURLState}
         total={data?.total_entries}
       />
+      <ActionBar.Root closeOnInteractOutside={false} 
open={Boolean(selectedRows.size)}>
+        <ActionBar.Content>
+          <ActionBar.SelectionTrigger>
+            {selectedRows.size} {translate("selected")}
+          </ActionBar.SelectionTrigger>
+          <ActionBar.Separator />
+          <BulkClearTaskInstancesButton
+            clearSelections={clearSelections}
+            selectedTaskInstances={selectedTaskInstances}
+          />
+          <BulkMarkTaskInstancesAsButton
+            clearSelections={clearSelections}
+            selectedTaskInstances={selectedTaskInstances}
+          />
+          <BulkDeleteTaskInstancesButton
+            clearSelections={clearSelections}
+            selectedTaskInstances={selectedTaskInstances}
+          />
+          <ActionBar.CloseTrigger onClick={clearSelections} />
+        </ActionBar.Content>
+      </ActionBar.Root>
     </>
   );
 };
diff --git a/airflow-core/src/airflow/ui/src/queries/useBulkClearDryRun.ts 
b/airflow-core/src/airflow/ui/src/queries/useBulkClearDryRun.ts
new file mode 100644
index 00000000000..e631860f300
--- /dev/null
+++ b/airflow-core/src/airflow/ui/src/queries/useBulkClearDryRun.ts
@@ -0,0 +1,113 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { useQueries } from "@tanstack/react-query";
+import { useMemo } from "react";
+
+import { TaskInstanceService } from "openapi/requests/services.gen";
+import type { TaskInstanceCollectionResponse, TaskInstanceResponse } from 
"openapi/requests/types.gen";
+
+type Options = {
+  includeDownstream: boolean;
+  includeFuture: boolean;
+  includeOnlyFailed: boolean;
+  includePast: boolean;
+  includeUpstream: boolean;
+};
+
+const EMPTY: TaskInstanceCollectionResponse = { task_instances: [], 
total_entries: 0 };
+
+export const useBulkClearDryRunKey = "bulkClearDryRun";
+
+/**
+ * Previews which task instances a bulk clear would touch.
+ *
+ * The selection is bucketed per (dag_id, dag_run_id) and one dry-run clear request is issued
+ * per bucket. The responses are merged into a single collection in which every task instance
+ * (identified by dag_id, dag_run_id, task_id and map_index) appears once, in first-seen order.
+ *
+ * @param enabled - forwarded to every query's `enabled` flag
+ * @param selectedTaskInstances - task instances currently selected in the table
+ * @param options - include/only-failed flags forwarded to the clear endpoint
+ * @returns the merged collection (`EMPTY` when nothing came back) and whether any query is fetching
+ */
+export const useBulkClearDryRun = (
+  enabled: boolean,
+  selectedTaskInstances: Array<TaskInstanceResponse>,
+  options: Options,
+) => {
+  // One bucket per dag run, kept in the order the runs first appear in the selection.
+  const runBuckets = useMemo(() => {
+    const buckets = new Map<string, { dagId: string; dagRunId: string; tis: Array<TaskInstanceResponse> }>();
+
+    selectedTaskInstances.forEach((instance) => {
+      const bucketKey = `${instance.dag_id}::${instance.dag_run_id}`;
+      const bucket = buckets.get(bucketKey);
+
+      if (bucket === undefined) {
+        buckets.set(bucketKey, { dagId: instance.dag_id, dagRunId: instance.dag_run_id, tis: [instance] });
+      } else {
+        bucket.tis.push(instance);
+      }
+    });
+
+    return Array.from(buckets.values());
+  }, [selectedTaskInstances]);
+
+  const results = useQueries({
+    queries: runBuckets.map(({ dagId, dagRunId, tis }) => {
+      // Same flags drive both the request body and the cache key.
+      const flags = {
+        include_downstream: options.includeDownstream,
+        include_future: options.includeFuture,
+        include_past: options.includePast,
+        include_upstream: options.includeUpstream,
+      };
+
+      return {
+        enabled,
+        queryFn: () =>
+          TaskInstanceService.postClearTaskInstances({
+            dagId,
+            requestBody: {
+              dag_run_id: dagRunId,
+              dry_run: true,
+              ...flags,
+              only_failed: options.includeOnlyFailed,
+              // Mapped task instances are addressed as [task_id, map_index]; unmapped ones by task_id.
+              task_ids: tis.map((ti) =>
+                ti.map_index >= 0 ? ([ti.task_id, ti.map_index] as [string, number]) : ti.task_id,
+              ),
+            },
+          }),
+        queryKey: [
+          useBulkClearDryRunKey,
+          dagId,
+          dagRunId,
+          {
+            ...flags,
+            include_only_failed: options.includeOnlyFailed,
+            task_ids: tis.map((ti) => `${ti.task_id}:${ti.map_index}`),
+          },
+        ],
+        refetchOnMount: "always" as const,
+      };
+    }),
+  });
+
+  const isFetching = results.some(({ isFetching: fetching }) => fetching);
+
+  const data = useMemo<TaskInstanceCollectionResponse>(() => {
+    // A Map keeps the first occurrence of each task instance and preserves insertion order.
+    const unique = new Map<string, TaskInstanceResponse>();
+
+    results
+      .flatMap((result) => result.data?.task_instances ?? [])
+      .forEach((ti) => {
+        const identity = `${ti.dag_id}:${ti.dag_run_id}:${ti.task_id}:${ti.map_index}`;
+
+        if (!unique.has(identity)) {
+          unique.set(identity, ti);
+        }
+      });
+
+    if (unique.size === 0) {
+      return EMPTY;
+    }
+
+    const merged = [...unique.values()];
+
+    return { task_instances: merged, total_entries: merged.length };
+  }, [results]);
+
+  return { data, isFetching };
+};
diff --git 
a/airflow-core/src/airflow/ui/src/queries/useBulkClearTaskInstances.ts 
b/airflow-core/src/airflow/ui/src/queries/useBulkClearTaskInstances.ts
new file mode 100644
index 00000000000..063b1c6d77d
--- /dev/null
+++ b/airflow-core/src/airflow/ui/src/queries/useBulkClearTaskInstances.ts
@@ -0,0 +1,118 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { useQueryClient } from "@tanstack/react-query";
+import { useState } from "react";
+import { useTranslation } from "react-i18next";
+
+import { useDagRunServiceGetDagRunsKey, 
useTaskInstanceServiceGetTaskInstancesKey } from "openapi/queries";
+import { TaskInstanceService } from "openapi/requests/services.gen";
+import type { TaskInstanceResponse } from "openapi/requests/types.gen";
+import { toaster } from "src/components/ui";
+
+type Props = {
+  readonly clearSelections: VoidFunction;
+  readonly onSuccessConfirm: VoidFunction;
+};
+
+export type BulkClearOptions = {
+  includeDownstream: boolean;
+  includeFuture: boolean;
+  includeOnlyFailed: boolean;
+  includePast: boolean;
+  includeUpstream: boolean;
+  note: string | null;
+  preventRunningTask: boolean;
+};
+
+/**
+ * Hook that clears a set of task instances, possibly spanning several DAGs and runs.
+ *
+ * The clear endpoint is scoped to one dag_id (and here one dag_run_id), so the selection is
+ * grouped per (dag_id, dag_run_id) and one request is sent per group.
+ *
+ * All requests are awaited with `Promise.allSettled`: some groups can be cleared while others
+ * fail, so the task-instance and dag-run queries are always invalidated once every request has
+ * settled. On any failure the first rejection is exposed through `error` and the selection is
+ * kept; only a fully successful run shows the toast, clears the selection and calls
+ * `onSuccessConfirm`.
+ *
+ * @returns `bulkClear` (runs the clear), `error` / `setError` (last failure) and `isPending`
+ */
+export const useBulkClearTaskInstances = ({ clearSelections, onSuccessConfirm }: Props) => {
+  const queryClient = useQueryClient();
+  const [error, setError] = useState<unknown>(undefined);
+  const [isPending, setIsPending] = useState(false);
+  const { t: translate } = useTranslation(["common", "dags"]);
+
+  const invalidateQueries = async () => {
+    await Promise.all([
+      queryClient.invalidateQueries({ queryKey: [useTaskInstanceServiceGetTaskInstancesKey] }),
+      queryClient.invalidateQueries({ queryKey: [useDagRunServiceGetDagRunsKey] }),
+    ]);
+  };
+
+  const bulkClear = async (taskInstances: Array<TaskInstanceResponse>, options: BulkClearOptions) => {
+    setError(undefined);
+    setIsPending(true);
+
+    // Group by (dag_id, dag_run_id): the clear endpoint requires a specific dag_id
+    // and dag_run_id scopes the clear to the specific run.
+    const byDagRun = new Map<string, { dagId: string; dagRunId: string; tis: Array<TaskInstanceResponse> }>();
+
+    for (const ti of taskInstances) {
+      const key = `${ti.dag_id}::${ti.dag_run_id}`;
+      const group = byDagRun.get(key) ?? { dagId: ti.dag_id, dagRunId: ti.dag_run_id, tis: [] };
+
+      group.tis.push(ti);
+      byDagRun.set(key, group);
+    }
+
+    try {
+      // allSettled (not all): wait for every group so that a single failure neither hides the
+      // groups that were cleared nor lets the refresh below race with requests still in flight.
+      const outcomes = await Promise.allSettled(
+        [...byDagRun.values()].map(({ dagId, dagRunId, tis }) =>
+          TaskInstanceService.postClearTaskInstances({
+            dagId,
+            requestBody: {
+              dag_run_id: dagRunId,
+              dry_run: false,
+              include_downstream: options.includeDownstream,
+              include_future: options.includeFuture,
+              include_past: options.includePast,
+              include_upstream: options.includeUpstream,
+              note: options.note,
+              only_failed: options.includeOnlyFailed,
+              ...(options.preventRunningTask ? { prevent_running_task: true } : {}),
+              // Mapped task instances are addressed as [task_id, map_index]; unmapped ones by task_id.
+              task_ids: tis.map((ti) =>
+                ti.map_index >= 0 ? ([ti.task_id, ti.map_index] as [string, number]) : ti.task_id,
+              ),
+            },
+          }),
+        ),
+      );
+
+      // Refresh even on partial failure: the groups that succeeded did change server state.
+      await invalidateQueries();
+
+      const failure = outcomes.find(
+        (outcome): outcome is PromiseRejectedResult => outcome.status === "rejected",
+      );
+
+      if (failure === undefined) {
+        toaster.create({
+          description: translate("toaster.bulkClear.success.description", {
+            count: taskInstances.length,
+            keys: taskInstances.map((ti) => ti.task_id).join(", "),
+            resourceName: translate("taskInstance_other"),
+          }),
+          title: translate("toaster.bulkClear.success.title", {
+            resourceName: translate("taskInstance_other"),
+          }),
+          type: "success",
+        });
+
+        clearSelections();
+        onSuccessConfirm();
+      } else {
+        const reason: unknown = failure.reason;
+
+        setError(reason);
+      }
+    } catch (_error) {
+      setError(_error);
+    } finally {
+      // Always leave the pending state, whatever happened above.
+      setIsPending(false);
+    }
+  };
+
+  return { bulkClear, error, isPending, setError };
+};
diff --git a/airflow-core/src/airflow/ui/src/queries/useBulkMarkAsDryRun.ts 
b/airflow-core/src/airflow/ui/src/queries/useBulkMarkAsDryRun.ts
new file mode 100644
index 00000000000..b8d5eb06758
--- /dev/null
+++ b/airflow-core/src/airflow/ui/src/queries/useBulkMarkAsDryRun.ts
@@ -0,0 +1,113 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { useQueries } from "@tanstack/react-query";
+import { useMemo } from "react";
+
+import { TaskInstanceService } from "openapi/requests/services.gen";
+import type {
+  TaskInstanceCollectionResponse,
+  TaskInstanceResponse,
+  TaskInstanceState,
+} from "openapi/requests/types.gen";
+
+type Options = {
+  includeDownstream: boolean;
+  includeFuture: boolean;
+  includePast: boolean;
+  includeUpstream: boolean;
+};
+
+const EMPTY: TaskInstanceCollectionResponse = { task_instances: [], 
total_entries: 0 };
+
+export const useBulkMarkAsDryRunKey = "bulkMarkAsDryRun";
+
+/**
+ * Previews which task instances a bulk "mark as" would change.
+ *
+ * Issues one `patchTaskInstanceDryRun` request per selected task instance whose state differs
+ * from `targetState`, then merges the responses and drops duplicates identified by
+ * dag_id, dag_run_id, task_id and map_index.
+ *
+ * @param enabled - forwarded to every query's `enabled` flag
+ * @returns `data` (the merged collection, or the shared `EMPTY` value when nothing was returned)
+ *   and `isFetching` (true while any of the queries is fetching)
+ */
+export const useBulkMarkAsDryRun = (
+  enabled: boolean,
+  {
+    options,
+    selectedTaskInstances,
+    targetState,
+  }: {
+    options: Options;
+    selectedTaskInstances: Array<TaskInstanceResponse>;
+    targetState: TaskInstanceState;
+  },
+) => {
+  // Task instances already in the target state are not queried at all.
+  const affectedInstances = useMemo(
+    () => selectedTaskInstances.filter((ti) => ti.state !== targetState),
+    [selectedTaskInstances, targetState],
+  );
+
+  // One query per affected task instance; the key carries the identity and every request flag.
+  const results = useQueries({
+    queries: affectedInstances.map((ti) => ({
+      enabled,
+      queryFn: () =>
+        TaskInstanceService.patchTaskInstanceDryRun({
+          dagId: ti.dag_id,
+          dagRunId: ti.dag_run_id,
+          mapIndex: ti.map_index,
+          requestBody: {
+            include_downstream: options.includeDownstream,
+            include_future: options.includeFuture,
+            include_past: options.includePast,
+            include_upstream: options.includeUpstream,
+            new_state: targetState,
+          },
+          taskId: ti.task_id,
+        }),
+      queryKey: [
+        useBulkMarkAsDryRunKey,
+        ti.dag_id,
+        ti.dag_run_id,
+        ti.task_id,
+        ti.map_index,
+        {
+          include_downstream: options.includeDownstream,
+          include_future: options.includeFuture,
+          include_past: options.includePast,
+          include_upstream: options.includeUpstream,
+          new_state: targetState,
+        },
+      ],
+      refetchOnMount: "always" as const,
+    })),
+  });
+
+  const isFetching = results.some((result) => result.isFetching);
+
+  // Merge all responses, keeping the first occurrence of each task instance.
+  // NOTE(review): the memo depends on `results`; whether useQueries returns a stable array
+  // between renders decides if this memo ever hits. Confirm against the library version in use.
+  const data = useMemo<TaskInstanceCollectionResponse>(() => {
+    const seen = new Set<string>();
+    const merged: Array<TaskInstanceResponse> = [];
+
+    for (const result of results) {
+      for (const ti of result.data?.task_instances ?? []) {
+        const key = 
`${ti.dag_id}:${ti.dag_run_id}:${ti.task_id}:${ti.map_index}`;
+
+        if (!seen.has(key)) {
+          seen.add(key);
+          merged.push(ti);
+        }
+      }
+    }
+
+    return merged.length === 0 ? EMPTY : { task_instances: merged, 
total_entries: merged.length };
+  }, [results]);
+
+  return { data, isFetching };
+};
diff --git a/airflow-core/src/airflow/ui/src/queries/useBulkTaskInstances.ts 
b/airflow-core/src/airflow/ui/src/queries/useBulkTaskInstances.ts
new file mode 100644
index 00000000000..8ad4d616197
--- /dev/null
+++ b/airflow-core/src/airflow/ui/src/queries/useBulkTaskInstances.ts
@@ -0,0 +1,105 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { useQueryClient } from "@tanstack/react-query";
+import { useState } from "react";
+import { useTranslation } from "react-i18next";
+
+import {
+  useDagRunServiceGetDagRunsKey,
+  useTaskInstanceServiceBulkTaskInstances,
+  useTaskInstanceServiceGetTaskInstancesKey,
+} from "openapi/queries";
+import type {
+  BulkActionResponse,
+  BulkBody_BulkTaskInstanceBody_,
+  BulkResponse,
+} from "openapi/requests/types.gen";
+import { toaster } from "src/components/ui";
+
+type Props = {
+  readonly clearSelections: VoidFunction;
+  readonly onSuccessConfirm: VoidFunction;
+};
+
+/**
+ * Routes the outcome of one bulk action to either the error setter or the success callback.
+ *
+ * A non-empty `errors` list wins: its first entry's `error` text is wrapped as
+ * `{ body: { detail } }` and handed to `setError`, and `onSuccess` is not called.
+ * Otherwise a non-empty `success` list is reported to `onSuccess` with its length and keys.
+ * When both lists are empty or absent nothing happens.
+ */
+const handleActionResult = (
+  actionResult: BulkActionResponse,
+  setError: (error: unknown) => void,
+  onSuccess: (count: number, keys: Array<string>) => void,
+) => {
+  const failures = actionResult.errors;
+
+  if (Array.isArray(failures) && failures.length > 0) {
+    const firstFailure = failures[0] as { error: string };
+
+    setError({ body: { detail: firstFailure.error } });
+
+    return;
+  }
+
+  const succeededKeys = actionResult.success;
+
+  if (Array.isArray(succeededKeys) && succeededKeys.length > 0) {
+    onSuccess(succeededKeys.length, succeededKeys);
+  }
+};
+
+/**
+ * Wraps the bulk task-instance mutation with cache invalidation, a success toast and error state.
+ *
+ * @returns `bulkAction` (submits a bulk request body), `error` / `setError` (last failure, either
+ *   a request error or the first per-entity error from the response) and `isPending`
+ */
+export const useBulkTaskInstances = ({ clearSelections, onSuccessConfirm }: 
Props) => {
+  const queryClient = useQueryClient();
+  const [error, setError] = useState<unknown>(undefined);
+  const { t: translate } = useTranslation(["common", "dags"]);
+
+  const onSuccess = async (responseData: BulkResponse) => {
+    // Refresh task-instance and dag-run queries before reporting the outcome.
+    await Promise.all([
+      queryClient.invalidateQueries({ queryKey: 
[useTaskInstanceServiceGetTaskInstancesKey] }),
+      queryClient.invalidateQueries({ queryKey: 
[useDagRunServiceGetDagRunsKey] }),
+    ]);
+
+    // Only one action result is handled: `delete` when present, otherwise `update`.
+    const isDelete = Boolean(responseData.delete);
+    const actionResult = responseData.delete ?? responseData.update;
+    const toasterKey = isDelete ? "toaster.bulkDelete" : "toaster.bulkUpdate";
+
+    if (actionResult) {
+      // The callback below runs only when the action reported successes and no errors.
+      handleActionResult(actionResult, setError, (count, keys) => {
+        toaster.create({
+          description: translate(`${toasterKey}.success.description`, {
+            count,
+            keys: keys.join(", "),
+            resourceName: translate("taskInstance_other"),
+          }),
+          title: translate(`${toasterKey}.success.title`, {
+            resourceName: translate("taskInstance_other"),
+          }),
+          type: "success",
+        });
+        clearSelections();
+        onSuccessConfirm();
+      });
+    }
+  };
+
+  const onError = (_error: unknown) => {
+    setError(_error);
+  };
+
+  const { isPending, mutate } = useTaskInstanceServiceBulkTaskInstances({
+    onError,
+    onSuccess,
+  });
+
+  const bulkAction = (requestBody: BulkBody_BulkTaskInstanceBody_) => {
+    setError(undefined);
+    // NOTE(review): "~" is presumably the API's "any DAG / any run" placeholder, with the real
+    // identifiers carried by each entity in the request body; confirm against the endpoint docs.
+    mutate({ dagId: "~", dagRunId: "~", requestBody });
+  };
+
+  return { bulkAction, error, isPending, setError };
+};
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
index ba8f06a7424..1a5d689d1ef 100644
--- 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
+++ 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
@@ -3644,6 +3644,88 @@ class 
TestPostClearTaskInstances(TestTaskInstanceEndpoint):
         assert response.status_code == 200
         assert logs == audit_log_count
 
+    @pytest.mark.db_test
+    def test_clear_sets_note_on_task_instances(self, test_client, session):
+        """Clearing with a ``note`` in the payload stores that note on the cleared task instance."""
+        dag_id = "example_python_operator"
+        note_value = "Cleared by automation"
+        payload = {
+            "dry_run": False,
+            "reset_dag_runs": False,
+            "only_failed": True,
+            "only_running": False,
+            "note": note_value,
+        }
+        # A single failed task instance, so only_failed=True selects exactly one entry.
+        self.create_task_instances(
+            session,
+            dag_id=dag_id,
+            task_instances=[{"logical_date": DEFAULT_DATETIME_1, "state": 
State.FAILED}],
+            update_extras=False,
+        )
+        response = test_client.post(
+            f"/dags/{dag_id}/clearTaskInstances",
+            json=payload,
+        )
+        assert response.status_code == 200
+        response_data = response.json()
+        assert response_data["total_entries"] == 1
+        ti_id = response_data["task_instances"][0]["id"]
+        # The stored note must carry the payload text and the requesting user ("test").
+        _check_task_instance_note(session, ti_id, {"content": note_value, 
"user_id": "test"})
"user_id": "test"})
+
+    @pytest.mark.db_test
+    def test_clear_without_note_does_not_set_note(self, test_client, session):
+        """Clearing without a ``note`` in the payload leaves the task instance's existing note as is."""
+        dag_id = "example_python_operator"
+        # No "note" key here: the request must not touch the note.
+        payload = {
+            "dry_run": False,
+            "reset_dag_runs": False,
+            "only_failed": True,
+            "only_running": False,
+        }
+        self.create_task_instances(
+            session,
+            dag_id=dag_id,
+            task_instances=[{"logical_date": DEFAULT_DATETIME_1, "state": 
State.FAILED}],
+            update_extras=False,
+        )
+        response = test_client.post(
+            f"/dags/{dag_id}/clearTaskInstances",
+            json=payload,
+        )
+        assert response.status_code == 200
+        response_data = response.json()
+        assert response_data["total_entries"] == 1
+        ti_id = response_data["task_instances"][0]["id"]
+        # NOTE(review): "placeholder-note" with no user is presumably the note attached by
+        # create_task_instances; confirm against that helper.
+        _check_task_instance_note(session, ti_id, {"content": 
"placeholder-note", "user_id": None})
+
+    @pytest.mark.db_test
+    def test_clear_dry_run_does_not_set_note(self, test_client, session):
+        """With ``dry_run=True`` a provided ``note`` is not written to the task instance."""
+        dag_id = "example_python_operator"
+        note_value = "Should not be set"
+        payload = {
+            "dry_run": True,
+            "reset_dag_runs": False,
+            "only_failed": True,
+            "only_running": False,
+            "note": note_value,
+        }
+        self.create_task_instances(
+            session,
+            dag_id=dag_id,
+            task_instances=[{"logical_date": DEFAULT_DATETIME_1, "state": 
State.FAILED}],
+            update_extras=False,
+        )
+        response = test_client.post(
+            f"/dags/{dag_id}/clearTaskInstances",
+            json=payload,
+        )
+        assert response.status_code == 200
+        response_data = response.json()
+        # The dry run still reports the task instance that would be cleared.
+        assert response_data["total_entries"] == 1
+        ti_id = response_data["task_instances"][0]["id"]
+        # NOTE(review): "placeholder-note" with no user is presumably the note attached by
+        # create_task_instances; confirm against that helper.
+        _check_task_instance_note(session, ti_id, {"content": 
"placeholder-note", "user_id": None})
+
 
 class TestGetTaskInstanceTries(TestTaskInstanceEndpoint):
     def test_should_respond_200(self, test_client, session):
@@ -6001,6 +6083,64 @@ class TestBulkTaskInstances(TestTaskInstanceEndpoint):
         for task_id, value in expected_results.items():
             assert sorted(response_data[task_id]) == sorted(value)
 
+    @pytest.mark.parametrize(
+        ("map_index", "new_state"),
+        [
+            pytest.param(0, "failed", id="mapped-ti-map-index-0-failed"),
+            pytest.param(1, "failed", id="mapped-ti-map-index-1-failed"),
+            pytest.param(2, "success", id="mapped-ti-map-index-2-success"),
+        ],
+    )
+    def test_bulk_update_mapped_task_instance_state_is_persisted(
+        self, test_client, session, map_index, new_state
+    ):
+        """Verify that bulk-updating a specific mapped TI actually persists the new state in the DB."""
+        self.create_task_instances(
+            session,
+            task_instances=[{"state": State.RUNNING, "map_indexes": (0, 1, 2)}],
+        )
+
+        response = test_client.patch(
+            self.ENDPOINT_URL,
+            json={
+                "actions": [
+                    {
+                        "action": "update",
+                        "entities": [
+                            {
+                                "task_id": self.TASK_ID,
+                                "map_index": map_index,
+                                "new_state": new_state,
+                            }
+                        ],
+                    }
+                ]
+            },
+        )
+        assert response.status_code == 200
+        assert response.json()["update"]["success"] == [
+            f"{self.DAG_ID}.{self.RUN_ID}.{self.TASK_ID}[{map_index}]"
+        ]
+
+        session.expire_all()
+        # Verify only the targeted mapped TI changed state; others remain unchanged.
+        for mi in [0, 1, 2]:
+            ti = session.scalar(
+                select(TaskInstance).where(
+                    TaskInstance.dag_id == self.DAG_ID,
+                    TaskInstance.run_id == self.RUN_ID,
+                    TaskInstance.task_id == self.TASK_ID,
+                    TaskInstance.map_index == mi,
+                )
+            )
+            assert ti is not None
+            if mi == map_index:
+                assert ti.state == new_state, f"Expected map_index={mi} to be {new_state!r}, got {ti.state!r}"
+            else:
+                assert ti.state == State.RUNNING, (
+                    f"Expected map_index={mi} to remain running, got {ti.state!r}"
+                )
+
     def test_should_respond_401(self, unauthenticated_test_client):
         response = unauthenticated_test_client.patch(self.ENDPOINT_URL, json={})
         assert response.status_code == 401
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index bb375f88834..1fe38713703 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -173,6 +173,7 @@ class ClearTaskInstancesBody(BaseModel):
         ),
     ] = False
     prevent_running_task: Annotated[bool | None, Field(title="Prevent Running Task")] = False
+    note: Annotated[Note | None, Field(title="Note")] = None
 
 
 class Value(RootModel[list]):

Reply via email to