This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b9546078ba1 API: Add POST /dags/{dag_id}/clearDagRuns bulk endpoint (#67709)
b9546078ba1 is described below
commit b9546078ba1f5b1e7522be1b2e231220a92ef6d2
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Mon Jun 1 19:13:44 2026 +0200
API: Add POST /dags/{dag_id}/clearDagRuns bulk endpoint (#67709)
* API: Add POST /dags/{dag_id}/clearDagRuns bulk endpoint
Mirrors ``POST /dags/{dag_id}/clearTaskInstances`` for Dag Runs: a
single request clears N runs and, optionally, attaches the same note to
each of them in the same transaction, replacing the one-request-per-run
fan-out the UI does today.
The URL deliberately sits at the parent-Dag level (``/clearDagRuns``,
not ``/dagRuns/clear``) to match the existing ``clearTaskInstances``
convention. Because ``dag_run_router`` is mounted at
``/dags/{dag_id}/dagRuns``, the route cannot be registered there; rather
than moving it into ``task_instances_router``, ``dag_run.py`` gains a
sibling router, ``dag_run_at_dag_router``, with prefix ``/dags/{dag_id}``.
The URL ``dag_id`` is either a concrete Dag id or the ``~`` wildcard.
With ``~``, every entry in ``dag_runs`` must carry its own concrete
``dag_id``; an entry without one (or with ``~``) is rejected with 400.
With a concrete Dag id, entries may omit ``dag_id`` and inherit it from
the URL; an entry whose ``dag_id`` differs from the URL is rejected with
400. Duplicate ``(dag_id, run_id)`` entries collapse into a single
operation, matching the ``handle_bulk_delete`` semantics of
``BulkDagRunService``.
``dry_run`` defaults to ``True`` as the safe choice. In dry-run mode the
response lists the task instances that would be affected in each listed
run, concatenated run by run (or the ``NewTaskResponse`` placeholders for
the ``only_new`` path, so the same task id can appear once per run),
without touching state. A non-dry-run clear returns a
``DAGRunCollectionResponse`` containing the cleared runs.
To keep both endpoints in sync, the per-run lookup, the dry-run TI
computation, and the clear-plus-note step are extracted from the
single-run ``clear_dag_run`` route into ``services/public/dag_run.py`` as
``get_dag_run_and_dag_for_clear`` / ``dry_run_clear_dag_run`` /
``perform_clear_dag_run``. The single-run route now composes these
helpers instead of duplicating the logic.
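Authorization uses a new ``requires_access_dag_run_clear_bulk``
dependency modelled on ``requires_access_dag_run_bulk``. It checks
``PUT`` access on the runs of every referenced Dag in one batched call,
resolving each Dag's team name only once. It skips entries with no
resolvable ``dag_id`` (missing, or the ``~`` wildcard), which the route
then rejects with 400.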
* Simplify clearDagRuns dedup with an ordered dict and rename handler
* Regenerate OpenAPI spec and UI client for clearDagRuns rename
* Factor out shared bulk dag-run authorization request builder
* Restore per-dag team-name dedup in bulk dag-run authorization
* Patch resolver in service module for clear-endpoint resolver test
* Test bulk clearDagRuns rejects unauthorized dag_ids in request body
---
.../api_fastapi/core_api/datamodels/dag_run.py | 26 ++-
.../core_api/openapi/v2-rest-api-generated.yaml | 112 ++++++++++-
.../api_fastapi/core_api/routes/public/__init__.py | 3 +-
.../api_fastapi/core_api/routes/public/dag_run.py | 175 ++++++++++-------
.../src/airflow/api_fastapi/core_api/security.py | 75 ++++---
.../core_api/services/public/dag_run.py | 106 +++++++++-
.../src/airflow/ui/openapi-gen/queries/common.ts | 1 +
.../src/airflow/ui/openapi-gen/queries/queries.ts | 18 +-
.../airflow/ui/openapi-gen/requests/schemas.gen.ts | 60 +++++-
.../ui/openapi-gen/requests/services.gen.ts | 30 ++-
.../airflow/ui/openapi-gen/requests/types.gen.ts | 58 +++++-
.../core_api/routes/public/test_dag_run.py | 216 ++++++++++++++++++++-
.../src/airflowctl/api/datamodels/generated.py | 38 +++-
13 files changed, 798 insertions(+), 120 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
index 8373847328d..2cfc4a00017 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
@@ -57,8 +57,8 @@ class BulkDAGRunBody(StrictBaseModel):
dag_id: str | None = None
-class DAGRunClearBody(StrictBaseModel):
- """Dag Run serializer for clear endpoint body."""
+class BaseDAGRunClear(StrictBaseModel):
+ """Shared options for the single-run and bulk Dag Run clear endpoints."""
dry_run: bool = True
only_failed: bool = False
@@ -68,25 +68,31 @@ class DAGRunClearBody(StrictBaseModel):
)
run_on_latest_version: bool | None = Field(
default=None,
- description="(Experimental) Run on the latest bundle version of the
Dag after clearing the Dag Run. "
+ description="(Experimental) Run on the latest bundle version of the
Dag after clearing. "
"If not specified, falls back to the DAG-level
``rerun_with_latest_version`` parameter, "
"then the ``[core] rerun_with_latest_version`` config option, "
- "and finally ``False`` (the historical default for clear/rerun).",
- )
- note: str | None = Field(
- default=None,
- max_length=1000,
+ "and finally ``False``.",
)
+ note: str | None = Field(default=None, max_length=1000)
@model_validator(mode="before")
@classmethod
- def validate_model(cls, data: Any) -> Any:
- """Validate clear Dag run form."""
+ def validate_only_new_only_failed_mutually_exclusive(cls, data: Any) ->
Any:
if data.get("only_new") and data.get("only_failed"):
raise ValueError("only_new and only_failed are mutually exclusive")
return data
+class DAGRunClearBody(BaseDAGRunClear):
+ """Dag Run serializer for clear endpoint body."""
+
+
+class BulkDAGRunClearBody(BaseDAGRunClear):
+ """Request body for the bulk clear Dag Runs endpoint."""
+
+ dag_runs: list[BulkDAGRunBody] = Field(min_length=1)
+
+
class DAGRunResponse(BaseModel):
"""Dag Run serializer for responses."""
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index 9cfa32f2411..be17e2fd70e 100644
---
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -2980,6 +2980,69 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
+ /api/v2/dags/{dag_id}/clearDagRuns:
+ post:
+ tags:
+ - DagRun
+ summary: Clear Dag Runs
+ description: Clear multiple Dag Runs in a single request.
+ operationId: clear_dag_runs
+ security:
+ - OAuth2PasswordBearer: []
+ - HTTPBearer: []
+ parameters:
+ - name: dag_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Id
+ requestBody:
+ required: true
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/BulkDAGRunClearBody'
+ responses:
+ '200':
+ description: Successful Response
+ content:
+ application/json:
+ schema:
+ anyOf:
+ - $ref:
'#/components/schemas/ClearTaskInstanceCollectionResponse'
+ - $ref: '#/components/schemas/DAGRunCollectionResponse'
+ title: Response Clear Dag Runs
+ '401':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Unauthorized
+ '403':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Forbidden
+ '400':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Bad Request
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
/api/v2/dagSources/{dag_id}:
get:
tags:
@@ -11883,6 +11946,49 @@ components:
- dag_run_id
title: BulkDAGRunBody
description: Request body for bulk delete operations on Dag Runs.
+ BulkDAGRunClearBody:
+ properties:
+ dry_run:
+ type: boolean
+ title: Dry Run
+ default: true
+ only_failed:
+ type: boolean
+ title: Only Failed
+ default: false
+ only_new:
+ type: boolean
+ title: Only New
+ description: Only queue newly added tasks in the latest Dag version
without
+ clearing existing tasks.
+ default: false
+ run_on_latest_version:
+ anyOf:
+ - type: boolean
+ - type: 'null'
+ title: Run On Latest Version
+ description: (Experimental) Run on the latest bundle version of the
Dag
+ after clearing. If not specified, falls back to the DAG-level
``rerun_with_latest_version``
+ parameter, then the ``[core] rerun_with_latest_version`` config
option,
+ and finally ``False``.
+ note:
+ anyOf:
+ - type: string
+ maxLength: 1000
+ - type: 'null'
+ title: Note
+ dag_runs:
+ items:
+ $ref: '#/components/schemas/BulkDAGRunBody'
+ type: array
+ minItems: 1
+ title: Dag Runs
+ additionalProperties: false
+ type: object
+ required:
+ - dag_runs
+ title: BulkDAGRunClearBody
+ description: Request body for the bulk clear Dag Runs endpoint.
BulkDeleteAction_BulkDAGRunBody_:
properties:
action:
@@ -13093,9 +13199,9 @@ components:
- type: 'null'
title: Run On Latest Version
description: (Experimental) Run on the latest bundle version of the
Dag
- after clearing the Dag Run. If not specified, falls back to the
DAG-level
- ``rerun_with_latest_version`` parameter, then the ``[core]
rerun_with_latest_version``
- config option, and finally ``False`` (the historical default for
clear/rerun).
+ after clearing. If not specified, falls back to the DAG-level
``rerun_with_latest_version``
+ parameter, then the ``[core] rerun_with_latest_version`` config
option,
+ and finally ``False``.
note:
anyOf:
- type: string
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/__init__.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/__init__.py
index 4e6ebba1178..e305f0ac248 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/__init__.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/__init__.py
@@ -28,7 +28,7 @@ from airflow.api_fastapi.core_api.routes.public.backfills
import backfills_route
from airflow.api_fastapi.core_api.routes.public.config import config_router
from airflow.api_fastapi.core_api.routes.public.connections import
connections_router
from airflow.api_fastapi.core_api.routes.public.dag_parsing import
dag_parsing_router
-from airflow.api_fastapi.core_api.routes.public.dag_run import dag_run_router
+from airflow.api_fastapi.core_api.routes.public.dag_run import
dag_run_at_dag_router, dag_run_router
from airflow.api_fastapi.core_api.routes.public.dag_sources import
dag_sources_router
from airflow.api_fastapi.core_api.routes.public.dag_stats import
dag_stats_router
from airflow.api_fastapi.core_api.routes.public.dag_tags import dag_tags_router
@@ -69,6 +69,7 @@ authenticated_router.include_router(assets_router)
authenticated_router.include_router(backfills_router)
authenticated_router.include_router(connections_router)
authenticated_router.include_router(dag_run_router)
+authenticated_router.include_router(dag_run_at_dag_router)
authenticated_router.include_router(dag_sources_router)
authenticated_router.include_router(dag_stats_router)
authenticated_router.include_router(config_router)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
index aa82b14fefd..18af252c495 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -47,7 +47,6 @@ from airflow.api_fastapi.common.db.dag_runs import (
attach_dag_versions_to_runs,
eager_load_dag_run_for_list,
)
-from airflow.api_fastapi.common.db.task_instances import
eager_load_TI_and_TIH_for_validation
from airflow.api_fastapi.common.parameters import (
FilterOptionEnum,
FilterParam,
@@ -79,6 +78,7 @@ from airflow.api_fastapi.core_api.datamodels.assets import
AssetEventCollectionR
from airflow.api_fastapi.core_api.datamodels.common import BulkBody,
BulkResponse
from airflow.api_fastapi.core_api.datamodels.dag_run import (
BulkDAGRunBody,
+ BulkDAGRunClearBody,
DAGRunClearBody,
DAGRunCollectionResponse,
DagRunMutableStates,
@@ -99,22 +99,28 @@ from airflow.api_fastapi.core_api.security import (
requires_access_asset,
requires_access_dag,
requires_access_dag_run_bulk,
+ requires_access_dag_run_clear_bulk,
+)
+from airflow.api_fastapi.core_api.services.public.dag_run import (
+ BulkDagRunService,
+ DagRunWaiter,
+ dry_run_clear_dag_run,
+ get_dag_run_and_dag_for_clear,
+ perform_clear_dag_run,
)
-from airflow.api_fastapi.core_api.services.public.common import
resolve_run_on_latest_version
-from airflow.api_fastapi.core_api.services.public.dag_run import
BulkDagRunService, DagRunWaiter
from airflow.api_fastapi.logging.decorators import action_logging
from airflow.exceptions import ParamValidationError
from airflow.listeners.listener import get_listener_manager
from airflow.models import DagModel, DagRun
from airflow.models.asset import AssetEvent
from airflow.models.dag_version import DagVersion
-from airflow.models.taskinstance import TaskInstance
-from airflow.utils.state import DagRunState, TaskInstanceState
+from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType
log = structlog.get_logger(__name__)
dag_run_router = AirflowRouter(tags=["DagRun"],
prefix="/dags/{dag_id}/dagRuns")
+dag_run_at_dag_router = AirflowRouter(tags=["DagRun"], prefix="/dags/{dag_id}")
@dag_run_router.get(
@@ -321,81 +327,116 @@ def clear_dag_run(
session: SessionDep,
user: GetUserDep,
) -> ClearTaskInstanceCollectionResponse | DAGRunResponse:
- dag_run = session.scalar(
- select(DagRun).filter_by(dag_id=dag_id,
run_id=dag_run_id).options(joinedload(DagRun.dag_model))
+ dag_run, dag = get_dag_run_and_dag_for_clear(
+ session=session, dag_bag=dag_bag, dag_id=dag_id, dag_run_id=dag_run_id
)
- if dag_run is None:
- raise HTTPException(
- status.HTTP_404_NOT_FOUND,
- f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}`
was not found",
+
+ if body.dry_run:
+ task_instances = dry_run_clear_dag_run(
+ session=session,
+ dag_bag=dag_bag,
+ dag_id=dag_id,
+ dag_run_id=dag_run_id,
+ only_failed=body.only_failed,
+ only_new=body.only_new,
+ )
+ return ClearTaskInstanceCollectionResponse(
+ task_instances=task_instances,
+ total_entries=len(task_instances),
)
- dag = dag_bag.get_dag_for_run(dag_run, session=session)
+ return perform_clear_dag_run(
+ session=session,
+ dag=dag,
+ dag_run=dag_run,
+ dag_id=dag_id,
+ only_failed=body.only_failed,
+ only_new=body.only_new,
+ run_on_latest_version=body.run_on_latest_version,
+ note=body.note,
+ user=user,
+ )
- if not dag:
- raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id}
was not found")
- resolved_run_on_latest =
resolve_run_on_latest_version(body.run_on_latest_version, dag_id, session)
+@dag_run_at_dag_router.post(
+ "/clearDagRuns",
+ responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST,
status.HTTP_404_NOT_FOUND]),
+ dependencies=[Depends(requires_access_dag_run_clear_bulk()),
Depends(action_logging())],
+)
+def clear_dag_runs(
+ dag_id: str,
+ body: BulkDAGRunClearBody,
+ dag_bag: DagBagDep,
+ session: SessionDep,
+ user: GetUserDep,
+) -> ClearTaskInstanceCollectionResponse | DAGRunCollectionResponse:
+ """Clear multiple Dag Runs in a single request."""
+ url_dag_id_is_wildcard = dag_id == "~"
+
+ # No ordered set type in Python, using a dict with throwaway values as
replacement.
+ runs_to_clear: dict[tuple[str, str], None] = {}
+ for run in body.dag_runs:
+ if url_dag_id_is_wildcard:
+ if not run.dag_id or run.dag_id == "~":
+ raise HTTPException(
+ status.HTTP_400_BAD_REQUEST,
+ f"When the URL dag_id is '~', every entry must provide a
concrete dag_id "
+ f"(missing on dag_run_id: {run.dag_run_id!r}).",
+ )
+ run_to_clear = (run.dag_id, run.dag_run_id)
+ else:
+ entity_dag_id = run.dag_id or dag_id
+ if entity_dag_id != dag_id:
+ raise HTTPException(
+ status.HTTP_400_BAD_REQUEST,
+ f"Entry dag_id {entity_dag_id!r} does not match the URL
dag_id {dag_id!r}.",
+ )
+ run_to_clear = (dag_id, run.dag_run_id)
+ runs_to_clear[run_to_clear] = None
if body.dry_run:
- if body.only_new:
- # Determine "new" tasks by TI existence: a task is new when the
latest Dag
- # version contains it but the current run has no TaskInstance row
for it yet.
- # This is more reliable than the version-comparison approach used
by
- # dag.clear(only_new=True, dry_run=True) which returns an empty
set when
- # created_dag_version_id is None (e.g. LocalDagBundle).
- latest_dag = get_latest_version_of_dag(dag_bag, dag_id, session)
- existing_task_ids = set(
- session.scalars(
- select(TaskInstance.task_id).where(
- TaskInstance.dag_id == dag_id,
- TaskInstance.run_id == dag_run_id,
- )
- ).all()
+ affected: list[TaskInstanceResponse | NewTaskResponse] = []
+ for run_dag_id, run_id in runs_to_clear:
+ get_dag_run_and_dag_for_clear(
+ session=session, dag_bag=dag_bag, dag_id=run_dag_id,
dag_run_id=run_id
)
- new_task_ids = sorted(set(latest_dag.task_ids) - existing_task_ids)
- task_instances: list[TaskInstanceResponse | NewTaskResponse] = [
- NewTaskResponse(task_id=task_id, task_display_name=task_id)
for task_id in new_task_ids
- ]
- else:
- # Query task instances directly with proper eager loading so that
all
- # relationships required by TaskInstanceResponse (dag_run,
dag_model,
- # dag_version, rendered_task_instance_fields) are populated.
- # dag.clear(dry_run=True) returns raw ORM objects without these
joins.
- ti_query =
eager_load_TI_and_TIH_for_validation(select(TaskInstance))
- ti_query = ti_query.where(
- TaskInstance.dag_id == dag_id,
- TaskInstance.run_id == dag_run_id,
- )
- if body.only_failed:
- ti_query = ti_query.where(
- TaskInstance.state.in_([TaskInstanceState.FAILED,
TaskInstanceState.UPSTREAM_FAILED])
+ affected.extend(
+ dry_run_clear_dag_run(
+ session=session,
+ dag_bag=dag_bag,
+ dag_id=run_dag_id,
+ dag_run_id=run_id,
+ only_failed=body.only_failed,
+ only_new=body.only_new,
)
- task_instances = list(session.scalars(ti_query))
-
+ )
return ClearTaskInstanceCollectionResponse(
- task_instances=task_instances,
- total_entries=len(task_instances),
+ task_instances=affected,
+ total_entries=len(affected),
)
- dag.clear(
- run_id=dag_run_id,
- task_ids=None,
- only_new=body.only_new,
- only_failed=body.only_failed,
- run_on_latest_version=resolved_run_on_latest,
- session=session,
+ cleared_runs: list[DagRun] = []
+ for run_dag_id, run_id in runs_to_clear:
+ dag_run, dag = get_dag_run_and_dag_for_clear(
+ session=session, dag_bag=dag_bag, dag_id=run_dag_id,
dag_run_id=run_id
+ )
+ cleared_runs.append(
+ perform_clear_dag_run(
+ session=session,
+ dag=dag,
+ dag_run=dag_run,
+ dag_id=run_dag_id,
+ only_failed=body.only_failed,
+ only_new=body.only_new,
+ run_on_latest_version=body.run_on_latest_version,
+ note=body.note,
+ user=user,
+ )
+ )
+ return DAGRunCollectionResponse(
+ dag_runs=cleared_runs,
+ total_entries=len(cleared_runs),
)
- dag_run_cleared = session.scalar(select(DagRun).where(DagRun.id ==
dag_run.id))
- if not dag_run_cleared:
- raise HTTPException(status.HTTP_404_NOT_FOUND, "Dag run not found
after clearing")
- if body.note is not None:
- if dag_run_cleared.dag_run_note is None:
- dag_run_cleared.note = (body.note, user.get_id())
- else:
- dag_run_cleared.dag_run_note.content = body.note
- dag_run_cleared.dag_run_note.user_id = user.get_id()
- return dag_run_cleared
@dag_run_router.get(
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/security.py
b/airflow-core/src/airflow/api_fastapi/core_api/security.py
index bb3413b7f05..154929af064 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/security.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/security.py
@@ -63,7 +63,7 @@ from airflow.api_fastapi.core_api.datamodels.common import (
BulkUpdateAction,
)
from airflow.api_fastapi.core_api.datamodels.connections import ConnectionBody
-from airflow.api_fastapi.core_api.datamodels.dag_run import BulkDAGRunBody
+from airflow.api_fastapi.core_api.datamodels.dag_run import BulkDAGRunBody,
BulkDAGRunClearBody
from airflow.api_fastapi.core_api.datamodels.pools import PoolBody
from airflow.api_fastapi.core_api.datamodels.variables import VariableBody
from airflow.configuration import conf
@@ -730,46 +730,73 @@ def requires_access_variable_bulk() ->
Callable[[BulkBody[VariableBody], BaseUse
return inner
+def _build_dag_run_access_requests(
+ entity_methods: list[tuple[str, ResourceMethod]],
+) -> list[IsAuthorizedDagRequest]:
+ """
+ Build per-entity DagRun authorization requests for a batched access check.
+
+ ``entity_methods`` is a list of ``(dag_id, method)`` pairs with
unresolvable
+ entries (no dag_id or the ``~`` wildcard) already filtered out by the
caller.
+ The team for each dag is resolved once and shared across that dag's
requests.
+ """
+ resolved_dag_ids = {dag_id for dag_id, _ in entity_methods}
+ dag_id_to_team = {dag_id: DagModel.get_team_name(dag_id) for dag_id in
resolved_dag_ids}
+ return [
+ {
+ "method": method,
+ "access_entity": DagAccessEntity.RUN,
+ "details": DagDetails(id=dag_id,
team_name=dag_id_to_team.get(dag_id)),
+ }
+ for dag_id, method in entity_methods
+ ]
+
+
def requires_access_dag_run_bulk() -> Callable[[BulkBody[BulkDAGRunBody],
BaseUser, str], None]:
def inner(
request: BulkBody[BulkDAGRunBody],
user: GetUserDep,
dag_id: str,
) -> None:
- resolved_dag_ids: set[str] = set()
- for action in request.actions:
- for entity in action.entities:
- if isinstance(entity, str):
- entity_dag_id: str | None = dag_id
- else:
- entity_dag_id = entity.dag_id or dag_id
- if entity_dag_id and entity_dag_id != "~":
- resolved_dag_ids.add(entity_dag_id)
-
- dag_id_to_team = {d: DagModel.get_team_name(d) for d in
resolved_dag_ids}
-
- requests: list[IsAuthorizedDagRequest] = []
+ entity_methods: list[tuple[str, ResourceMethod]] = []
for action in request.actions:
methods = _get_resource_methods_from_bulk_request(action)
for entity in action.entities:
if isinstance(entity, str):
- entity_dag_id = dag_id
+ entity_dag_id: str | None = dag_id
else:
entity_dag_id = entity.dag_id or dag_id
# Entities that can't be resolved are surfaced as 400 in the
service's BulkResponse.
if not entity_dag_id or entity_dag_id == "~":
continue
for method in methods:
- requests.append(
- {
- "method": method,
- "access_entity": DagAccessEntity.RUN,
- "details": DagDetails(
- id=entity_dag_id,
team_name=dag_id_to_team.get(entity_dag_id)
- ),
- }
- )
+ entity_methods.append((entity_dag_id, method))
+
+ requests = _build_dag_run_access_requests(entity_methods)
+ _requires_access(
+ is_authorized_callback=lambda:
get_auth_manager().batch_is_authorized_dag(
+ requests=requests,
+ user=user,
+ )
+ )
+
+ return inner
+
+
+def requires_access_dag_run_clear_bulk() -> Callable[[BulkDAGRunClearBody,
BaseUser, str], None]:
+ def inner(
+ body: BulkDAGRunClearBody,
+ user: GetUserDep,
+ dag_id: str,
+ ) -> None:
+ entity_methods: list[tuple[str, ResourceMethod]] = []
+ for run in body.dag_runs:
+ entity_dag_id = run.dag_id or dag_id
+ if not entity_dag_id or entity_dag_id == "~":
+ continue
+ entity_methods.append((entity_dag_id, "PUT"))
+ requests = _build_dag_run_access_requests(entity_methods)
_requires_access(
is_authorized_callback=lambda:
get_auth_manager().batch_is_authorized_dag(
requests=requests,
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
index 4c5a5b090b6..3c9d8cd7eba 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
@@ -25,10 +25,13 @@ from typing import TYPE_CHECKING, Any
import attrs
import structlog
-from fastapi import status
+from fastapi import HTTPException, status
from sqlalchemy import select, tuple_
-from sqlalchemy.orm import Session
+from sqlalchemy.orm import Session, joinedload
+from airflow.api_fastapi.auth.managers.models.base_user import BaseUser
+from airflow.api_fastapi.common.dagbag import DagBagDep,
get_latest_version_of_dag
+from airflow.api_fastapi.common.db.task_instances import
eager_load_TI_and_TIH_for_validation
from airflow.api_fastapi.core_api.datamodels.common import (
BulkActionNotOnExistence,
BulkActionResponse,
@@ -38,18 +41,113 @@ from airflow.api_fastapi.core_api.datamodels.common import
(
BulkUpdateAction,
)
from airflow.api_fastapi.core_api.datamodels.dag_run import BulkDAGRunBody,
DagRunMutableStates
-from airflow.api_fastapi.core_api.services.public.common import BulkService
+from airflow.api_fastapi.core_api.datamodels.task_instances import
NewTaskResponse
+from airflow.api_fastapi.core_api.services.public.common import BulkService,
resolve_run_on_latest_version
from airflow.models.dagrun import DagRun
+from airflow.models.taskinstance import TaskInstance
from airflow.models.xcom import XCOM_RETURN_KEY, XComModel
from airflow.utils.session import create_session_async
-from airflow.utils.state import State
+from airflow.utils.state import State, TaskInstanceState
if TYPE_CHECKING:
from collections.abc import AsyncGenerator, Iterator
+ from airflow.serialization.definitions.dag import SerializedDAG
+
log = structlog.get_logger(__name__)
+def get_dag_run_and_dag_for_clear(
+ *,
+ session: Session,
+ dag_bag: DagBagDep,
+ dag_id: str,
+ dag_run_id: str,
+) -> tuple[DagRun, SerializedDAG]:
+ dag_run = session.scalar(
+ select(DagRun).filter_by(dag_id=dag_id,
run_id=dag_run_id).options(joinedload(DagRun.dag_model))
+ )
+ if dag_run is None:
+ raise HTTPException(
+ status.HTTP_404_NOT_FOUND,
+ f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}`
was not found",
+ )
+ dag = dag_bag.get_dag_for_run(dag_run, session=session)
+ if not dag:
+ raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id}
was not found")
+ return dag_run, dag
+
+
+def dry_run_clear_dag_run(
+ *,
+ session: Session,
+ dag_bag: DagBagDep,
+ dag_id: str,
+ dag_run_id: str,
+ only_failed: bool,
+ only_new: bool,
+) -> list[Any]:
+ if only_new:
+ # ``dag.clear(only_new=True, dry_run=True)`` returns nothing when
+ # ``created_dag_version_id`` is None (e.g. LocalDagBundle), so derive
new
+ # tasks from TI existence instead.
+ latest_dag = get_latest_version_of_dag(dag_bag, dag_id, session)
+ existing_task_ids = set(
+ session.scalars(
+ select(TaskInstance.task_id).where(
+ TaskInstance.dag_id == dag_id,
+ TaskInstance.run_id == dag_run_id,
+ )
+ ).all()
+ )
+ new_task_ids = sorted(set(latest_dag.task_ids) - existing_task_ids)
+ return [NewTaskResponse(task_id=task_id, task_display_name=task_id)
for task_id in new_task_ids]
+
+ ti_query = eager_load_TI_and_TIH_for_validation(select(TaskInstance))
+ ti_query = ti_query.where(
+ TaskInstance.dag_id == dag_id,
+ TaskInstance.run_id == dag_run_id,
+ )
+ if only_failed:
+ ti_query = ti_query.where(
+ TaskInstance.state.in_([TaskInstanceState.FAILED,
TaskInstanceState.UPSTREAM_FAILED])
+ )
+ return list(session.scalars(ti_query))
+
+
+def perform_clear_dag_run(
+ *,
+ session: Session,
+ dag: SerializedDAG,
+ dag_run: DagRun,
+ dag_id: str,
+ only_failed: bool,
+ only_new: bool,
+ run_on_latest_version: bool | None,
+ note: str | None,
+ user: BaseUser,
+) -> DagRun:
+ resolved_run_on_latest =
resolve_run_on_latest_version(run_on_latest_version, dag_id, session)
+ dag.clear(
+ run_id=dag_run.run_id,
+ task_ids=None,
+ only_new=only_new,
+ only_failed=only_failed,
+ run_on_latest_version=resolved_run_on_latest,
+ session=session,
+ )
+ dag_run_cleared = session.scalar(select(DagRun).where(DagRun.id ==
dag_run.id))
+ if not dag_run_cleared:
+ raise HTTPException(status.HTTP_404_NOT_FOUND, "Dag run not found
after clearing")
+ if note is not None:
+ if dag_run_cleared.dag_run_note is None:
+ dag_run_cleared.note = (note, user.get_id())
+ else:
+ dag_run_cleared.dag_run_note.content = note
+ dag_run_cleared.dag_run_note.user_id = user.get_id()
+ return dag_run_cleared
+
+
@attrs.define
class DagRunWaiter:
"""Wait for the specified dag run to finish, and collect info from it."""
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
index 632d3111576..fb0ab311c5a 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
@@ -1037,6 +1037,7 @@ export type
ConnectionServiceCreateDefaultConnectionsMutationResult = Awaited<Re
export type DagRunServiceTriggerDagRunMutationResult =
Awaited<ReturnType<typeof DagRunService.triggerDagRun>>;
export type DagRunServiceClearDagRunMutationResult = Awaited<ReturnType<typeof
DagRunService.clearDagRun>>;
export type DagRunServiceGetListDagRunsBatchMutationResult =
Awaited<ReturnType<typeof DagRunService.getListDagRunsBatch>>;
+export type DagRunServiceClearDagRunsMutationResult =
Awaited<ReturnType<typeof DagRunService.clearDagRuns>>;
export type DagServiceFavoriteDagMutationResult = Awaited<ReturnType<typeof
DagService.favoriteDag>>;
export type DagServiceUnfavoriteDagMutationResult = Awaited<ReturnType<typeof
DagService.unfavoriteDag>>;
export type TaskInstanceServiceGetTaskInstancesBatchMutationResult =
Awaited<ReturnType<typeof TaskInstanceService.getTaskInstancesBatch>>;
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
index 01e3d78e69c..41118b64c74 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
@@ -2,7 +2,7 @@
import { UseMutationOptions, UseQueryOptions, useMutation, useQuery } from
"@tanstack/react-query";
import { AssetService, AssetStateService, AuthLinksService, BackfillService,
CalendarService, ConfigService, ConnectionService, DagParsingService,
DagRunService, DagService, DagSourceService, DagStatsService,
DagVersionService, DagWarningService, DashboardService, DeadlinesService,
DependenciesService, EventLogService, ExperimentalService, ExtraLinksService,
GanttService, GridService, ImportErrorService, JobService, LoginService,
MonitorService, PartitionedDagRunService, PluginService, P [...]
-import { AssetStateBody, BackfillPostBody, BulkBody_BulkDAGRunBody_,
BulkBody_BulkTaskInstanceBody_, BulkBody_ConnectionBody_, BulkBody_PoolBody_,
BulkBody_VariableBody_, ClearTaskInstancesBody, ConnectionBody,
CreateAssetEventsBody, DAGPatchBody, DAGRunClearBody, DAGRunPatchBody,
DAGRunsBatchBody, DagRunState, DagWarningType, GenerateTokenBody,
MaterializeAssetBody, PatchTaskInstanceBody, PoolBody, PoolPatchBody,
TaskInstancesBatchBody, TaskStateBody, TaskStatePatchBody, TriggerDAGRunPo [...]
+import { AssetStateBody, BackfillPostBody, BulkBody_BulkDAGRunBody_,
BulkBody_BulkTaskInstanceBody_, BulkBody_ConnectionBody_, BulkBody_PoolBody_,
BulkBody_VariableBody_, BulkDAGRunClearBody, ClearTaskInstancesBody,
ConnectionBody, CreateAssetEventsBody, DAGPatchBody, DAGRunClearBody,
DAGRunPatchBody, DAGRunsBatchBody, DagRunState, DagWarningType,
GenerateTokenBody, MaterializeAssetBody, PatchTaskInstanceBody, PoolBody,
PoolPatchBody, TaskInstancesBatchBody, TaskStateBody, TaskStatePatch [...]
import * as Common from "./common";
/**
* Get Assets
@@ -2210,6 +2210,22 @@ export const useDagRunServiceGetListDagRunsBatch =
<TData = Common.DagRunService
requestBody: DAGRunsBatchBody;
}, TContext>({ mutationFn: ({ dagId, requestBody }) =>
DagRunService.getListDagRunsBatch({ dagId, requestBody }) as unknown as
Promise<TData>, ...options });
/**
+* Clear Dag Runs
+* Clear multiple Dag Runs in a single request.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.requestBody
+* @returns unknown Successful Response
+* @throws ApiError
+*/
+export const useDagRunServiceClearDagRuns = <TData =
Common.DagRunServiceClearDagRunsMutationResult, TError = unknown, TContext =
unknown>(options?: Omit<UseMutationOptions<TData, TError, {
+ dagId: string;
+ requestBody: BulkDAGRunClearBody;
+}, TContext>, "mutationFn">) => useMutation<TData, TError, {
+ dagId: string;
+ requestBody: BulkDAGRunClearBody;
+}, TContext>({ mutationFn: ({ dagId, requestBody }) =>
DagRunService.clearDagRuns({ dagId, requestBody }) as unknown as
Promise<TData>, ...options });
+/**
* Favorite Dag
* Mark the Dag as favorite.
* @param data The data for the request.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index d3ab8bafd6c..53482dbff5c 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -971,6 +971,64 @@ export const $BulkDAGRunBody = {
description: 'Request body for bulk delete operations on Dag Runs.'
} as const;
+export const $BulkDAGRunClearBody = {
+ properties: {
+ dry_run: {
+ type: 'boolean',
+ title: 'Dry Run',
+ default: true
+ },
+ only_failed: {
+ type: 'boolean',
+ title: 'Only Failed',
+ default: false
+ },
+ only_new: {
+ type: 'boolean',
+ title: 'Only New',
+ description: 'Only queue newly added tasks in the latest Dag
version without clearing existing tasks.',
+ default: false
+ },
+ run_on_latest_version: {
+ anyOf: [
+ {
+ type: 'boolean'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Run On Latest Version',
+ description: '(Experimental) Run on the latest bundle version of
the Dag after clearing. If not specified, falls back to the DAG-level
``rerun_with_latest_version`` parameter, then the ``[core]
rerun_with_latest_version`` config option, and finally ``False``.'
+ },
+ note: {
+ anyOf: [
+ {
+ type: 'string',
+ maxLength: 1000
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Note'
+ },
+ dag_runs: {
+ items: {
+ '$ref': '#/components/schemas/BulkDAGRunBody'
+ },
+ type: 'array',
+ minItems: 1,
+ title: 'Dag Runs'
+ }
+ },
+ additionalProperties: false,
+ type: 'object',
+ required: ['dag_runs'],
+ title: 'BulkDAGRunClearBody',
+ description: 'Request body for the bulk clear Dag Runs endpoint.'
+} as const;
+
export const $BulkDeleteAction_BulkDAGRunBody_ = {
properties: {
action: {
@@ -2774,7 +2832,7 @@ export const $DAGRunClearBody = {
}
],
title: 'Run On Latest Version',
- description: '(Experimental) Run on the latest bundle version of
the Dag after clearing the Dag Run. If not specified, falls back to the
DAG-level ``rerun_with_latest_version`` parameter, then the ``[core]
rerun_with_latest_version`` config option, and finally ``False`` (the
historical default for clear/rerun).'
+ description: '(Experimental) Run on the latest bundle version of
the Dag after clearing. If not specified, falls back to the DAG-level
``rerun_with_latest_version`` parameter, then the ``[core]
rerun_with_latest_version`` config option, and finally ``False``.'
},
note: {
anyOf: [
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
index 74502a154c8..04b43773dbe 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -3,7 +3,7 @@
import type { CancelablePromise } from './core/CancelablePromise';
import { OpenAPI } from './core/OpenAPI';
import { request as __request } from './core/request';
-import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData,
GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse,
GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData,
CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse,
GetAssetQueuedEventsData, GetAssetQueuedEventsResponse,
DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData,
GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse,
Dele [...]
+import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData,
GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse,
GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData,
CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse,
GetAssetQueuedEventsData, GetAssetQueuedEventsResponse,
DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData,
GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse,
Dele [...]
export class AssetService {
/**
@@ -1233,6 +1233,34 @@ export class DagRunService {
});
}
+ /**
+ * Clear Dag Runs
+ * Clear multiple Dag Runs in a single request.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.requestBody
+ * @returns unknown Successful Response
+ * @throws ApiError
+ */
+ public static clearDagRuns(data: ClearDagRunsData):
CancelablePromise<ClearDagRunsResponse> {
+ return __request(OpenAPI, {
+ method: 'POST',
+ url: '/api/v2/dags/{dag_id}/clearDagRuns',
+ path: {
+ dag_id: data.dagId
+ },
+ body: data.requestBody,
+ mediaType: 'application/json',
+ errors: {
+ 400: 'Bad Request',
+ 401: 'Unauthorized',
+ 403: 'Forbidden',
+ 404: 'Not Found',
+ 422: 'Validation Error'
+ }
+ });
+ }
+
/**
* Get Dag Run Stats
* Get duration statistics for a DAG based on its historical completed
runs.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 2000ec81e8f..91c11c794e0 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -301,6 +301,24 @@ export type BulkDAGRunBody = {
dag_id?: string | null;
};
+/**
+ * Request body for the bulk clear Dag Runs endpoint.
+ */
+export type BulkDAGRunClearBody = {
+ dry_run?: boolean;
+ only_failed?: boolean;
+ /**
+ * Only queue newly added tasks in the latest Dag version without clearing
existing tasks.
+ */
+ only_new?: boolean;
+ /**
+ * (Experimental) Run on the latest bundle version of the Dag after
clearing. If not specified, falls back to the DAG-level
``rerun_with_latest_version`` parameter, then the ``[core]
rerun_with_latest_version`` config option, and finally ``False``.
+ */
+ run_on_latest_version?: boolean | null;
+ note?: string | null;
+ dag_runs: Array<BulkDAGRunBody>;
+};
+
export type BulkDeleteAction_BulkDAGRunBody_ = {
/**
* The action to be performed on the entities.
@@ -748,7 +766,7 @@ export type DAGRunClearBody = {
*/
only_new?: boolean;
/**
- * (Experimental) Run on the latest bundle version of the Dag after
clearing the Dag Run. If not specified, falls back to the DAG-level
``rerun_with_latest_version`` parameter, then the ``[core]
rerun_with_latest_version`` config option, and finally ``False`` (the
historical default for clear/rerun).
+ * (Experimental) Run on the latest bundle version of the Dag after
clearing. If not specified, falls back to the DAG-level
``rerun_with_latest_version`` parameter, then the ``[core]
rerun_with_latest_version`` config option, and finally ``False``.
*/
run_on_latest_version?: boolean | null;
note?: string | null;
@@ -2956,6 +2974,13 @@ export type GetListDagRunsBatchData = {
export type GetListDagRunsBatchResponse = DAGRunCollectionResponse;
+export type ClearDagRunsData = {
+ dagId: string;
+ requestBody: BulkDAGRunClearBody;
+};
+
+export type ClearDagRunsResponse = ClearTaskInstanceCollectionResponse |
DAGRunCollectionResponse;
+
export type GetDagRunStatsData = {
dagId: string;
dagRunId: string;
@@ -5478,6 +5503,37 @@ export type $OpenApiTs = {
};
};
};
+ '/api/v2/dags/{dag_id}/clearDagRuns': {
+ post: {
+ req: ClearDagRunsData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 200: ClearTaskInstanceCollectionResponse |
DAGRunCollectionResponse;
+ /**
+ * Bad Request
+ */
+ 400: HTTPExceptionResponse;
+ /**
+ * Unauthorized
+ */
+ 401: HTTPExceptionResponse;
+ /**
+ * Forbidden
+ */
+ 403: HTTPExceptionResponse;
+ /**
+ * Not Found
+ */
+ 404: HTTPExceptionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
+ };
'/ui/dags/{dag_id}/dagRuns/{dag_run_id}/stats': {
get: {
req: GetDagRunStatsData;
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
index 318544014e3..f570e5ca3a7 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -1882,6 +1882,218 @@ class TestClearDagRun:
assert response.status_code == 422
+class TestBulkClearDagRuns:
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+ def test_bulk_clear_specific_dag(self, test_client, session):
+ """Specific dag_id in URL, dag_run_id in body — clears both runs and
queues them."""
+ response = test_client.post(
+ f"/dags/{DAG1_ID}/clearDagRuns",
+ json={
+ "dry_run": False,
+ "dag_runs": [{"dag_run_id": DAG1_RUN1_ID}, {"dag_run_id":
DAG1_RUN2_ID}],
+ },
+ )
+ assert response.status_code == 200
+ body = response.json()
+ assert body["total_entries"] == 2
+ returned_run_ids = sorted(run["dag_run_id"] for run in
body["dag_runs"])
+ assert returned_run_ids == sorted([DAG1_RUN1_ID, DAG1_RUN2_ID])
+ for run in body["dag_runs"]:
+ assert run["state"] == "queued"
+ assert run["dag_id"] == DAG1_ID
+
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+ def test_bulk_clear_wildcard_across_dags(self, test_client, session):
+ """``~`` URL with per-entity dag_id — clears runs across Dags in one
call."""
+ response = test_client.post(
+ "/dags/~/clearDagRuns",
+ json={
+ "dry_run": False,
+ "dag_runs": [
+ {"dag_id": DAG1_ID, "dag_run_id": DAG1_RUN1_ID},
+ {"dag_id": DAG2_ID, "dag_run_id": DAG2_RUN1_ID},
+ ],
+ },
+ )
+ assert response.status_code == 200
+ body = response.json()
+ assert body["total_entries"] == 2
+ pairs = sorted((run["dag_id"], run["dag_run_id"]) for run in
body["dag_runs"])
+ assert pairs == sorted([(DAG1_ID, DAG1_RUN1_ID), (DAG2_ID,
DAG2_RUN1_ID)])
+ for run in body["dag_runs"]:
+ assert run["state"] == "queued"
+
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+ def test_bulk_clear_dry_run_collects_affected_tis_across_runs(self,
test_client, session):
+ """Dry-run returns the union of affected TIs across the listed runs
without mutating state."""
+ response = test_client.post(
+ f"/dags/{DAG1_ID}/clearDagRuns",
+ json={
+ "dry_run": True,
+ "dag_runs": [{"dag_run_id": DAG1_RUN1_ID}, {"dag_run_id":
DAG1_RUN2_ID}],
+ },
+ )
+ assert response.status_code == 200
+ body = response.json()
+ # Both DAG1 runs have two task instances each.
+ assert body["total_entries"] == 4
+ run_ids_in_response = {ti["dag_run_id"] for ti in
body["task_instances"]}
+ assert run_ids_in_response == {DAG1_RUN1_ID, DAG1_RUN2_ID}
+ # No state changes — dry_run never writes.
+ dag_run = session.scalar(
+ select(DagRun).where(DagRun.dag_id == DAG1_ID, DagRun.run_id ==
DAG1_RUN1_ID)
+ )
+ assert dag_run.state == DAG1_RUN1_STATE
+
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+ def test_bulk_clear_dry_run_only_failed_filters(self, test_client):
+ """``only_failed=True`` shrinks the dry-run preview to failed TIs
only."""
+ response = test_client.post(
+ f"/dags/{DAG1_ID}/clearDagRuns",
+ json={
+ "dry_run": True,
+ "only_failed": True,
+ "dag_runs": [{"dag_run_id": DAG1_RUN2_ID}],
+ },
+ )
+ assert response.status_code == 200
+ body = response.json()
+ assert all(ti["state"] == "failed" for ti in body["task_instances"])
+ assert body["total_entries"] == 1
+
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+ def test_bulk_clear_applies_note_to_each_run(self, test_client, session):
+ """``note`` in the body is applied to every cleared run in the same
transaction."""
+ response = test_client.post(
+ f"/dags/{DAG1_ID}/clearDagRuns",
+ json={
+ "dry_run": False,
+ "note": "bulk cleared by test",
+ "dag_runs": [{"dag_run_id": DAG1_RUN1_ID}, {"dag_run_id":
DAG1_RUN2_ID}],
+ },
+ )
+ assert response.status_code == 200
+ for run_id in (DAG1_RUN1_ID, DAG1_RUN2_ID):
+ dag_run = session.scalar(select(DagRun).where(DagRun.dag_id ==
DAG1_ID, DagRun.run_id == run_id))
+ assert dag_run.note == "bulk cleared by test"
+
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+ def test_bulk_clear_wildcard_rejects_missing_dag_id(self, test_client):
+ """``~`` URL requires every entry to carry a concrete dag_id; 400
otherwise."""
+ response = test_client.post(
+ "/dags/~/clearDagRuns",
+ json={
+ "dry_run": False,
+ "dag_runs": [{"dag_run_id": DAG1_RUN1_ID}],
+ },
+ )
+ assert response.status_code == 400
+ assert DAG1_RUN1_ID in response.json()["detail"]
+
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+ def test_bulk_clear_specific_url_rejects_mismatched_dag_id(self,
test_client):
+ """When the URL has a specific dag_id, mismatched per-entity dag_id is
rejected."""
+ response = test_client.post(
+ f"/dags/{DAG1_ID}/clearDagRuns",
+ json={
+ "dry_run": False,
+ "dag_runs": [{"dag_id": DAG2_ID, "dag_run_id": DAG2_RUN1_ID}],
+ },
+ )
+ assert response.status_code == 400
+
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+ def test_bulk_clear_missing_run_returns_404(self, test_client):
+ response = test_client.post(
+ f"/dags/{DAG1_ID}/clearDagRuns",
+ json={
+ "dry_run": False,
+ "dag_runs": [{"dag_run_id": "does_not_exist"}],
+ },
+ )
+ assert response.status_code == 404
+
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+ def test_bulk_clear_rejects_only_new_with_only_failed(self, test_client):
+ """``only_new`` and ``only_failed`` are mutually exclusive at the body
validator level."""
+ response = test_client.post(
+ f"/dags/{DAG1_ID}/clearDagRuns",
+ json={
+ "dry_run": True,
+ "only_new": True,
+ "only_failed": True,
+ "dag_runs": [{"dag_run_id": DAG1_RUN1_ID}],
+ },
+ )
+ assert response.status_code == 422
+
+ def test_bulk_clear_unauthenticated_returns_401(self,
unauthenticated_test_client):
+ response = unauthenticated_test_client.post(
+ f"/dags/{DAG1_ID}/clearDagRuns",
+ json={"dry_run": False, "dag_runs": [{"dag_run_id":
DAG1_RUN1_ID}]},
+ )
+ assert response.status_code == 401
+
+ def test_bulk_clear_unauthorized_returns_403(self,
unauthorized_test_client):
+ response = unauthorized_test_client.post(
+ f"/dags/{DAG1_ID}/clearDagRuns",
+ json={"dry_run": False, "dag_runs": [{"dag_run_id":
DAG1_RUN1_ID}]},
+ )
+ assert response.status_code == 403
+
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+ def test_bulk_clear_rejects_unauthorized_dag_ids_from_request_body(self,
test_client, session):
+ """A 403 at the route level if any entry references a Dag the user
can't access; nothing is cleared."""
+ restricted_bundle_name = "restricted-bundle-clear"
+ restricted_team_name = "restricted-team-clear"
+ restricted_bundle = DagBundleModel(name=restricted_bundle_name)
+ restricted_team = Team(name=restricted_team_name)
+ restricted_bundle.teams.append(restricted_team)
+ session.add_all([restricted_bundle, restricted_team])
+ session.flush()
+ # Restrict DAG2 by attaching it to a team-scoped bundle the limited
user has no access to.
+ session.execute(
+ update(DagModel).where(DagModel.dag_id ==
DAG2_ID).values(bundle_name=restricted_bundle_name)
+ )
+ session.commit()
+
+ states_before = {
+ run_id: session.scalar(select(DagRun.state).where(DagRun.run_id ==
run_id))
+ for run_id in (DAG1_RUN1_ID, DAG2_RUN1_ID)
+ }
+
+ auth_manager = test_client.app.state.auth_manager
+ token = auth_manager._get_token_signer().generate(
+ auth_manager.serialize_user(
+ SimpleAuthManagerUser(username="limited-user", role="user",
teams=[]),
+ )
+ )
+ with (
+ mock.patch("airflow.models.revoked_token.RevokedToken.is_revoked",
return_value=False),
+ TestClient(
+ test_client.app,
+ headers={"Authorization": f"Bearer {token}"},
+ base_url=str(test_client.base_url),
+ ) as limited_test_client,
+ ):
+ response = limited_test_client.post(
+ "/dags/~/clearDagRuns",
+ json={
+ "dry_run": False,
+ "dag_runs": [
+ {"dag_id": DAG1_ID, "dag_run_id": DAG1_RUN1_ID},
+ {"dag_id": DAG2_ID, "dag_run_id": DAG2_RUN1_ID},
+ ],
+ },
+ )
+
+ assert response.status_code == 403
+ # The batched auth check rejects the whole request, so the authorized
Dag's run is not cleared either.
+ session.expire_all()
+ for run_id, state_before in states_before.items():
+ assert session.scalar(select(DagRun.state).where(DagRun.run_id ==
run_id)) == state_before
+
+
class TestClearDagRunOnlyNew:
"""Integration tests for only_new=True using a real two-version DAG.
@@ -2568,12 +2780,12 @@ class TestResolveRunOnLatestVersion:
def test_clear_endpoint_invokes_resolver_when_field_omitted(self,
test_client):
"""Clearing without run_on_latest_version triggers the server-side
resolver."""
with mock.patch(
-
"airflow.api_fastapi.core_api.routes.public.dag_run.resolve_run_on_latest_version",
+
"airflow.api_fastapi.core_api.services.public.dag_run.resolve_run_on_latest_version",
return_value=False,
) as mock_resolver:
response = test_client.post(
f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/clear",
- json={"dry_run": True},
+ json={"dry_run": False},
)
assert response.status_code == 200
mock_resolver.assert_called_once()
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index c019b4381f3..7b9e95e5528 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -120,6 +120,38 @@ class BulkDAGRunBody(BaseModel):
dag_id: Annotated[str | None, Field(title="Dag Id")] = None
+class Note(RootModel[str]):
+ root: Annotated[str, Field(max_length=1000, title="Note")]
+
+
+class BulkDAGRunClearBody(BaseModel):
+ """
+ Request body for the bulk clear Dag Runs endpoint.
+ """
+
+ model_config = ConfigDict(
+ extra="forbid",
+ )
+ dry_run: Annotated[bool | None, Field(title="Dry Run")] = True
+ only_failed: Annotated[bool | None, Field(title="Only Failed")] = False
+ only_new: Annotated[
+ bool | None,
+ Field(
+ description="Only queue newly added tasks in the latest Dag
version without clearing existing tasks.",
+ title="Only New",
+ ),
+ ] = False
+ run_on_latest_version: Annotated[
+ bool | None,
+ Field(
+ description="(Experimental) Run on the latest bundle version of
the Dag after clearing. If not specified, falls back to the DAG-level
``rerun_with_latest_version`` parameter, then the ``[core]
rerun_with_latest_version`` config option, and finally ``False``.",
+ title="Run On Latest Version",
+ ),
+ ] = None
+ note: Annotated[Note | None, Field(title="Note")] = None
+ dag_runs: Annotated[list[BulkDAGRunBody], Field(min_length=1, title="Dag
Runs")]
+
+
class BulkDeleteActionBulkDAGRunBody(BaseModel):
model_config = ConfigDict(
extra="forbid",
@@ -157,10 +189,6 @@ class BulkResponse(BaseModel):
] = None
-class Note(RootModel[str]):
- root: Annotated[str, Field(max_length=1000, title="Note")]
-
-
class BulkUpdateActionBulkDAGRunBody(BaseModel):
model_config = ConfigDict(
extra="forbid",
@@ -344,7 +372,7 @@ class DAGRunClearBody(BaseModel):
run_on_latest_version: Annotated[
bool | None,
Field(
- description="(Experimental) Run on the latest bundle version of
the Dag after clearing the Dag Run. If not specified, falls back to the
DAG-level ``rerun_with_latest_version`` parameter, then the ``[core]
rerun_with_latest_version`` config option, and finally ``False`` (the
historical default for clear/rerun).",
+ description="(Experimental) Run on the latest bundle version of
the Dag after clearing. If not specified, falls back to the DAG-level
``rerun_with_latest_version`` parameter, then the ``[core]
rerun_with_latest_version`` config option, and finally ``False``.",
title="Run On Latest Version",
),
] = None