This is an automated email from the ASF dual-hosted git repository.
bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new fe92cfd4e01 API: Add bulk update to mark Dag runs as success/failed
(#67948)
fe92cfd4e01 is described below
commit fe92cfd4e01d17bdfcb76d6a2a229a4f1a2c50f3
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Mon Jun 8 20:58:44 2026 +0200
API: Add bulk update to mark Dag runs as success/failed (#67948)
* API: Add bulk update to mark Dag runs as success/failed
* Address review: factor out dag-run patch helpers and align bulk update
with task-instance service
* Adjustments on action_on_existence behavior
* Make cross-module patch_dag_run helpers public (drop leading underscore)
patch_dag_run_state / patch_dag_run_note are imported by the route module, so
the leading underscore was misleading. Also rename the bulk-delete `keys` set
to `to_delete_keys` to mirror the bulk-update handler.
---
.../api_fastapi/core_api/datamodels/dag_run.py | 4 +-
.../core_api/openapi/v2-rest-api-generated.yaml | 14 +-
.../api_fastapi/core_api/routes/public/dag_run.py | 48 +---
.../core_api/services/public/dag_run.py | 244 +++++++++++++++------
.../src/airflow/ui/openapi-gen/queries/queries.ts | 2 +-
.../airflow/ui/openapi-gen/requests/schemas.gen.ts | 24 +-
.../ui/openapi-gen/requests/services.gen.ts | 2 +-
.../airflow/ui/openapi-gen/requests/types.gen.ts | 4 +-
.../core_api/routes/public/test_dag_run.py | 133 ++++++++++-
.../src/airflowctl/api/datamodels/generated.py | 194 ++++++++--------
10 files changed, 454 insertions(+), 215 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
index d3a3f4badb2..88ffceca6d7 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
@@ -51,10 +51,12 @@ class DAGRunPatchBody(StrictBaseModel):
class BulkDAGRunBody(StrictBaseModel):
- """Request body for bulk delete operations on Dag Runs."""
+ """Request body for bulk operations on Dag Runs."""
dag_run_id: str
dag_id: str | None = None
+ state: DagRunMutableStates | None = None
+ note: str | None = Field(None, max_length=1000)
class BaseDAGRunClear(StrictBaseModel):
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index 1d82fb555d5..87fc2a858b9 100644
---
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -2153,7 +2153,7 @@ paths:
tags:
- DagRun
summary: Bulk Dag Runs
- description: Bulk delete Dag Runs.
+ description: Bulk update or delete Dag Runs.
operationId: bulk_dag_runs
security:
- OAuth2PasswordBearer: []
@@ -12122,12 +12122,22 @@ components:
- type: string
- type: 'null'
title: Dag Id
+ state:
+ anyOf:
+ - $ref: '#/components/schemas/DagRunMutableStates'
+ - type: 'null'
+ note:
+ anyOf:
+ - type: string
+ maxLength: 1000
+ - type: 'null'
+ title: Note
additionalProperties: false
type: object
required:
- dag_run_id
title: BulkDAGRunBody
- description: Request body for bulk delete operations on Dag Runs.
+ description: Request body for bulk operations on Dag Runs.
BulkDAGRunClearBody:
properties:
dry_run:
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
index b1667b5eb2d..784e549f1be 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -20,7 +20,6 @@ from __future__ import annotations
import textwrap
from typing import Annotated, Literal, cast
-import structlog
from fastapi import Depends, HTTPException, Query, Request, status
from fastapi.exceptions import RequestValidationError
from fastapi.responses import StreamingResponse
@@ -28,11 +27,6 @@ from pydantic import ValidationError
from sqlalchemy import select
from sqlalchemy.orm import joinedload
-from airflow.api.common.mark_tasks import (
- set_dag_run_state_to_failed,
- set_dag_run_state_to_queued,
- set_dag_run_state_to_success,
-)
from airflow.api_fastapi.app import get_auth_manager
from airflow.api_fastapi.auth.managers.models.resource_details import
DagAccessEntity, DagDetails
from airflow.api_fastapi.common.cursors import (
@@ -106,19 +100,18 @@ from airflow.api_fastapi.core_api.services.public.dag_run
import (
DagRunWaiter,
dry_run_clear_dag_run,
get_dag_run_and_dag_for_clear,
+ patch_dag_run_note,
+ patch_dag_run_state,
perform_clear_dag_run,
)
from airflow.api_fastapi.logging.decorators import action_logging
from airflow.exceptions import ParamValidationError
-from airflow.listeners.listener import get_listener_manager
from airflow.models import DagModel, DagRun
from airflow.models.asset import AssetEvent
from airflow.models.dag_version import DagVersion
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType
-log = structlog.get_logger(__name__)
-
dag_run_router = AirflowRouter(tags=["DagRun"],
prefix="/dags/{dag_id}/dagRuns")
dag_run_at_dag_router = AirflowRouter(tags=["DagRun"], prefix="/dags/{dag_id}")
@@ -227,33 +220,12 @@ def patch_dag_run(
data = patch_body.model_dump(include=fields_to_update, by_alias=True)
for attr_name, attr_value_raw in data.items():
- if attr_name == "state":
- attr_value = getattr(patch_body, "state")
- if attr_value == DagRunMutableStates.SUCCESS:
- set_dag_run_state_to_success(dag=dag, run_id=dag_run.run_id,
commit=True, session=session)
- try:
-
get_listener_manager().hook.on_dag_run_success(dag_run=dag_run, msg="")
- except Exception:
- log.exception("error calling listener")
-
- # TODO AIP-103: https://github.com/apache/airflow/issues/66755
- # Handle clearing states for all task instances in a dagrun when
cleared
- elif attr_value == DagRunMutableStates.QUEUED:
- set_dag_run_state_to_queued(dag=dag, run_id=dag_run.run_id,
commit=True, session=session)
- # Not notifying on queued - only notifying on RUNNING, this is
happening in scheduler
- elif attr_value == DagRunMutableStates.FAILED:
- set_dag_run_state_to_failed(dag=dag, run_id=dag_run.run_id,
commit=True, session=session)
- try:
-
get_listener_manager().hook.on_dag_run_failed(dag_run=dag_run, msg="")
- except Exception:
- log.exception("error calling listener")
+ if attr_name == "state" and patch_body.state is not None:
+ patch_dag_run_state(dag=dag, dag_run=dag_run,
state=patch_body.state, session=session)
elif attr_name == "note":
updated_dag_run = session.get(DagRun, dag_run.id)
- if updated_dag_run and updated_dag_run.dag_run_note is None:
- updated_dag_run.note = (attr_value_raw, user.get_id())
- elif updated_dag_run:
- updated_dag_run.dag_run_note.content = attr_value_raw
- updated_dag_run.dag_run_note.user_id = user.get_id()
+ if updated_dag_run is not None:
+ patch_dag_run_note(dag_run=updated_dag_run,
note=attr_value_raw, user=user)
final_dag_run = session.get(DagRun, dag_run.id)
if not final_dag_run:
@@ -270,9 +242,13 @@ def bulk_dag_runs(
request: BulkBody[BulkDAGRunBody],
session: SessionDep,
dag_id: str,
+ dag_bag: DagBagDep,
+ user: GetUserDep,
) -> BulkResponse:
- """Bulk delete Dag Runs."""
- return BulkDagRunService(session=session, request=request,
dag_id=dag_id).handle_request()
+ """Bulk update or delete Dag Runs."""
+ return BulkDagRunService(
+ session=session, request=request, dag_id=dag_id, dag_bag=dag_bag,
user=user
+ ).handle_request()
@dag_run_router.get(
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
index 3c9d8cd7eba..d063493a994 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
@@ -29,8 +29,13 @@ from fastapi import HTTPException, status
from sqlalchemy import select, tuple_
from sqlalchemy.orm import Session, joinedload
+from airflow.api.common.mark_tasks import (
+ set_dag_run_state_to_failed,
+ set_dag_run_state_to_queued,
+ set_dag_run_state_to_success,
+)
from airflow.api_fastapi.auth.managers.models.base_user import BaseUser
-from airflow.api_fastapi.common.dagbag import DagBagDep,
get_latest_version_of_dag
+from airflow.api_fastapi.common.dagbag import DagBagDep, get_dag_for_run,
get_latest_version_of_dag
from airflow.api_fastapi.common.db.task_instances import
eager_load_TI_and_TIH_for_validation
from airflow.api_fastapi.core_api.datamodels.common import (
BulkActionNotOnExistence,
@@ -43,6 +48,7 @@ from airflow.api_fastapi.core_api.datamodels.common import (
from airflow.api_fastapi.core_api.datamodels.dag_run import BulkDAGRunBody,
DagRunMutableStates
from airflow.api_fastapi.core_api.datamodels.task_instances import
NewTaskResponse
from airflow.api_fastapi.core_api.services.public.common import BulkService,
resolve_run_on_latest_version
+from airflow.listeners.listener import get_listener_manager
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
from airflow.models.xcom import XCOM_RETURN_KEY, XComModel
@@ -140,14 +146,46 @@ def perform_clear_dag_run(
if not dag_run_cleared:
raise HTTPException(status.HTTP_404_NOT_FOUND, "Dag run not found
after clearing")
if note is not None:
- if dag_run_cleared.dag_run_note is None:
- dag_run_cleared.note = (note, user.get_id())
- else:
- dag_run_cleared.dag_run_note.content = note
- dag_run_cleared.dag_run_note.user_id = user.get_id()
+ patch_dag_run_note(dag_run=dag_run_cleared, note=note, user=user)
return dag_run_cleared
+def patch_dag_run_state(
+ *,
+ dag: SerializedDAG,
+ dag_run: DagRun,
+ state: DagRunMutableStates,
+ session: Session,
+) -> None:
+ """Set a Dag Run's state (success/queued/failed), firing the matching
listener hooks."""
+ if state == DagRunMutableStates.SUCCESS:
+ set_dag_run_state_to_success(dag=dag, run_id=dag_run.run_id,
commit=True, session=session)
+ try:
+ get_listener_manager().hook.on_dag_run_success(dag_run=dag_run,
msg="")
+ except Exception:
+ log.exception("error calling listener")
+ elif state == DagRunMutableStates.QUEUED:
+ # TODO AIP-103: https://github.com/apache/airflow/issues/66755
+ # Handle clearing states for all task instances in a dagrun when
cleared.
+ # Not notifying on queued - only notifying on RUNNING, which happens
in the scheduler.
+ set_dag_run_state_to_queued(dag=dag, run_id=dag_run.run_id,
commit=True, session=session)
+ elif state == DagRunMutableStates.FAILED:
+ set_dag_run_state_to_failed(dag=dag, run_id=dag_run.run_id,
commit=True, session=session)
+ try:
+ get_listener_manager().hook.on_dag_run_failed(dag_run=dag_run,
msg="")
+ except Exception:
+ log.exception("error calling listener")
+
+
+def patch_dag_run_note(*, dag_run: DagRun, note: str | None, user: BaseUser)
-> None:
+ """Set or update a Dag Run's note."""
+ if dag_run.dag_run_note is None:
+ dag_run.note = (note, user.get_id())
+ else:
+ dag_run.dag_run_note.content = note
+ dag_run.dag_run_note.user_id = user.get_id()
+
+
@attrs.define
class DagRunWaiter:
"""Wait for the specified dag run to finish, and collect info from it."""
@@ -209,9 +247,13 @@ class BulkDagRunService(BulkService[BulkDAGRunBody]):
session: Session,
request: BulkBody[BulkDAGRunBody],
dag_id: str,
+ dag_bag: DagBagDep,
+ user: BaseUser,
):
super().__init__(session, request)
self.dag_id = dag_id
+ self.dag_bag = dag_bag
+ self.user = user
def handle_bulk_create(
self, action: BulkCreateAction[BulkDAGRunBody], results:
BulkActionResponse
@@ -223,77 +265,151 @@ class BulkDagRunService(BulkService[BulkDAGRunBody]):
}
)
+ def _resolve_entity_key(
+ self, entity: str | BulkDAGRunBody, results: BulkActionResponse
+ ) -> tuple[str, str] | None:
+ """
+ Resolve the ``(dag_id, dag_run_id)`` for an entity.
+
+ Records a 400 error and returns ``None`` when a wildcard ``~`` leaves
the
+ dag_id unresolved. Shared by the bulk update and delete handlers.
+ """
+ if isinstance(entity, str):
+ dag_id, dag_run_id = self.dag_id, entity
+ else:
+ dag_id = entity.dag_id or self.dag_id
+ dag_run_id = entity.dag_run_id
+
+ if dag_id == "~" or dag_run_id == "~":
+ if isinstance(entity, str):
+ error_msg = (
+ "When using wildcard in path, dag_id must be specified in
BulkDAGRunBody"
+ f" object, not as string for dag_run_id: {entity}"
+ )
+ else:
+ error_msg = (
+ "When using wildcard in path, dag_id must be specified in
request body for"
+ f" dag_run_id: {entity.dag_run_id}"
+ )
+ results.errors.append({"error": error_msg, "status_code":
status.HTTP_400_BAD_REQUEST})
+ return None
+
+ return (dag_id, dag_run_id)
+
+ def _categorize_dag_runs(
+ self, keys: set[tuple[str, str]]
+ ) -> tuple[dict[tuple[str, str], DagRun], set[tuple[str, str]],
set[tuple[str, str]]]:
+ """
+ Split the requested ``(dag_id, dag_run_id)`` keys into existing and
missing ones.
+
+ :return: tuple of (dag_run_map, matched_keys, not_found_keys). Shared
by the bulk
+ update and delete handlers.
+ """
+ dag_run_map = {
+ (dr.dag_id, dr.run_id): dr
+ for dr in self.session.scalars(
+ select(DagRun).where(tuple_(DagRun.dag_id,
DagRun.run_id).in_(list(keys)))
+ )
+ }
+ matched_keys = set(dag_run_map.keys())
+ not_found_keys = keys - matched_keys
+ return dag_run_map, matched_keys, not_found_keys
+
def handle_bulk_update(
self, action: BulkUpdateAction[BulkDAGRunBody], results:
BulkActionResponse
) -> None:
- results.errors.append(
- {
- "error": "Dag Runs bulk update is not supported yet. Use the
patch Dag Run endpoint per run instead.",
- "status_code": status.HTTP_405_METHOD_NOT_ALLOWED,
- }
- )
-
- def handle_bulk_delete(
- self, action: BulkDeleteAction[BulkDAGRunBody], results:
BulkActionResponse
- ) -> None:
- """Bulk delete Dag Runs."""
- keys: set[tuple[str, str]] = set()
-
+ """Bulk update Dag Runs (mark as success/failed/queued and/or set a
note)."""
+ entities_by_key: dict[tuple[str, str], BulkDAGRunBody] = {}
for entity in action.entities:
if isinstance(entity, str):
- dag_id, dag_run_id = self.dag_id, entity
- else:
- dag_id = entity.dag_id or self.dag_id
- dag_run_id = entity.dag_run_id
-
- if dag_id == "~" or dag_run_id == "~":
- if isinstance(entity, str):
- error_msg = (
- "When using wildcard in path, dag_id must be specified
in BulkDAGRunBody"
- f" object, not as string for dag_run_id: {entity}"
- )
- else:
- error_msg = (
- "When using wildcard in path, dag_id must be specified
in request body for"
- f" dag_run_id: {entity.dag_run_id}"
- )
results.errors.append(
- {"error": error_msg, "status_code":
status.HTTP_400_BAD_REQUEST},
+ {
+ "error": (
+ "Bulk update requires a BulkDAGRunBody object,"
+ f" not a string for dag_run_id: {entity}"
+ ),
+ "status_code": status.HTTP_400_BAD_REQUEST,
+ }
)
continue
+ key = self._resolve_entity_key(entity, results)
+ if key is not None:
+ entities_by_key[key] = entity
- keys.add((dag_id, dag_run_id))
-
- if not keys:
+ if not entities_by_key:
return
- dag_runs = self.session.scalars(
- select(DagRun).where(tuple_(DagRun.dag_id,
DagRun.run_id).in_(list(keys)))
- ).all()
- dag_run_map = {(dr.dag_id, dr.run_id): dr for dr in dag_runs}
- not_found = keys - dag_run_map.keys()
+ to_update_keys = set(entities_by_key.keys())
+ dag_run_map, matched_keys, not_found_keys =
self._categorize_dag_runs(to_update_keys)
- if action.action_on_non_existence == BulkActionNotOnExistence.FAIL:
- for dag_id, run_id in sorted(not_found):
- results.errors.append(
- {
- "error": (f"The DagRun with dag_id: `{dag_id}` and
run_id: `{run_id}` was not found"),
- "status_code": status.HTTP_404_NOT_FOUND,
- }
+ try:
+ if action.action_on_non_existence == BulkActionNotOnExistence.FAIL
and not_found_keys:
+ raise HTTPException(
+ status.HTTP_404_NOT_FOUND,
+ f"The DagRuns with these identifiers:
{sorted(not_found_keys)} were not found",
)
+ update_keys = (
+ matched_keys
+ if action.action_on_non_existence ==
BulkActionNotOnExistence.SKIP
+ else to_update_keys
+ )
+
+ for key, entity in entities_by_key.items():
+ if key not in update_keys:
+ continue
+ dag_id, run_id = key
+ dag_run = dag_run_map[key]
+ if entity.state is not None:
+ dag = get_dag_for_run(self.dag_bag, dag_run,
session=self.session)
+ patch_dag_run_state(dag=dag, dag_run=dag_run,
state=entity.state, session=self.session)
+ if entity.note is not None:
+ patch_dag_run_note(dag_run=dag_run, note=entity.note,
user=self.user)
+ results.success.append(f"{dag_id}.{run_id}")
+ except HTTPException as e:
+ results.errors.append({"error": f"{e.detail}", "status_code":
e.status_code})
+ def handle_bulk_delete(
+ self, action: BulkDeleteAction[BulkDAGRunBody], results:
BulkActionResponse
+ ) -> None:
+ """Bulk delete Dag Runs."""
+ to_delete_keys: set[tuple[str, str]] = set()
+ for entity in action.entities:
+ key = self._resolve_entity_key(entity, results)
+ if key is not None:
+ to_delete_keys.add(key)
+
+ if not to_delete_keys:
+ return
+
+ dag_run_map, matched_keys, not_found_keys =
self._categorize_dag_runs(to_delete_keys)
deletable_states = {s.value for s in DagRunMutableStates}
- for (dag_id, run_id), dag_run in dag_run_map.items():
- if dag_run.state not in deletable_states:
- results.errors.append(
- {
- "error": (
- f"The DagRun with dag_id: `{dag_id}` and run_id:
`{run_id}` "
- f"cannot be deleted in {dag_run.state} state"
- ),
- "status_code": status.HTTP_409_CONFLICT,
- }
+
+ try:
+ if action.action_on_non_existence == BulkActionNotOnExistence.FAIL
and not_found_keys:
+ raise HTTPException(
+ status.HTTP_404_NOT_FOUND,
+ f"The DagRuns with these identifiers:
{sorted(not_found_keys)} were not found",
)
- continue
- self.session.delete(dag_run)
- results.success.append(f"{dag_id}.{run_id}")
+ delete_keys = (
+ matched_keys
+ if action.action_on_non_existence ==
BulkActionNotOnExistence.SKIP
+ else to_delete_keys
+ )
+
+ for dag_id, run_id in sorted(delete_keys):
+ dag_run = dag_run_map[(dag_id, run_id)]
+ if dag_run.state not in deletable_states:
+ results.errors.append(
+ {
+ "error": (
+ f"The DagRun with dag_id: `{dag_id}` and
run_id: `{run_id}` "
+ f"cannot be deleted in {dag_run.state} state"
+ ),
+ "status_code": status.HTTP_409_CONFLICT,
+ }
+ )
+ continue
+ self.session.delete(dag_run)
+ results.success.append(f"{dag_id}.{run_id}")
+ except HTTPException as e:
+ results.errors.append({"error": f"{e.detail}", "status_code":
e.status_code})
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
index dc8b4e1cb21..8f5c660029c 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
@@ -2523,7 +2523,7 @@ export const useDagRunServicePatchDagRun = <TData =
Common.DagRunServicePatchDag
}, TContext>({ mutationFn: ({ dagId, dagRunId, requestBody, updateMask }) =>
DagRunService.patchDagRun({ dagId, dagRunId, requestBody, updateMask }) as
unknown as Promise<TData>, ...options });
/**
* Bulk Dag Runs
-* Bulk delete Dag Runs.
+* Bulk update or delete Dag Runs.
* @param data The data for the request.
* @param data.dagId
* @param data.requestBody
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 651d22d03b5..6e73f5b9204 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -1104,13 +1104,35 @@ export const $BulkDAGRunBody = {
}
],
title: 'Dag Id'
+ },
+ state: {
+ anyOf: [
+ {
+ '$ref': '#/components/schemas/DagRunMutableStates'
+ },
+ {
+ type: 'null'
+ }
+ ]
+ },
+ note: {
+ anyOf: [
+ {
+ type: 'string',
+ maxLength: 1000
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Note'
}
},
additionalProperties: false,
type: 'object',
required: ['dag_run_id'],
title: 'BulkDAGRunBody',
- description: 'Request body for bulk delete operations on Dag Runs.'
+ description: 'Request body for bulk operations on Dag Runs.'
} as const;
export const $BulkDAGRunClearBody = {
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
index b88e7522b03..d69a06aa60c 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -985,7 +985,7 @@ export class DagRunService {
/**
* Bulk Dag Runs
- * Bulk delete Dag Runs.
+ * Bulk update or delete Dag Runs.
* @param data The data for the request.
* @param data.dagId
* @param data.requestBody
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index fb93b5e093d..62260fa6113 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -334,11 +334,13 @@ export type BulkCreateAction_VariableBody_ = {
};
/**
- * Request body for bulk delete operations on Dag Runs.
+ * Request body for bulk operations on Dag Runs.
*/
export type BulkDAGRunBody = {
dag_run_id: string;
dag_id?: string | null;
+ state?: DagRunMutableStates | null;
+ note?: string | null;
};
/**
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
index a13175970b5..4dfb5bb889a 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -3034,7 +3034,7 @@ class TestBulkDagRuns:
assert session.scalar(select(DagRun).where(DagRun.run_id ==
"running_run")) is not None
def test_bulk_delete_not_found_fails(self, test_client, session):
- """When FAIL semantics, each missing run is reported individually and
matched runs still get deleted."""
+ """FAIL semantics: a single missing run fails the whole action and
nothing is deleted."""
response = test_client.patch(
self.ENDPOINT_URL,
json={
@@ -3048,16 +3048,15 @@ class TestBulkDagRuns:
)
assert response.status_code == 200
body = response.json()
- assert body["delete"]["success"] == [f"{DAG1_ID}.{DAG1_RUN1_ID}"]
+ assert body["delete"]["success"] == []
errors = body["delete"]["errors"]
- assert len(errors) == 2
- assert all(err["status_code"] == 404 for err in errors)
- assert {err["error"] for err in errors} == {
- f"The DagRun with dag_id: `{DAG1_ID}` and run_id:
`another_missing_run` was not found",
- f"The DagRun with dag_id: `{DAG1_ID}` and run_id:
`non_existent_run` was not found",
- }
+ assert len(errors) == 1
+ assert errors[0]["status_code"] == 404
+ assert "non_existent_run" in errors[0]["error"]
+ assert "another_missing_run" in errors[0]["error"]
session.expire_all()
- assert session.scalar(select(DagRun).where(DagRun.run_id ==
DAG1_RUN1_ID)) is None
+ # The matched run must not be deleted when another entity is missing.
+ assert session.scalar(select(DagRun).where(DagRun.run_id ==
DAG1_RUN1_ID)) is not None
def test_bulk_delete_not_found_skip(self, test_client, session):
response = test_client.patch(
@@ -3137,14 +3136,94 @@ class TestBulkDagRuns:
assert len(body["create"]["errors"]) == 1
assert body["create"]["errors"][0]["status_code"] == 405
- def test_bulk_update_not_supported(self, test_client):
+ def test_bulk_update_marks_state(self, test_client, session):
+ """Bulk update marks the selected Dag Runs to the requested state in a
single call."""
response = test_client.patch(
self.ENDPOINT_URL,
json={
"actions": [
{
"action": "update",
- "entities": [{"dag_run_id": DAG1_RUN1_ID}],
+ "entities": [
+ {"dag_run_id": DAG1_RUN1_ID, "state": "failed"},
+ {"dag_run_id": DAG1_RUN2_ID, "state": "failed"},
+ ],
+ }
+ ]
+ },
+ )
+ assert response.status_code == 200
+ body = response.json()
+ assert sorted(body["update"]["success"]) == sorted(
+ [f"{DAG1_ID}.{DAG1_RUN1_ID}", f"{DAG1_ID}.{DAG1_RUN2_ID}"]
+ )
+ assert body["update"]["errors"] == []
+ session.expire_all()
+ for run_id in (DAG1_RUN1_ID, DAG1_RUN2_ID):
+ dag_run = session.scalar(select(DagRun).where(DagRun.dag_id ==
DAG1_ID, DagRun.run_id == run_id))
+ assert dag_run.state == DagRunState.FAILED
+
+ def test_bulk_update_across_dags_with_wildcard(self, test_client, session):
+ """``~`` URL with per-entity dag_id marks runs across Dags in one
call."""
+ response = test_client.patch(
+ self.WILDCARD_ENDPOINT,
+ json={
+ "actions": [
+ {
+ "action": "update",
+ "entities": [
+ {"dag_id": DAG1_ID, "dag_run_id": DAG1_RUN1_ID,
"state": "success"},
+ {"dag_id": DAG2_ID, "dag_run_id": DAG2_RUN1_ID,
"state": "success"},
+ ],
+ }
+ ]
+ },
+ )
+ assert response.status_code == 200
+ body = response.json()
+ assert sorted(body["update"]["success"]) == sorted(
+ [f"{DAG1_ID}.{DAG1_RUN1_ID}", f"{DAG2_ID}.{DAG2_RUN1_ID}"]
+ )
+ session.expire_all()
+ for run_id in (DAG1_RUN1_ID, DAG2_RUN1_ID):
+ assert session.scalar(select(DagRun).where(DagRun.run_id ==
run_id)).state == DagRunState.SUCCESS
+
+ def test_bulk_update_note_only(self, test_client, session):
+ """A bulk update may set only the note, without a target state."""
+ response = test_client.patch(
+ self.ENDPOINT_URL,
+ json={
+ "actions": [
+ {
+ "action": "update",
+ "entities": [{"dag_run_id": DAG1_RUN1_ID, "note":
"bulk note"}],
+ }
+ ]
+ },
+ )
+ assert response.status_code == 200
+ body = response.json()
+ assert body["update"]["success"] == [f"{DAG1_ID}.{DAG1_RUN1_ID}"]
+ assert body["update"]["errors"] == []
+ session.expire_all()
+ dag_run = session.scalar(
+ select(DagRun).where(DagRun.dag_id == DAG1_ID, DagRun.run_id ==
DAG1_RUN1_ID)
+ )
+ assert dag_run.note == "bulk note"
+ assert dag_run.state == DAG1_RUN1_STATE
+
+ def test_bulk_update_not_found_fails(self, test_client, session):
+ """FAIL semantics: a single missing run fails the whole action and
nothing is updated."""
+ response = test_client.patch(
+ self.ENDPOINT_URL,
+ json={
+ "actions": [
+ {
+ "action": "update",
+ "entities": [
+ {"dag_run_id": DAG1_RUN1_ID, "state": "failed"},
+ {"dag_run_id": "non_existent_run", "state":
"failed"},
+ ],
}
]
},
@@ -3153,7 +3232,37 @@ class TestBulkDagRuns:
body = response.json()
assert body["update"]["success"] == []
assert len(body["update"]["errors"]) == 1
- assert body["update"]["errors"][0]["status_code"] == 405
+ assert body["update"]["errors"][0]["status_code"] == 404
+ assert "non_existent_run" in body["update"]["errors"][0]["error"]
+ session.expire_all()
+ # The matched run must keep its original state when another entity is
missing.
+ dag_run = session.scalar(select(DagRun).where(DagRun.run_id ==
DAG1_RUN1_ID))
+ assert dag_run.state == DAG1_RUN1_STATE
+
+ def test_bulk_update_not_found_skip(self, test_client, session):
+ """SKIP semantics: missing runs are ignored and matched runs are still
updated."""
+ response = test_client.patch(
+ self.ENDPOINT_URL,
+ json={
+ "actions": [
+ {
+ "action": "update",
+ "action_on_non_existence": "skip",
+ "entities": [
+ {"dag_run_id": DAG1_RUN1_ID, "state": "failed"},
+ {"dag_run_id": "non_existent_run", "state":
"failed"},
+ ],
+ }
+ ]
+ },
+ )
+ assert response.status_code == 200
+ body = response.json()
+ assert body["update"]["success"] == [f"{DAG1_ID}.{DAG1_RUN1_ID}"]
+ assert body["update"]["errors"] == []
+ session.expire_all()
+ dag_run = session.scalar(select(DagRun).where(DagRun.run_id ==
DAG1_RUN1_ID))
+ assert dag_run.state == DagRunState.FAILED
def test_bulk_delete_rejects_unauthorized_dag_ids_from_request_body(self,
test_client, session):
"""A 403 at the route level if any entity references a Dag the user
can't access."""
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index 4e13fd0fcb6..ec2476ce6ab 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -146,64 +146,10 @@ class BulkActionResponse(BaseModel):
] = []
-class BulkDAGRunBody(BaseModel):
- """
- Request body for bulk delete operations on Dag Runs.
- """
-
- model_config = ConfigDict(
- extra="forbid",
- )
- dag_run_id: Annotated[str, Field(title="Dag Run Id")]
- dag_id: Annotated[str | None, Field(title="Dag Id")] = None
-
-
class Note(RootModel[str]):
root: Annotated[str, Field(max_length=1000, title="Note")]
-class BulkDAGRunClearBody(BaseModel):
- """
- Request body for the bulk clear Dag Runs endpoint.
- """
-
- model_config = ConfigDict(
- extra="forbid",
- )
- dry_run: Annotated[bool | None, Field(title="Dry Run")] = True
- only_failed: Annotated[bool | None, Field(title="Only Failed")] = False
- only_new: Annotated[
- bool | None,
- Field(
- description="Only queue newly added tasks in the latest Dag
version without clearing existing tasks.",
- title="Only New",
- ),
- ] = False
- run_on_latest_version: Annotated[
- bool | None,
- Field(
- description="(Experimental) Run on the latest bundle version of
the Dag after clearing. If not specified, falls back to the DAG-level
``rerun_with_latest_version`` parameter, then the ``[core]
rerun_with_latest_version`` config option, and finally ``False``.",
- title="Run On Latest Version",
- ),
- ] = None
- note: Annotated[Note | None, Field(title="Note")] = None
- dag_runs: Annotated[list[BulkDAGRunBody], Field(min_length=1, title="Dag
Runs")]
-
-
-class BulkDeleteActionBulkDAGRunBody(BaseModel):
- model_config = ConfigDict(
- extra="forbid",
- )
- action: Annotated[
- Literal["delete"], Field(description="The action to be performed on
the entities.", title="Action")
- ]
- entities: Annotated[
- list[str | BulkDAGRunBody],
- Field(description="A list of entity id/key or entity objects to be
deleted.", title="Entities"),
- ]
- action_on_non_existence: BulkActionNotOnExistence | None = "fail"
-
-
class BulkResponse(BaseModel):
"""
Serializer for responses to bulk entity operations.
@@ -227,26 +173,6 @@ class BulkResponse(BaseModel):
] = None
-class BulkUpdateActionBulkDAGRunBody(BaseModel):
- model_config = ConfigDict(
- extra="forbid",
- )
- action: Annotated[
- Literal["update"], Field(description="The action to be performed on
the entities.", title="Action")
- ]
- entities: Annotated[
- list[BulkDAGRunBody], Field(description="A list of entities to be
updated.", title="Entities")
- ]
- update_mask: Annotated[
- list[str] | None,
- Field(
- description="A list of field names to update for each entity.Only
these fields will be applied from the request body to the database model.Any
extra fields provided will be ignored.",
- title="Update Mask",
- ),
- ] = None
- action_on_non_existence: BulkActionNotOnExistence | None = "fail"
-
-
class TaskIds(RootModel[list]):
root: Annotated[list, Field(max_length=2, min_length=2)]
@@ -1420,7 +1346,7 @@ class BackfillResponse(BaseModel):
dag_display_name: Annotated[str, Field(title="Dag Display Name")]
-class BulkCreateActionBulkDAGRunBody(BaseModel):
+class BulkCreateActionConnectionBody(BaseModel):
model_config = ConfigDict(
extra="forbid",
)
@@ -1428,12 +1354,12 @@ class BulkCreateActionBulkDAGRunBody(BaseModel):
Literal["create"], Field(description="The action to be performed on
the entities.", title="Action")
]
entities: Annotated[
- list[BulkDAGRunBody], Field(description="A list of entities to be
created.", title="Entities")
+ list[ConnectionBody], Field(description="A list of entities to be
created.", title="Entities")
]
action_on_existence: BulkActionOnExistence | None = "fail"
-class BulkCreateActionConnectionBody(BaseModel):
+class BulkCreateActionPoolBody(BaseModel):
model_config = ConfigDict(
extra="forbid",
)
@@ -1441,12 +1367,12 @@ class BulkCreateActionConnectionBody(BaseModel):
Literal["create"], Field(description="The action to be performed on
the entities.", title="Action")
]
entities: Annotated[
- list[ConnectionBody], Field(description="A list of entities to be
created.", title="Entities")
+ list[PoolBody], Field(description="A list of entities to be created.",
title="Entities")
]
action_on_existence: BulkActionOnExistence | None = "fail"
-class BulkCreateActionPoolBody(BaseModel):
+class BulkCreateActionVariableBody(BaseModel):
model_config = ConfigDict(
extra="forbid",
)
@@ -1454,22 +1380,65 @@ class BulkCreateActionPoolBody(BaseModel):
Literal["create"], Field(description="The action to be performed on
the entities.", title="Action")
]
entities: Annotated[
- list[PoolBody], Field(description="A list of entities to be created.",
title="Entities")
+ list[VariableBody], Field(description="A list of entities to be
created.", title="Entities")
]
action_on_existence: BulkActionOnExistence | None = "fail"
-class BulkCreateActionVariableBody(BaseModel):
+class BulkDAGRunBody(BaseModel):
+ """
+ Request body for bulk operations on Dag Runs.
+ """
+
+ model_config = ConfigDict(
+ extra="forbid",
+ )
+ dag_run_id: Annotated[str, Field(title="Dag Run Id")]
+ dag_id: Annotated[str | None, Field(title="Dag Id")] = None
+ state: DagRunMutableStates | None = None
+ note: Annotated[Note | None, Field(title="Note")] = None
+
+
+class BulkDAGRunClearBody(BaseModel):
+ """
+ Request body for the bulk clear Dag Runs endpoint.
+ """
+
+ model_config = ConfigDict(
+ extra="forbid",
+ )
+ dry_run: Annotated[bool | None, Field(title="Dry Run")] = True
+ only_failed: Annotated[bool | None, Field(title="Only Failed")] = False
+ only_new: Annotated[
+ bool | None,
+ Field(
+ description="Only queue newly added tasks in the latest Dag
version without clearing existing tasks.",
+ title="Only New",
+ ),
+ ] = False
+ run_on_latest_version: Annotated[
+ bool | None,
+ Field(
+ description="(Experimental) Run on the latest bundle version of
the Dag after clearing. If not specified, falls back to the DAG-level
``rerun_with_latest_version`` parameter, then the ``[core]
rerun_with_latest_version`` config option, and finally ``False``.",
+ title="Run On Latest Version",
+ ),
+ ] = None
+ note: Annotated[Note | None, Field(title="Note")] = None
+ dag_runs: Annotated[list[BulkDAGRunBody], Field(min_length=1, title="Dag
Runs")]
+
+
+class BulkDeleteActionBulkDAGRunBody(BaseModel):
model_config = ConfigDict(
extra="forbid",
)
action: Annotated[
- Literal["create"], Field(description="The action to be performed on
the entities.", title="Action")
+ Literal["delete"], Field(description="The action to be performed on
the entities.", title="Action")
]
entities: Annotated[
- list[VariableBody], Field(description="A list of entities to be
created.", title="Entities")
+ list[str | BulkDAGRunBody],
+ Field(description="A list of entity id/key or entity objects to be
deleted.", title="Entities"),
]
- action_on_existence: BulkActionOnExistence | None = "fail"
+ action_on_non_existence: BulkActionNotOnExistence | None = "fail"
class BulkDeleteActionConnectionBody(BaseModel):
@@ -1534,6 +1503,26 @@ class BulkTaskInstanceBody(BaseModel):
dag_run_id: Annotated[str | None, Field(title="Dag Run Id")] = None
+class BulkUpdateActionBulkDAGRunBody(BaseModel):
+ model_config = ConfigDict(
+ extra="forbid",
+ )
+ action: Annotated[
+ Literal["update"], Field(description="The action to be performed on
the entities.", title="Action")
+ ]
+ entities: Annotated[
+ list[BulkDAGRunBody], Field(description="A list of entities to be
updated.", title="Entities")
+ ]
+ update_mask: Annotated[
+ list[str] | None,
+ Field(
+ description="A list of field names to update for each entity.Only
these fields will be applied from the request body to the database model.Any
extra fields provided will be ignored.",
+ title="Update Mask",
+ ),
+ ] = None
+ action_on_non_existence: BulkActionNotOnExistence | None = "fail"
+
+
class BulkUpdateActionBulkTaskInstanceBody(BaseModel):
model_config = ConfigDict(
extra="forbid",
@@ -2178,18 +2167,6 @@ class BackfillCollectionResponse(BaseModel):
total_entries: Annotated[int, Field(title="Total Entries")]
-class BulkBodyBulkDAGRunBody(BaseModel):
- model_config = ConfigDict(
- extra="forbid",
- )
- actions: Annotated[
- list[
- BulkCreateActionBulkDAGRunBody | BulkUpdateActionBulkDAGRunBody |
BulkDeleteActionBulkDAGRunBody
- ],
- Field(title="Actions"),
- ]
-
-
class BulkBodyConnectionBody(BaseModel):
model_config = ConfigDict(
extra="forbid",
@@ -2222,6 +2199,19 @@ class BulkBodyVariableBody(BaseModel):
]
+class BulkCreateActionBulkDAGRunBody(BaseModel):
+ model_config = ConfigDict(
+ extra="forbid",
+ )
+ action: Annotated[
+ Literal["create"], Field(description="The action to be performed on
the entities.", title="Action")
+ ]
+ entities: Annotated[
+ list[BulkDAGRunBody], Field(description="A list of entities to be
created.", title="Entities")
+ ]
+ action_on_existence: BulkActionOnExistence | None = "fail"
+
+
class BulkCreateActionBulkTaskInstanceBody(BaseModel):
model_config = ConfigDict(
extra="forbid",
@@ -2435,6 +2425,18 @@ class TaskInstanceHistoryCollectionResponse(BaseModel):
total_entries: Annotated[int, Field(title="Total Entries")]
+class BulkBodyBulkDAGRunBody(BaseModel):
+ model_config = ConfigDict(
+ extra="forbid",
+ )
+ actions: Annotated[
+ list[
+ BulkCreateActionBulkDAGRunBody | BulkUpdateActionBulkDAGRunBody |
BulkDeleteActionBulkDAGRunBody
+ ],
+ Field(title="Actions"),
+ ]
+
+
class BulkBodyBulkTaskInstanceBody(BaseModel):
model_config = ConfigDict(
extra="forbid",