This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new fe92cfd4e01 API: Add bulk update to mark Dag runs as success/failed (#67948)
fe92cfd4e01 is described below

commit fe92cfd4e01d17bdfcb76d6a2a229a4f1a2c50f3
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Mon Jun 8 20:58:44 2026 +0200

    API: Add bulk update to mark Dag runs as success/failed (#67948)
    
    * API: Add bulk update to mark Dag runs as success/failed
    
    * Address review: factor out dag-run patch helpers and align bulk update
      with task-instance service
    
    * Adjustments on action_on_non_existence behavior
    
    * Make cross-module patch_dag_run helpers public (drop leading underscore)
    
    patch_dag_run_state / patch_dag_run_note are imported by the route module,
    so the leading underscore was misleading. Also rename the bulk-delete `keys`
    set to `to_delete_keys` to mirror the bulk-update handler.
---
 .../api_fastapi/core_api/datamodels/dag_run.py     |   4 +-
 .../core_api/openapi/v2-rest-api-generated.yaml    |  14 +-
 .../api_fastapi/core_api/routes/public/dag_run.py  |  48 +---
 .../core_api/services/public/dag_run.py            | 244 +++++++++++++++------
 .../src/airflow/ui/openapi-gen/queries/queries.ts  |   2 +-
 .../airflow/ui/openapi-gen/requests/schemas.gen.ts |  24 +-
 .../ui/openapi-gen/requests/services.gen.ts        |   2 +-
 .../airflow/ui/openapi-gen/requests/types.gen.ts   |   4 +-
 .../core_api/routes/public/test_dag_run.py         | 133 ++++++++++-
 .../src/airflowctl/api/datamodels/generated.py     | 194 ++++++++--------
 10 files changed, 454 insertions(+), 215 deletions(-)

diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py 
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
index d3a3f4badb2..88ffceca6d7 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
@@ -51,10 +51,12 @@ class DAGRunPatchBody(StrictBaseModel):
 
 
 class BulkDAGRunBody(StrictBaseModel):
-    """Request body for bulk delete operations on Dag Runs."""
+    """Request body for bulk operations on Dag Runs."""
 
     dag_run_id: str
     dag_id: str | None = None
+    state: DagRunMutableStates | None = None
+    note: str | None = Field(None, max_length=1000)
 
 
 class BaseDAGRunClear(StrictBaseModel):
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index 1d82fb555d5..87fc2a858b9 100644
--- 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++ 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -2153,7 +2153,7 @@ paths:
       tags:
       - DagRun
       summary: Bulk Dag Runs
-      description: Bulk delete Dag Runs.
+      description: Bulk update or delete Dag Runs.
       operationId: bulk_dag_runs
       security:
       - OAuth2PasswordBearer: []
@@ -12122,12 +12122,22 @@ components:
           - type: string
           - type: 'null'
           title: Dag Id
+        state:
+          anyOf:
+          - $ref: '#/components/schemas/DagRunMutableStates'
+          - type: 'null'
+        note:
+          anyOf:
+          - type: string
+            maxLength: 1000
+          - type: 'null'
+          title: Note
       additionalProperties: false
       type: object
       required:
       - dag_run_id
       title: BulkDAGRunBody
-      description: Request body for bulk delete operations on Dag Runs.
+      description: Request body for bulk operations on Dag Runs.
     BulkDAGRunClearBody:
       properties:
         dry_run:
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
index b1667b5eb2d..784e549f1be 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -20,7 +20,6 @@ from __future__ import annotations
 import textwrap
 from typing import Annotated, Literal, cast
 
-import structlog
 from fastapi import Depends, HTTPException, Query, Request, status
 from fastapi.exceptions import RequestValidationError
 from fastapi.responses import StreamingResponse
@@ -28,11 +27,6 @@ from pydantic import ValidationError
 from sqlalchemy import select
 from sqlalchemy.orm import joinedload
 
-from airflow.api.common.mark_tasks import (
-    set_dag_run_state_to_failed,
-    set_dag_run_state_to_queued,
-    set_dag_run_state_to_success,
-)
 from airflow.api_fastapi.app import get_auth_manager
 from airflow.api_fastapi.auth.managers.models.resource_details import 
DagAccessEntity, DagDetails
 from airflow.api_fastapi.common.cursors import (
@@ -106,19 +100,18 @@ from airflow.api_fastapi.core_api.services.public.dag_run 
import (
     DagRunWaiter,
     dry_run_clear_dag_run,
     get_dag_run_and_dag_for_clear,
+    patch_dag_run_note,
+    patch_dag_run_state,
     perform_clear_dag_run,
 )
 from airflow.api_fastapi.logging.decorators import action_logging
 from airflow.exceptions import ParamValidationError
-from airflow.listeners.listener import get_listener_manager
 from airflow.models import DagModel, DagRun
 from airflow.models.asset import AssetEvent
 from airflow.models.dag_version import DagVersion
 from airflow.utils.state import DagRunState
 from airflow.utils.types import DagRunTriggeredByType, DagRunType
 
-log = structlog.get_logger(__name__)
-
 dag_run_router = AirflowRouter(tags=["DagRun"], 
prefix="/dags/{dag_id}/dagRuns")
 dag_run_at_dag_router = AirflowRouter(tags=["DagRun"], prefix="/dags/{dag_id}")
 
@@ -227,33 +220,12 @@ def patch_dag_run(
     data = patch_body.model_dump(include=fields_to_update, by_alias=True)
 
     for attr_name, attr_value_raw in data.items():
-        if attr_name == "state":
-            attr_value = getattr(patch_body, "state")
-            if attr_value == DagRunMutableStates.SUCCESS:
-                set_dag_run_state_to_success(dag=dag, run_id=dag_run.run_id, 
commit=True, session=session)
-                try:
-                    
get_listener_manager().hook.on_dag_run_success(dag_run=dag_run, msg="")
-                except Exception:
-                    log.exception("error calling listener")
-
-            # TODO AIP-103: https://github.com/apache/airflow/issues/66755
-            # Handle clearing states for all task instances in a dagrun when 
cleared
-            elif attr_value == DagRunMutableStates.QUEUED:
-                set_dag_run_state_to_queued(dag=dag, run_id=dag_run.run_id, 
commit=True, session=session)
-                # Not notifying on queued - only notifying on RUNNING, this is 
happening in scheduler
-            elif attr_value == DagRunMutableStates.FAILED:
-                set_dag_run_state_to_failed(dag=dag, run_id=dag_run.run_id, 
commit=True, session=session)
-                try:
-                    
get_listener_manager().hook.on_dag_run_failed(dag_run=dag_run, msg="")
-                except Exception:
-                    log.exception("error calling listener")
+        if attr_name == "state" and patch_body.state is not None:
+            patch_dag_run_state(dag=dag, dag_run=dag_run, 
state=patch_body.state, session=session)
         elif attr_name == "note":
             updated_dag_run = session.get(DagRun, dag_run.id)
-            if updated_dag_run and updated_dag_run.dag_run_note is None:
-                updated_dag_run.note = (attr_value_raw, user.get_id())
-            elif updated_dag_run:
-                updated_dag_run.dag_run_note.content = attr_value_raw
-                updated_dag_run.dag_run_note.user_id = user.get_id()
+            if updated_dag_run is not None:
+                patch_dag_run_note(dag_run=updated_dag_run, 
note=attr_value_raw, user=user)
 
     final_dag_run = session.get(DagRun, dag_run.id)
     if not final_dag_run:
@@ -270,9 +242,13 @@ def bulk_dag_runs(
     request: BulkBody[BulkDAGRunBody],
     session: SessionDep,
     dag_id: str,
+    dag_bag: DagBagDep,
+    user: GetUserDep,
 ) -> BulkResponse:
-    """Bulk delete Dag Runs."""
-    return BulkDagRunService(session=session, request=request, 
dag_id=dag_id).handle_request()
+    """Bulk update or delete Dag Runs."""
+    return BulkDagRunService(
+        session=session, request=request, dag_id=dag_id, dag_bag=dag_bag, 
user=user
+    ).handle_request()
 
 
 @dag_run_router.get(
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py 
b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
index 3c9d8cd7eba..d063493a994 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
@@ -29,8 +29,13 @@ from fastapi import HTTPException, status
 from sqlalchemy import select, tuple_
 from sqlalchemy.orm import Session, joinedload
 
+from airflow.api.common.mark_tasks import (
+    set_dag_run_state_to_failed,
+    set_dag_run_state_to_queued,
+    set_dag_run_state_to_success,
+)
 from airflow.api_fastapi.auth.managers.models.base_user import BaseUser
-from airflow.api_fastapi.common.dagbag import DagBagDep, 
get_latest_version_of_dag
+from airflow.api_fastapi.common.dagbag import DagBagDep, get_dag_for_run, 
get_latest_version_of_dag
 from airflow.api_fastapi.common.db.task_instances import 
eager_load_TI_and_TIH_for_validation
 from airflow.api_fastapi.core_api.datamodels.common import (
     BulkActionNotOnExistence,
@@ -43,6 +48,7 @@ from airflow.api_fastapi.core_api.datamodels.common import (
 from airflow.api_fastapi.core_api.datamodels.dag_run import BulkDAGRunBody, 
DagRunMutableStates
 from airflow.api_fastapi.core_api.datamodels.task_instances import 
NewTaskResponse
 from airflow.api_fastapi.core_api.services.public.common import BulkService, 
resolve_run_on_latest_version
+from airflow.listeners.listener import get_listener_manager
 from airflow.models.dagrun import DagRun
 from airflow.models.taskinstance import TaskInstance
 from airflow.models.xcom import XCOM_RETURN_KEY, XComModel
@@ -140,14 +146,46 @@ def perform_clear_dag_run(
     if not dag_run_cleared:
         raise HTTPException(status.HTTP_404_NOT_FOUND, "Dag run not found 
after clearing")
     if note is not None:
-        if dag_run_cleared.dag_run_note is None:
-            dag_run_cleared.note = (note, user.get_id())
-        else:
-            dag_run_cleared.dag_run_note.content = note
-            dag_run_cleared.dag_run_note.user_id = user.get_id()
+        patch_dag_run_note(dag_run=dag_run_cleared, note=note, user=user)
     return dag_run_cleared
 
 
+def patch_dag_run_state(
+    *,
+    dag: SerializedDAG,
+    dag_run: DagRun,
+    state: DagRunMutableStates,
+    session: Session,
+) -> None:
+    """Set a Dag Run's state (success/queued/failed), firing the matching 
listener hooks."""
+    if state == DagRunMutableStates.SUCCESS:
+        set_dag_run_state_to_success(dag=dag, run_id=dag_run.run_id, 
commit=True, session=session)
+        try:
+            get_listener_manager().hook.on_dag_run_success(dag_run=dag_run, 
msg="")
+        except Exception:
+            log.exception("error calling listener")
+    elif state == DagRunMutableStates.QUEUED:
+        # TODO AIP-103: https://github.com/apache/airflow/issues/66755
+        # Handle clearing states for all task instances in a dagrun when 
cleared.
+        # Not notifying on queued - only notifying on RUNNING, which happens 
in the scheduler.
+        set_dag_run_state_to_queued(dag=dag, run_id=dag_run.run_id, 
commit=True, session=session)
+    elif state == DagRunMutableStates.FAILED:
+        set_dag_run_state_to_failed(dag=dag, run_id=dag_run.run_id, 
commit=True, session=session)
+        try:
+            get_listener_manager().hook.on_dag_run_failed(dag_run=dag_run, 
msg="")
+        except Exception:
+            log.exception("error calling listener")
+
+
+def patch_dag_run_note(*, dag_run: DagRun, note: str | None, user: BaseUser) 
-> None:
+    """Set or update a Dag Run's note."""
+    if dag_run.dag_run_note is None:
+        dag_run.note = (note, user.get_id())
+    else:
+        dag_run.dag_run_note.content = note
+        dag_run.dag_run_note.user_id = user.get_id()
+
+
 @attrs.define
 class DagRunWaiter:
     """Wait for the specified dag run to finish, and collect info from it."""
@@ -209,9 +247,13 @@ class BulkDagRunService(BulkService[BulkDAGRunBody]):
         session: Session,
         request: BulkBody[BulkDAGRunBody],
         dag_id: str,
+        dag_bag: DagBagDep,
+        user: BaseUser,
     ):
         super().__init__(session, request)
         self.dag_id = dag_id
+        self.dag_bag = dag_bag
+        self.user = user
 
     def handle_bulk_create(
         self, action: BulkCreateAction[BulkDAGRunBody], results: 
BulkActionResponse
@@ -223,77 +265,151 @@ class BulkDagRunService(BulkService[BulkDAGRunBody]):
             }
         )
 
+    def _resolve_entity_key(
+        self, entity: str | BulkDAGRunBody, results: BulkActionResponse
+    ) -> tuple[str, str] | None:
+        """
+        Resolve the ``(dag_id, dag_run_id)`` for an entity.
+
+        Records a 400 error and returns ``None`` when a wildcard ``~`` leaves 
the
+        dag_id unresolved. Shared by the bulk update and delete handlers.
+        """
+        if isinstance(entity, str):
+            dag_id, dag_run_id = self.dag_id, entity
+        else:
+            dag_id = entity.dag_id or self.dag_id
+            dag_run_id = entity.dag_run_id
+
+        if dag_id == "~" or dag_run_id == "~":
+            if isinstance(entity, str):
+                error_msg = (
+                    "When using wildcard in path, dag_id must be specified in 
BulkDAGRunBody"
+                    f" object, not as string for dag_run_id: {entity}"
+                )
+            else:
+                error_msg = (
+                    "When using wildcard in path, dag_id must be specified in 
request body for"
+                    f" dag_run_id: {entity.dag_run_id}"
+                )
+            results.errors.append({"error": error_msg, "status_code": 
status.HTTP_400_BAD_REQUEST})
+            return None
+
+        return (dag_id, dag_run_id)
+
+    def _categorize_dag_runs(
+        self, keys: set[tuple[str, str]]
+    ) -> tuple[dict[tuple[str, str], DagRun], set[tuple[str, str]], 
set[tuple[str, str]]]:
+        """
+        Split the requested ``(dag_id, dag_run_id)`` keys into existing and 
missing ones.
+
+        :return: tuple of (dag_run_map, matched_keys, not_found_keys). Shared 
by the bulk
+            update and delete handlers.
+        """
+        dag_run_map = {
+            (dr.dag_id, dr.run_id): dr
+            for dr in self.session.scalars(
+                select(DagRun).where(tuple_(DagRun.dag_id, 
DagRun.run_id).in_(list(keys)))
+            )
+        }
+        matched_keys = set(dag_run_map.keys())
+        not_found_keys = keys - matched_keys
+        return dag_run_map, matched_keys, not_found_keys
+
     def handle_bulk_update(
         self, action: BulkUpdateAction[BulkDAGRunBody], results: 
BulkActionResponse
     ) -> None:
-        results.errors.append(
-            {
-                "error": "Dag Runs bulk update is not supported yet. Use the 
patch Dag Run endpoint per run instead.",
-                "status_code": status.HTTP_405_METHOD_NOT_ALLOWED,
-            }
-        )
-
-    def handle_bulk_delete(
-        self, action: BulkDeleteAction[BulkDAGRunBody], results: 
BulkActionResponse
-    ) -> None:
-        """Bulk delete Dag Runs."""
-        keys: set[tuple[str, str]] = set()
-
+        """Bulk update Dag Runs (mark as success/failed/queued and/or set a 
note)."""
+        entities_by_key: dict[tuple[str, str], BulkDAGRunBody] = {}
         for entity in action.entities:
             if isinstance(entity, str):
-                dag_id, dag_run_id = self.dag_id, entity
-            else:
-                dag_id = entity.dag_id or self.dag_id
-                dag_run_id = entity.dag_run_id
-
-            if dag_id == "~" or dag_run_id == "~":
-                if isinstance(entity, str):
-                    error_msg = (
-                        "When using wildcard in path, dag_id must be specified 
in BulkDAGRunBody"
-                        f" object, not as string for dag_run_id: {entity}"
-                    )
-                else:
-                    error_msg = (
-                        "When using wildcard in path, dag_id must be specified 
in request body for"
-                        f" dag_run_id: {entity.dag_run_id}"
-                    )
                 results.errors.append(
-                    {"error": error_msg, "status_code": 
status.HTTP_400_BAD_REQUEST},
+                    {
+                        "error": (
+                            "Bulk update requires a BulkDAGRunBody object,"
+                            f" not a string for dag_run_id: {entity}"
+                        ),
+                        "status_code": status.HTTP_400_BAD_REQUEST,
+                    }
                 )
                 continue
+            key = self._resolve_entity_key(entity, results)
+            if key is not None:
+                entities_by_key[key] = entity
 
-            keys.add((dag_id, dag_run_id))
-
-        if not keys:
+        if not entities_by_key:
             return
 
-        dag_runs = self.session.scalars(
-            select(DagRun).where(tuple_(DagRun.dag_id, 
DagRun.run_id).in_(list(keys)))
-        ).all()
-        dag_run_map = {(dr.dag_id, dr.run_id): dr for dr in dag_runs}
-        not_found = keys - dag_run_map.keys()
+        to_update_keys = set(entities_by_key.keys())
+        dag_run_map, matched_keys, not_found_keys = 
self._categorize_dag_runs(to_update_keys)
 
-        if action.action_on_non_existence == BulkActionNotOnExistence.FAIL:
-            for dag_id, run_id in sorted(not_found):
-                results.errors.append(
-                    {
-                        "error": (f"The DagRun with dag_id: `{dag_id}` and 
run_id: `{run_id}` was not found"),
-                        "status_code": status.HTTP_404_NOT_FOUND,
-                    }
+        try:
+            if action.action_on_non_existence == BulkActionNotOnExistence.FAIL 
and not_found_keys:
+                raise HTTPException(
+                    status.HTTP_404_NOT_FOUND,
+                    f"The DagRuns with these identifiers: 
{sorted(not_found_keys)} were not found",
                 )
+            update_keys = (
+                matched_keys
+                if action.action_on_non_existence == 
BulkActionNotOnExistence.SKIP
+                else to_update_keys
+            )
+
+            for key, entity in entities_by_key.items():
+                if key not in update_keys:
+                    continue
+                dag_id, run_id = key
+                dag_run = dag_run_map[key]
+                if entity.state is not None:
+                    dag = get_dag_for_run(self.dag_bag, dag_run, 
session=self.session)
+                    patch_dag_run_state(dag=dag, dag_run=dag_run, 
state=entity.state, session=self.session)
+                if entity.note is not None:
+                    patch_dag_run_note(dag_run=dag_run, note=entity.note, 
user=self.user)
+                results.success.append(f"{dag_id}.{run_id}")
+        except HTTPException as e:
+            results.errors.append({"error": f"{e.detail}", "status_code": 
e.status_code})
 
+    def handle_bulk_delete(
+        self, action: BulkDeleteAction[BulkDAGRunBody], results: 
BulkActionResponse
+    ) -> None:
+        """Bulk delete Dag Runs."""
+        to_delete_keys: set[tuple[str, str]] = set()
+        for entity in action.entities:
+            key = self._resolve_entity_key(entity, results)
+            if key is not None:
+                to_delete_keys.add(key)
+
+        if not to_delete_keys:
+            return
+
+        dag_run_map, matched_keys, not_found_keys = 
self._categorize_dag_runs(to_delete_keys)
         deletable_states = {s.value for s in DagRunMutableStates}
-        for (dag_id, run_id), dag_run in dag_run_map.items():
-            if dag_run.state not in deletable_states:
-                results.errors.append(
-                    {
-                        "error": (
-                            f"The DagRun with dag_id: `{dag_id}` and run_id: 
`{run_id}` "
-                            f"cannot be deleted in {dag_run.state} state"
-                        ),
-                        "status_code": status.HTTP_409_CONFLICT,
-                    }
+
+        try:
+            if action.action_on_non_existence == BulkActionNotOnExistence.FAIL 
and not_found_keys:
+                raise HTTPException(
+                    status.HTTP_404_NOT_FOUND,
+                    f"The DagRuns with these identifiers: 
{sorted(not_found_keys)} were not found",
                 )
-                continue
-            self.session.delete(dag_run)
-            results.success.append(f"{dag_id}.{run_id}")
+            delete_keys = (
+                matched_keys
+                if action.action_on_non_existence == 
BulkActionNotOnExistence.SKIP
+                else to_delete_keys
+            )
+
+            for dag_id, run_id in sorted(delete_keys):
+                dag_run = dag_run_map[(dag_id, run_id)]
+                if dag_run.state not in deletable_states:
+                    results.errors.append(
+                        {
+                            "error": (
+                                f"The DagRun with dag_id: `{dag_id}` and 
run_id: `{run_id}` "
+                                f"cannot be deleted in {dag_run.state} state"
+                            ),
+                            "status_code": status.HTTP_409_CONFLICT,
+                        }
+                    )
+                    continue
+                self.session.delete(dag_run)
+                results.success.append(f"{dag_id}.{run_id}")
+        except HTTPException as e:
+            results.errors.append({"error": f"{e.detail}", "status_code": 
e.status_code})
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts 
b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
index dc8b4e1cb21..8f5c660029c 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
@@ -2523,7 +2523,7 @@ export const useDagRunServicePatchDagRun = <TData = 
Common.DagRunServicePatchDag
 }, TContext>({ mutationFn: ({ dagId, dagRunId, requestBody, updateMask }) => 
DagRunService.patchDagRun({ dagId, dagRunId, requestBody, updateMask }) as 
unknown as Promise<TData>, ...options });
 /**
 * Bulk Dag Runs
-* Bulk delete Dag Runs.
+* Bulk update or delete Dag Runs.
 * @param data The data for the request.
 * @param data.dagId
 * @param data.requestBody
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 651d22d03b5..6e73f5b9204 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -1104,13 +1104,35 @@ export const $BulkDAGRunBody = {
                 }
             ],
             title: 'Dag Id'
+        },
+        state: {
+            anyOf: [
+                {
+                    '$ref': '#/components/schemas/DagRunMutableStates'
+                },
+                {
+                    type: 'null'
+                }
+            ]
+        },
+        note: {
+            anyOf: [
+                {
+                    type: 'string',
+                    maxLength: 1000
+                },
+                {
+                    type: 'null'
+                }
+            ],
+            title: 'Note'
         }
     },
     additionalProperties: false,
     type: 'object',
     required: ['dag_run_id'],
     title: 'BulkDAGRunBody',
-    description: 'Request body for bulk delete operations on Dag Runs.'
+    description: 'Request body for bulk operations on Dag Runs.'
 } as const;
 
 export const $BulkDAGRunClearBody = {
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
index b88e7522b03..d69a06aa60c 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -985,7 +985,7 @@ export class DagRunService {
     
     /**
      * Bulk Dag Runs
-     * Bulk delete Dag Runs.
+     * Bulk update or delete Dag Runs.
      * @param data The data for the request.
      * @param data.dagId
      * @param data.requestBody
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index fb93b5e093d..62260fa6113 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -334,11 +334,13 @@ export type BulkCreateAction_VariableBody_ = {
 };
 
 /**
- * Request body for bulk delete operations on Dag Runs.
+ * Request body for bulk operations on Dag Runs.
  */
 export type BulkDAGRunBody = {
     dag_run_id: string;
     dag_id?: string | null;
+    state?: DagRunMutableStates | null;
+    note?: string | null;
 };
 
 /**
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
index a13175970b5..4dfb5bb889a 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -3034,7 +3034,7 @@ class TestBulkDagRuns:
         assert session.scalar(select(DagRun).where(DagRun.run_id == 
"running_run")) is not None
 
     def test_bulk_delete_not_found_fails(self, test_client, session):
-        """When FAIL semantics, each missing run is reported individually and 
matched runs still get deleted."""
+        """FAIL semantics: a single missing run fails the whole action and 
nothing is deleted."""
         response = test_client.patch(
             self.ENDPOINT_URL,
             json={
@@ -3048,16 +3048,15 @@ class TestBulkDagRuns:
         )
         assert response.status_code == 200
         body = response.json()
-        assert body["delete"]["success"] == [f"{DAG1_ID}.{DAG1_RUN1_ID}"]
+        assert body["delete"]["success"] == []
         errors = body["delete"]["errors"]
-        assert len(errors) == 2
-        assert all(err["status_code"] == 404 for err in errors)
-        assert {err["error"] for err in errors} == {
-            f"The DagRun with dag_id: `{DAG1_ID}` and run_id: 
`another_missing_run` was not found",
-            f"The DagRun with dag_id: `{DAG1_ID}` and run_id: 
`non_existent_run` was not found",
-        }
+        assert len(errors) == 1
+        assert errors[0]["status_code"] == 404
+        assert "non_existent_run" in errors[0]["error"]
+        assert "another_missing_run" in errors[0]["error"]
         session.expire_all()
-        assert session.scalar(select(DagRun).where(DagRun.run_id == 
DAG1_RUN1_ID)) is None
+        # The matched run must not be deleted when another entity is missing.
+        assert session.scalar(select(DagRun).where(DagRun.run_id == 
DAG1_RUN1_ID)) is not None
 
     def test_bulk_delete_not_found_skip(self, test_client, session):
         response = test_client.patch(
@@ -3137,14 +3136,94 @@ class TestBulkDagRuns:
         assert len(body["create"]["errors"]) == 1
         assert body["create"]["errors"][0]["status_code"] == 405
 
-    def test_bulk_update_not_supported(self, test_client):
+    def test_bulk_update_marks_state(self, test_client, session):
+        """Bulk update marks the selected Dag Runs to the requested state in a 
single call."""
         response = test_client.patch(
             self.ENDPOINT_URL,
             json={
                 "actions": [
                     {
                         "action": "update",
-                        "entities": [{"dag_run_id": DAG1_RUN1_ID}],
+                        "entities": [
+                            {"dag_run_id": DAG1_RUN1_ID, "state": "failed"},
+                            {"dag_run_id": DAG1_RUN2_ID, "state": "failed"},
+                        ],
+                    }
+                ]
+            },
+        )
+        assert response.status_code == 200
+        body = response.json()
+        assert sorted(body["update"]["success"]) == sorted(
+            [f"{DAG1_ID}.{DAG1_RUN1_ID}", f"{DAG1_ID}.{DAG1_RUN2_ID}"]
+        )
+        assert body["update"]["errors"] == []
+        session.expire_all()
+        for run_id in (DAG1_RUN1_ID, DAG1_RUN2_ID):
+            dag_run = session.scalar(select(DagRun).where(DagRun.dag_id == 
DAG1_ID, DagRun.run_id == run_id))
+            assert dag_run.state == DagRunState.FAILED
+
+    def test_bulk_update_across_dags_with_wildcard(self, test_client, session):
+        """``~`` URL with per-entity dag_id marks runs across Dags in one 
call."""
+        response = test_client.patch(
+            self.WILDCARD_ENDPOINT,
+            json={
+                "actions": [
+                    {
+                        "action": "update",
+                        "entities": [
+                            {"dag_id": DAG1_ID, "dag_run_id": DAG1_RUN1_ID, 
"state": "success"},
+                            {"dag_id": DAG2_ID, "dag_run_id": DAG2_RUN1_ID, 
"state": "success"},
+                        ],
+                    }
+                ]
+            },
+        )
+        assert response.status_code == 200
+        body = response.json()
+        assert sorted(body["update"]["success"]) == sorted(
+            [f"{DAG1_ID}.{DAG1_RUN1_ID}", f"{DAG2_ID}.{DAG2_RUN1_ID}"]
+        )
+        session.expire_all()
+        for run_id in (DAG1_RUN1_ID, DAG2_RUN1_ID):
+            assert session.scalar(select(DagRun).where(DagRun.run_id == 
run_id)).state == DagRunState.SUCCESS
+
+    def test_bulk_update_note_only(self, test_client, session):
+        """A bulk update may set only the note, without a target state."""
+        response = test_client.patch(
+            self.ENDPOINT_URL,
+            json={
+                "actions": [
+                    {
+                        "action": "update",
+                        "entities": [{"dag_run_id": DAG1_RUN1_ID, "note": 
"bulk note"}],
+                    }
+                ]
+            },
+        )
+        assert response.status_code == 200
+        body = response.json()
+        assert body["update"]["success"] == [f"{DAG1_ID}.{DAG1_RUN1_ID}"]
+        assert body["update"]["errors"] == []
+        session.expire_all()
+        dag_run = session.scalar(
+            select(DagRun).where(DagRun.dag_id == DAG1_ID, DagRun.run_id == 
DAG1_RUN1_ID)
+        )
+        assert dag_run.note == "bulk note"
+        assert dag_run.state == DAG1_RUN1_STATE
+
+    def test_bulk_update_not_found_fails(self, test_client, session):
+        """FAIL semantics: a single missing run fails the whole action and 
nothing is updated."""
+        response = test_client.patch(
+            self.ENDPOINT_URL,
+            json={
+                "actions": [
+                    {
+                        "action": "update",
+                        "entities": [
+                            {"dag_run_id": DAG1_RUN1_ID, "state": "failed"},
+                            {"dag_run_id": "non_existent_run", "state": 
"failed"},
+                        ],
                     }
                 ]
             },
@@ -3153,7 +3232,37 @@ class TestBulkDagRuns:
         body = response.json()
         assert body["update"]["success"] == []
         assert len(body["update"]["errors"]) == 1
-        assert body["update"]["errors"][0]["status_code"] == 405
+        assert body["update"]["errors"][0]["status_code"] == 404
+        assert "non_existent_run" in body["update"]["errors"][0]["error"]
+        session.expire_all()
+        # The matched run must keep its original state when another entity is 
missing.
+        dag_run = session.scalar(select(DagRun).where(DagRun.run_id == 
DAG1_RUN1_ID))
+        assert dag_run.state == DAG1_RUN1_STATE
+
+    def test_bulk_update_not_found_skip(self, test_client, session):
+        """SKIP semantics: missing runs are ignored and matched runs are still 
updated."""
+        response = test_client.patch(
+            self.ENDPOINT_URL,
+            json={
+                "actions": [
+                    {
+                        "action": "update",
+                        "action_on_non_existence": "skip",
+                        "entities": [
+                            {"dag_run_id": DAG1_RUN1_ID, "state": "failed"},
+                            {"dag_run_id": "non_existent_run", "state": 
"failed"},
+                        ],
+                    }
+                ]
+            },
+        )
+        assert response.status_code == 200
+        body = response.json()
+        assert body["update"]["success"] == [f"{DAG1_ID}.{DAG1_RUN1_ID}"]
+        assert body["update"]["errors"] == []
+        session.expire_all()
+        dag_run = session.scalar(select(DagRun).where(DagRun.run_id == 
DAG1_RUN1_ID))
+        assert dag_run.state == DagRunState.FAILED
 
     def test_bulk_delete_rejects_unauthorized_dag_ids_from_request_body(self, 
test_client, session):
         """A 403 at the route level if any entity references a Dag the user 
can't access."""
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py 
b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index 4e13fd0fcb6..ec2476ce6ab 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -146,64 +146,10 @@ class BulkActionResponse(BaseModel):
     ] = []
 
 
-class BulkDAGRunBody(BaseModel):
-    """
-    Request body for bulk delete operations on Dag Runs.
-    """
-
-    model_config = ConfigDict(
-        extra="forbid",
-    )
-    dag_run_id: Annotated[str, Field(title="Dag Run Id")]
-    dag_id: Annotated[str | None, Field(title="Dag Id")] = None
-
-
 class Note(RootModel[str]):
     root: Annotated[str, Field(max_length=1000, title="Note")]
 
 
-class BulkDAGRunClearBody(BaseModel):
-    """
-    Request body for the bulk clear Dag Runs endpoint.
-    """
-
-    model_config = ConfigDict(
-        extra="forbid",
-    )
-    dry_run: Annotated[bool | None, Field(title="Dry Run")] = True
-    only_failed: Annotated[bool | None, Field(title="Only Failed")] = False
-    only_new: Annotated[
-        bool | None,
-        Field(
-            description="Only queue newly added tasks in the latest Dag 
version without clearing existing tasks.",
-            title="Only New",
-        ),
-    ] = False
-    run_on_latest_version: Annotated[
-        bool | None,
-        Field(
-            description="(Experimental) Run on the latest bundle version of 
the Dag after clearing. If not specified, falls back to the DAG-level 
``rerun_with_latest_version`` parameter, then the ``[core] 
rerun_with_latest_version`` config option, and finally ``False``.",
-            title="Run On Latest Version",
-        ),
-    ] = None
-    note: Annotated[Note | None, Field(title="Note")] = None
-    dag_runs: Annotated[list[BulkDAGRunBody], Field(min_length=1, title="Dag 
Runs")]
-
-
-class BulkDeleteActionBulkDAGRunBody(BaseModel):
-    model_config = ConfigDict(
-        extra="forbid",
-    )
-    action: Annotated[
-        Literal["delete"], Field(description="The action to be performed on 
the entities.", title="Action")
-    ]
-    entities: Annotated[
-        list[str | BulkDAGRunBody],
-        Field(description="A list of entity id/key or entity objects to be 
deleted.", title="Entities"),
-    ]
-    action_on_non_existence: BulkActionNotOnExistence | None = "fail"
-
-
 class BulkResponse(BaseModel):
     """
     Serializer for responses to bulk entity operations.
@@ -227,26 +173,6 @@ class BulkResponse(BaseModel):
     ] = None
 
 
-class BulkUpdateActionBulkDAGRunBody(BaseModel):
-    model_config = ConfigDict(
-        extra="forbid",
-    )
-    action: Annotated[
-        Literal["update"], Field(description="The action to be performed on 
the entities.", title="Action")
-    ]
-    entities: Annotated[
-        list[BulkDAGRunBody], Field(description="A list of entities to be 
updated.", title="Entities")
-    ]
-    update_mask: Annotated[
-        list[str] | None,
-        Field(
-            description="A list of field names to update for each entity.Only 
these fields will be applied from the request body to the database model.Any 
extra fields provided will be ignored.",
-            title="Update Mask",
-        ),
-    ] = None
-    action_on_non_existence: BulkActionNotOnExistence | None = "fail"
-
-
 class TaskIds(RootModel[list]):
     root: Annotated[list, Field(max_length=2, min_length=2)]
 
@@ -1420,7 +1346,7 @@ class BackfillResponse(BaseModel):
     dag_display_name: Annotated[str, Field(title="Dag Display Name")]
 
 
-class BulkCreateActionBulkDAGRunBody(BaseModel):
+class BulkCreateActionConnectionBody(BaseModel):
     model_config = ConfigDict(
         extra="forbid",
     )
@@ -1428,12 +1354,12 @@ class BulkCreateActionBulkDAGRunBody(BaseModel):
         Literal["create"], Field(description="The action to be performed on 
the entities.", title="Action")
     ]
     entities: Annotated[
-        list[BulkDAGRunBody], Field(description="A list of entities to be 
created.", title="Entities")
+        list[ConnectionBody], Field(description="A list of entities to be 
created.", title="Entities")
     ]
     action_on_existence: BulkActionOnExistence | None = "fail"
 
 
-class BulkCreateActionConnectionBody(BaseModel):
+class BulkCreateActionPoolBody(BaseModel):
     model_config = ConfigDict(
         extra="forbid",
     )
@@ -1441,12 +1367,12 @@ class BulkCreateActionConnectionBody(BaseModel):
         Literal["create"], Field(description="The action to be performed on 
the entities.", title="Action")
     ]
     entities: Annotated[
-        list[ConnectionBody], Field(description="A list of entities to be 
created.", title="Entities")
+        list[PoolBody], Field(description="A list of entities to be created.", 
title="Entities")
     ]
     action_on_existence: BulkActionOnExistence | None = "fail"
 
 
-class BulkCreateActionPoolBody(BaseModel):
+class BulkCreateActionVariableBody(BaseModel):
     model_config = ConfigDict(
         extra="forbid",
     )
@@ -1454,22 +1380,65 @@ class BulkCreateActionPoolBody(BaseModel):
         Literal["create"], Field(description="The action to be performed on 
the entities.", title="Action")
     ]
     entities: Annotated[
-        list[PoolBody], Field(description="A list of entities to be created.", 
title="Entities")
+        list[VariableBody], Field(description="A list of entities to be 
created.", title="Entities")
     ]
     action_on_existence: BulkActionOnExistence | None = "fail"
 
 
-class BulkCreateActionVariableBody(BaseModel):
+class BulkDAGRunBody(BaseModel):
+    """
+    Request body for bulk operations on Dag Runs.
+    """
+
+    model_config = ConfigDict(
+        extra="forbid",
+    )
+    dag_run_id: Annotated[str, Field(title="Dag Run Id")]
+    dag_id: Annotated[str | None, Field(title="Dag Id")] = None
+    state: DagRunMutableStates | None = None
+    note: Annotated[Note | None, Field(title="Note")] = None
+
+
+class BulkDAGRunClearBody(BaseModel):
+    """
+    Request body for the bulk clear Dag Runs endpoint.
+    """
+
+    model_config = ConfigDict(
+        extra="forbid",
+    )
+    dry_run: Annotated[bool | None, Field(title="Dry Run")] = True
+    only_failed: Annotated[bool | None, Field(title="Only Failed")] = False
+    only_new: Annotated[
+        bool | None,
+        Field(
+            description="Only queue newly added tasks in the latest Dag 
version without clearing existing tasks.",
+            title="Only New",
+        ),
+    ] = False
+    run_on_latest_version: Annotated[
+        bool | None,
+        Field(
+            description="(Experimental) Run on the latest bundle version of 
the Dag after clearing. If not specified, falls back to the DAG-level 
``rerun_with_latest_version`` parameter, then the ``[core] 
rerun_with_latest_version`` config option, and finally ``False``.",
+            title="Run On Latest Version",
+        ),
+    ] = None
+    note: Annotated[Note | None, Field(title="Note")] = None
+    dag_runs: Annotated[list[BulkDAGRunBody], Field(min_length=1, title="Dag 
Runs")]
+
+
+class BulkDeleteActionBulkDAGRunBody(BaseModel):
     model_config = ConfigDict(
         extra="forbid",
     )
     action: Annotated[
-        Literal["create"], Field(description="The action to be performed on 
the entities.", title="Action")
+        Literal["delete"], Field(description="The action to be performed on 
the entities.", title="Action")
     ]
     entities: Annotated[
-        list[VariableBody], Field(description="A list of entities to be 
created.", title="Entities")
+        list[str | BulkDAGRunBody],
+        Field(description="A list of entity id/key or entity objects to be 
deleted.", title="Entities"),
     ]
-    action_on_existence: BulkActionOnExistence | None = "fail"
+    action_on_non_existence: BulkActionNotOnExistence | None = "fail"
 
 
 class BulkDeleteActionConnectionBody(BaseModel):
@@ -1534,6 +1503,26 @@ class BulkTaskInstanceBody(BaseModel):
     dag_run_id: Annotated[str | None, Field(title="Dag Run Id")] = None
 
 
+class BulkUpdateActionBulkDAGRunBody(BaseModel):
+    model_config = ConfigDict(
+        extra="forbid",
+    )
+    action: Annotated[
+        Literal["update"], Field(description="The action to be performed on 
the entities.", title="Action")
+    ]
+    entities: Annotated[
+        list[BulkDAGRunBody], Field(description="A list of entities to be 
updated.", title="Entities")
+    ]
+    update_mask: Annotated[
+        list[str] | None,
+        Field(
+            description="A list of field names to update for each entity.Only 
these fields will be applied from the request body to the database model.Any 
extra fields provided will be ignored.",
+            title="Update Mask",
+        ),
+    ] = None
+    action_on_non_existence: BulkActionNotOnExistence | None = "fail"
+
+
 class BulkUpdateActionBulkTaskInstanceBody(BaseModel):
     model_config = ConfigDict(
         extra="forbid",
@@ -2178,18 +2167,6 @@ class BackfillCollectionResponse(BaseModel):
     total_entries: Annotated[int, Field(title="Total Entries")]
 
 
-class BulkBodyBulkDAGRunBody(BaseModel):
-    model_config = ConfigDict(
-        extra="forbid",
-    )
-    actions: Annotated[
-        list[
-            BulkCreateActionBulkDAGRunBody | BulkUpdateActionBulkDAGRunBody | 
BulkDeleteActionBulkDAGRunBody
-        ],
-        Field(title="Actions"),
-    ]
-
-
 class BulkBodyConnectionBody(BaseModel):
     model_config = ConfigDict(
         extra="forbid",
@@ -2222,6 +2199,19 @@ class BulkBodyVariableBody(BaseModel):
     ]
 
 
+class BulkCreateActionBulkDAGRunBody(BaseModel):
+    model_config = ConfigDict(
+        extra="forbid",
+    )
+    action: Annotated[
+        Literal["create"], Field(description="The action to be performed on 
the entities.", title="Action")
+    ]
+    entities: Annotated[
+        list[BulkDAGRunBody], Field(description="A list of entities to be 
created.", title="Entities")
+    ]
+    action_on_existence: BulkActionOnExistence | None = "fail"
+
+
 class BulkCreateActionBulkTaskInstanceBody(BaseModel):
     model_config = ConfigDict(
         extra="forbid",
@@ -2435,6 +2425,18 @@ class TaskInstanceHistoryCollectionResponse(BaseModel):
     total_entries: Annotated[int, Field(title="Total Entries")]
 
 
+class BulkBodyBulkDAGRunBody(BaseModel):
+    model_config = ConfigDict(
+        extra="forbid",
+    )
+    actions: Annotated[
+        list[
+            BulkCreateActionBulkDAGRunBody | BulkUpdateActionBulkDAGRunBody | 
BulkDeleteActionBulkDAGRunBody
+        ],
+        Field(title="Actions"),
+    ]
+
+
 class BulkBodyBulkTaskInstanceBody(BaseModel):
     model_config = ConfigDict(
         extra="forbid",


Reply via email to