This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9288949db28 Add bulk delete Dag Runs (#67095)
9288949db28 is described below
commit 9288949db28e811bb0fbeb88ebd8d509fb159e61
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Thu May 21 22:14:30 2026 +0200
Add bulk delete Dag Runs (#67095)
* Add bulk delete endpoint for Dag Runs
Restores feature parity with Airflow 2.x where DagRunModelView exposed
collective Delete on the Dag Runs list view. Adds:
- PATCH /dags/{dag_id}/dagRuns — bulk endpoint structured like the
existing bulk task-instances endpoint. Only ``delete`` is supported in
this PR; ``create`` and ``update`` are wired to return 405 in the
BulkResponse so future PRs can fill them in without changing the
route surface.
- BulkDagRunService with deletable-state enforcement (matches the
single-run delete: only QUEUED / SUCCESS / FAILED can be deleted),
per-Dag authorization caching for the wildcard path
/dags/~/dagRuns, and ``action_on_non_existence: fail | skip``
semantics.
- Row selection + a Delete bulk action on the runs list page,
mirroring how Task Instances does it.
Bulk Mark-as and Bulk Clear are intentionally out of scope and will
follow in separate PRs.
The grid view stays single-select; multi-select on the grid was not
available in 2.x either, and the runs list page is the natural target
for bulk operations on a filtered set (e.g. state=failed).
closes: #52439
* Address self-review on bulk delete Dag Runs
- Switch BulkDagRunService._fetch_dag_runs to tuple_(dag_id, run_id).in_()
to avoid a Cartesian over-fetch when /dags/~/dagRuns is called with
pairs spanning multiple Dags. Matches BulkTaskInstanceService.
- Narrow _check_dag_authorization's method type to Literal["DELETE"];
this PR only does delete, no point in exposing PUT/POST/GET on the
signature.
- Add a wildcard test that exercises the per-Dag authorization path
(limited user accepted for one Dag, rejected with 403 in the
BulkResponse for a team-restricted Dag).
* Apply Brent's review feedback from the closed twin PR
Pulls in the substantive UI feedback @bbovenzi left on #66554 so this
PR lands without re-litigating the same comments. The closed PR was
broader (clear / mark / delete) but the structural feedback applies
verbatim to delete-only:
- Move all DagRuns-related files into ``pages/DagRuns/``, mirroring
``pages/TaskInstances/``. Adds a small ``index.ts`` re-export so
``router.tsx`` keeps the same import path.
- Rename ``useBulkDagRuns`` to ``useBulkDeleteDagRuns`` so the hook
name matches the single button it serves (one-hook-one-button
symmetry — when we add bulk update/clear we'll add sibling hooks).
- Stop hand-rolling pending/error state with ``useState<unknown>``;
read ``error`` straight off ``useMutation``'s return.
- Surface ALL per-entity errors from ``BulkResponse.delete.errors``
instead of just the first; render each as its own ``Alert`` row.
- Only invalidate the dag-runs / task-instances queries when at least
one entry actually succeeded — a 200 with all-errors should not
churn the table.
- Keep the dialog open when per-entity errors come back so the user
can read what failed; ``reset()`` clears them on close.
* Address inline review: mirror task-instance bulk delete patterns
Backend:
- Inline ``_resolve_dag_id``, ``_result_key`` and ``_fetch_dag_runs`` —
each had exactly one caller. Memory rule added so future PRs avoid
premature helpers.
- Drop the deletable-state restriction. Bulk task-instance delete has no
such restriction; bulk Dag-run delete shouldn't either.
- Emit one 404 error per missing entity when ``action_on_non_existence``
is ``fail`` instead of collating them into a single error, and stop
early-returning so matched runs still get deleted. The invariant
``len(success) + len(errors) == len(requested entities)`` now holds.
- Distinguish the two ways a wildcard can leak into ``dag_id`` (path
vs body) in the 400 message, mirroring
``BulkTaskInstanceService._categorize_entities``.
UI:
- Mirror ``useBulkTaskInstances`` exactly: bring back ``useState`` for
the error, the shared ``handleActionResult`` helper, single-error
surfacing via ``ErrorAlert``, and the ``bulkAction(requestBody)``
shape so the consumer constructs the full ``BulkBody``. Brent's prior
review of the closed twin PR pushed past this pattern, but until TI
is updated we want both hooks symmetrical — a follow-up can improve
both at once.
- Inline the affected-runs column array into ``BulkDeleteDagRunsButton``
and delete the standalone ``bulkDagRunsColumns.tsx`` file (single
caller).
* Move bulk Dag Run authorization to a dedicated route dependency
Adds ``requires_access_dag_run_bulk`` in ``core_api/security.py``
following the ``requires_access_connection_bulk`` pattern. The
dependency reads the parsed ``BulkBody[BulkDAGRunBody]``, resolves
each entity's ``dag_id`` (body wins, falling back to the path),
collects team mappings per Dag, and uses ``batch_is_authorized_dag``
to enforce auth before the route handler runs.
The route now declares only this dependency plus ``action_logging``;
the per-entity auth check is no longer duplicated inside
``BulkDagRunService``. Unauthorized requests fail with a single
route-level 403 instead of returning 200 with per-entity 403s in the
``BulkResponse``, matching how connections / pools / variables behave.
* Mirror single-run DELETE state restriction on bulk delete
Single ``DELETE /dags/{dag_id}/dagRuns/{dag_run_id}`` rejects RUNNING
runs with 409; bulk delete now does the same — a RUNNING entity yields
a per-entity 409 in ``BulkResponse.errors`` and the matched non-running
entities still get deleted.
Also renames ``DAGRunPatchStates`` to ``DagRunMutableStates`` since it
now gates both PATCH (mark-as) and DELETE — same set of states (QUEUED,
SUCCESS, FAILED), broader meaning. Propagated through the route handlers,
the bulk service, and the UI components that import the generated type.
* Invalidate dependent queries after bulk delete Dag Run
Bulk delete only invalidated the top-level dag-runs and task-instances
lists. Two more cache sets stay stale otherwise:
- Per-attempt TI caches (log / extra links / try details), keyed by
TI identity. When the user navigates back to a TI try detail via
the browser back button after deleting its run, react-query serves
the cached log instead of letting the request hit and return 404.
- The grid view query set for each affected Dag — the grid renders
one bar per Dag Run, so bulk delete literally removes bars.
The per-attempt set mirrors the addition ``useBulkTaskInstances``
already gained in #67212. The grid invalidation is specific to this
hook because deleting Dag Runs (unlike deleting TIs) changes what the
grid bars themselves represent.
The affected ``dag_id`` set is captured in ``bulkAction`` from the
request body and read in ``onSuccess``, same lifecycle as
``useBulkClearTaskInstances``'s ``byDagRun`` grouping.
* Restore resolve_run_on_latest_version import dropped during rebase
Rebasing the branch onto the latest ``apache/main`` with the
``-X theirs`` strategy used the incoming side for every conflict in
``routes/public/dag_run.py``. That dropped the
``resolve_run_on_latest_version`` import that #63884 added — the
function call at line 336 was preserved but the import line wasn't,
so the module failed to load and every test in ``test_dag_run.py``
crashed at collection (Static checks, mypy, and all DB-core test
matrices on CI).
This commit only adds the missing import back; no other change.
---
.../api_fastapi/core_api/datamodels/dag_run.py | 13 +-
.../core_api/openapi/v2-rest-api-generated.yaml | 328 +++++++++++++++------
.../api_fastapi/core_api/routes/public/dag_run.py | 28 +-
.../src/airflow/api_fastapi/core_api/security.py | 52 ++++
.../core_api/services/public/dag_run.py | 115 +++++++-
.../src/airflow/ui/openapi-gen/queries/common.ts | 17 +-
.../ui/openapi-gen/queries/ensureQueryData.ts | 26 +-
.../src/airflow/ui/openapi-gen/queries/prefetch.ts | 26 +-
.../src/airflow/ui/openapi-gen/queries/queries.ts | 76 +++--
.../src/airflow/ui/openapi-gen/queries/suspense.ts | 26 +-
.../airflow/ui/openapi-gen/requests/schemas.gen.ts | 170 ++++++++++-
.../ui/openapi-gen/requests/services.gen.ts | 98 +++---
.../airflow/ui/openapi-gen/requests/types.gen.ts | 154 +++++++---
.../src/components/MarkAs/Run/MarkRunAsButton.tsx | 4 +-
.../src/components/MarkAs/Run/MarkRunAsDialog.tsx | 4 +-
.../src/airflow/ui/src/components/MarkAs/utils.ts | 4 +-
.../src/pages/DagRuns/BulkDeleteDagRunsButton.tsx | 176 +++++++++++
.../ui/src/pages/{ => DagRuns}/DagRuns.test.tsx | 0
.../airflow/ui/src/pages/{ => DagRuns}/DagRuns.tsx | 79 ++++-
.../ui/src/pages/{ => DagRuns}/DagRunsFilters.tsx | 0
.../ui/src/pages/{ => DagRuns}/DeleteRunButton.tsx | 0
.../MarkAs/utils.ts => pages/DagRuns/index.ts} | 4 +-
.../src/airflow/ui/src/pages/Run/Header.tsx | 2 +-
.../airflow/ui/src/queries/useBulkDeleteDagRuns.ts | 114 +++++++
.../core_api/routes/public/test_dag_run.py | 274 ++++++++++++++++-
.../src/airflowctl/api/datamodels/generated.py | 93 +++++-
26 files changed, 1602 insertions(+), 281 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
index b1e2500203e..97b37fdaae5 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
@@ -35,8 +35,8 @@ if TYPE_CHECKING:
from airflow.serialization.definitions.dag import SerializedDAG
-class DAGRunPatchStates(str, Enum):
- """Enum for Dag Run states when updating a Dag Run."""
+class DagRunMutableStates(str, Enum):
+ """Dag Run states from which the run may be mutated (patched, deleted)."""
QUEUED = DagRunState.QUEUED
SUCCESS = DagRunState.SUCCESS
@@ -46,10 +46,17 @@ class DAGRunPatchStates(str, Enum):
class DAGRunPatchBody(StrictBaseModel):
"""Dag Run Serializer for PATCH requests."""
- state: DAGRunPatchStates | None = None
+ state: DagRunMutableStates | None = None
note: str | None = Field(None, max_length=1000)
+class BulkDAGRunBody(StrictBaseModel):
+ """Request body for bulk delete operations on Dag Runs."""
+
+ dag_run_id: str
+ dag_id: str | None = None
+
+
class DAGRunClearBody(StrictBaseModel):
"""Dag Run serializer for clear endpoint body."""
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index 00a9c60d858..23f2622b13b 100644
---
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -2052,67 +2052,13 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
- /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/upstreamAssetEvents:
- get:
- tags:
- - DagRun
- summary: Get Upstream Asset Events
- description: If dag run is asset-triggered, return the asset events that
triggered
- it.
- operationId: get_upstream_asset_events
- security:
- - OAuth2PasswordBearer: []
- - HTTPBearer: []
- parameters:
- - name: dag_id
- in: path
- required: true
- schema:
- type: string
- title: Dag Id
- - name: dag_run_id
- in: path
- required: true
- schema:
- type: string
- title: Dag Run Id
- responses:
- '200':
- description: Successful Response
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/AssetEventCollectionResponse'
- '401':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPExceptionResponse'
- description: Unauthorized
- '403':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPExceptionResponse'
- description: Forbidden
- '404':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPExceptionResponse'
- description: Not Found
- '422':
- description: Validation Error
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPValidationError'
- /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/clear:
- post:
+ /api/v2/dags/{dag_id}/dagRuns:
+ patch:
tags:
- DagRun
- summary: Clear Dag Run
- operationId: clear_dag_run
+ summary: Bulk Dag Runs
+ description: Bulk delete Dag Runs.
+ operationId: bulk_dag_runs
security:
- OAuth2PasswordBearer: []
- HTTPBearer: []
@@ -2123,28 +2069,19 @@ paths:
schema:
type: string
title: Dag Id
- - name: dag_run_id
- in: path
- required: true
- schema:
- type: string
- title: Dag Run Id
requestBody:
required: true
content:
application/json:
schema:
- $ref: '#/components/schemas/DAGRunClearBody'
+ $ref: '#/components/schemas/BulkBody_BulkDAGRunBody_'
responses:
'200':
description: Successful Response
content:
application/json:
schema:
- anyOf:
- - $ref:
'#/components/schemas/ClearTaskInstanceCollectionResponse'
- - $ref: '#/components/schemas/DAGRunResponse'
- title: Response Clear Dag Run
+ $ref: '#/components/schemas/BulkResponse'
'401':
content:
application/json:
@@ -2157,19 +2094,12 @@ paths:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
- '404':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPExceptionResponse'
- description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
- /api/v2/dags/{dag_id}/dagRuns:
get:
tags:
- DagRun
@@ -2792,6 +2722,123 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
+ /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/upstreamAssetEvents:
+ get:
+ tags:
+ - DagRun
+ summary: Get Upstream Asset Events
+ description: If dag run is asset-triggered, return the asset events that
triggered
+ it.
+ operationId: get_upstream_asset_events
+ security:
+ - OAuth2PasswordBearer: []
+ - HTTPBearer: []
+ parameters:
+ - name: dag_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Id
+ - name: dag_run_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Run Id
+ responses:
+ '200':
+ description: Successful Response
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/AssetEventCollectionResponse'
+ '401':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Unauthorized
+ '403':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Forbidden
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
+ /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/clear:
+ post:
+ tags:
+ - DagRun
+ summary: Clear Dag Run
+ operationId: clear_dag_run
+ security:
+ - OAuth2PasswordBearer: []
+ - HTTPBearer: []
+ parameters:
+ - name: dag_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Id
+ - name: dag_run_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Run Id
+ requestBody:
+ required: true
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/DAGRunClearBody'
+ responses:
+ '200':
+ description: Successful Response
+ content:
+ application/json:
+ schema:
+ anyOf:
+ - $ref:
'#/components/schemas/ClearTaskInstanceCollectionResponse'
+ - $ref: '#/components/schemas/DAGRunResponse'
+ title: Response Clear Dag Run
+ '401':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Unauthorized
+ '403':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Forbidden
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/wait:
get:
tags:
@@ -11560,6 +11607,21 @@ components:
This structure helps users understand which key actions succeeded and
which
failed.'
+ BulkBody_BulkDAGRunBody_:
+ properties:
+ actions:
+ items:
+ oneOf:
+ - $ref: '#/components/schemas/BulkCreateAction_BulkDAGRunBody_'
+ - $ref: '#/components/schemas/BulkUpdateAction_BulkDAGRunBody_'
+ - $ref: '#/components/schemas/BulkDeleteAction_BulkDAGRunBody_'
+ type: array
+ title: Actions
+ additionalProperties: false
+ type: object
+ required:
+ - actions
+ title: BulkBody[BulkDAGRunBody]
BulkBody_BulkTaskInstanceBody_:
properties:
actions:
@@ -11620,6 +11682,28 @@ components:
required:
- actions
title: BulkBody[VariableBody]
+ BulkCreateAction_BulkDAGRunBody_:
+ properties:
+ action:
+ type: string
+ const: create
+ title: Action
+ description: The action to be performed on the entities.
+ entities:
+ items:
+ $ref: '#/components/schemas/BulkDAGRunBody'
+ type: array
+ title: Entities
+ description: A list of entities to be created.
+ action_on_existence:
+ $ref: '#/components/schemas/BulkActionOnExistence'
+ default: fail
+ additionalProperties: false
+ type: object
+ required:
+ - action
+ - entities
+ title: BulkCreateAction[BulkDAGRunBody]
BulkCreateAction_BulkTaskInstanceBody_:
properties:
action:
@@ -11708,6 +11792,46 @@ components:
- action
- entities
title: BulkCreateAction[VariableBody]
+ BulkDAGRunBody:
+ properties:
+ dag_run_id:
+ type: string
+ title: Dag Run Id
+ dag_id:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Dag Id
+ additionalProperties: false
+ type: object
+ required:
+ - dag_run_id
+ title: BulkDAGRunBody
+ description: Request body for bulk delete operations on Dag Runs.
+ BulkDeleteAction_BulkDAGRunBody_:
+ properties:
+ action:
+ type: string
+ const: delete
+ title: Action
+ description: The action to be performed on the entities.
+ entities:
+ items:
+ anyOf:
+ - type: string
+ - $ref: '#/components/schemas/BulkDAGRunBody'
+ type: array
+ title: Entities
+ description: A list of entity id/key or entity objects to be deleted.
+ action_on_non_existence:
+ $ref: '#/components/schemas/BulkActionNotOnExistence'
+ default: fail
+ additionalProperties: false
+ type: object
+ required:
+ - action
+ - entities
+ title: BulkDeleteAction[BulkDAGRunBody]
BulkDeleteAction_BulkTaskInstanceBody_:
properties:
action:
@@ -11889,6 +12013,38 @@ components:
- task_id
title: BulkTaskInstanceBody
description: Request body for bulk update, and delete task instances.
+ BulkUpdateAction_BulkDAGRunBody_:
+ properties:
+ action:
+ type: string
+ const: update
+ title: Action
+ description: The action to be performed on the entities.
+ entities:
+ items:
+ $ref: '#/components/schemas/BulkDAGRunBody'
+ type: array
+ title: Entities
+ description: A list of entities to be updated.
+ update_mask:
+ anyOf:
+ - items:
+ type: string
+ type: array
+ - type: 'null'
+ title: Update Mask
+ description: A list of field names to update for each entity.Only
these
+ fields will be applied from the request body to the database
model.Any
+ extra fields provided will be ignored.
+ action_on_non_existence:
+ $ref: '#/components/schemas/BulkActionNotOnExistence'
+ default: fail
+ additionalProperties: false
+ type: object
+ required:
+ - action
+ - entities
+ title: BulkUpdateAction[BulkDAGRunBody]
BulkUpdateAction_BulkTaskInstanceBody_:
properties:
action:
@@ -12920,7 +13076,7 @@ components:
properties:
state:
anyOf:
- - $ref: '#/components/schemas/DAGRunPatchStates'
+ - $ref: '#/components/schemas/DagRunMutableStates'
- type: 'null'
note:
anyOf:
@@ -12932,14 +13088,6 @@ components:
type: object
title: DAGRunPatchBody
description: Dag Run Serializer for PATCH requests.
- DAGRunPatchStates:
- type: string
- enum:
- - queued
- - success
- - failed
- title: DAGRunPatchStates
- description: Enum for Dag Run states when updating a Dag Run.
DAGRunResponse:
properties:
dag_run_id:
@@ -13400,6 +13548,14 @@ components:
- partition_key
title: DagRunAssetReference
description: DagRun serializer for asset responses.
+ DagRunMutableStates:
+ type: string
+ enum:
+ - queued
+ - success
+ - failed
+ title: DagRunMutableStates
+ description: Dag Run states from which the run may be mutated (patched,
deleted).
DagRunState:
type: string
enum:
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
index 4577a516089..a5c76550be5 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -76,11 +76,13 @@ from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.common.types import Mimetype
from airflow.api_fastapi.core_api.base import OrmClause
from airflow.api_fastapi.core_api.datamodels.assets import
AssetEventCollectionResponse
+from airflow.api_fastapi.core_api.datamodels.common import BulkBody,
BulkResponse
from airflow.api_fastapi.core_api.datamodels.dag_run import (
+ BulkDAGRunBody,
DAGRunClearBody,
DAGRunCollectionResponse,
+ DagRunMutableStates,
DAGRunPatchBody,
- DAGRunPatchStates,
DAGRunResponse,
DAGRunsBatchBody,
TriggerDAGRunPostBody,
@@ -96,9 +98,10 @@ from airflow.api_fastapi.core_api.security import (
ReadableDagRunsFilterDep,
requires_access_asset,
requires_access_dag,
+ requires_access_dag_run_bulk,
)
from airflow.api_fastapi.core_api.services.public.common import
resolve_run_on_latest_version
-from airflow.api_fastapi.core_api.services.public.dag_run import DagRunWaiter
+from airflow.api_fastapi.core_api.services.public.dag_run import
BulkDagRunService, DagRunWaiter
from airflow.api_fastapi.logging.decorators import action_logging
from airflow.exceptions import ParamValidationError
from airflow.listeners.listener import get_listener_manager
@@ -152,7 +155,7 @@ def get_dag_run(dag_id: str, dag_run_id: str, session:
SessionDep) -> DAGRunResp
def delete_dag_run(dag_id: str, dag_run_id: str, session: SessionDep):
"""Delete a Dag Run entry."""
dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id,
run_id=dag_run_id))
- deletable_states = {s.value for s in DAGRunPatchStates}
+ deletable_states = {s.value for s in DagRunMutableStates}
if dag_run is None:
raise HTTPException(
@@ -220,7 +223,7 @@ def patch_dag_run(
for attr_name, attr_value_raw in data.items():
if attr_name == "state":
attr_value = getattr(patch_body, "state")
- if attr_value == DAGRunPatchStates.SUCCESS:
+ if attr_value == DagRunMutableStates.SUCCESS:
set_dag_run_state_to_success(dag=dag, run_id=dag_run.run_id,
commit=True, session=session)
try:
get_listener_manager().hook.on_dag_run_success(dag_run=dag_run, msg="")
@@ -229,10 +232,10 @@ def patch_dag_run(
# TODO AIP-103: https://github.com/apache/airflow/issues/66755
# Handle clearing states for all task instances in a dagrun when
cleared
- elif attr_value == DAGRunPatchStates.QUEUED:
+ elif attr_value == DagRunMutableStates.QUEUED:
set_dag_run_state_to_queued(dag=dag, run_id=dag_run.run_id,
commit=True, session=session)
# Not notifying on queued - only notifying on RUNNING, this is
happening in scheduler
- elif attr_value == DAGRunPatchStates.FAILED:
+ elif attr_value == DagRunMutableStates.FAILED:
set_dag_run_state_to_failed(dag=dag, run_id=dag_run.run_id,
commit=True, session=session)
try:
get_listener_manager().hook.on_dag_run_failed(dag_run=dag_run, msg="")
@@ -253,6 +256,19 @@ def patch_dag_run(
return final_dag_run
+@dag_run_router.patch(
+ "",
+ dependencies=[Depends(requires_access_dag_run_bulk()),
Depends(action_logging())],
+)
+def bulk_dag_runs(
+ request: BulkBody[BulkDAGRunBody],
+ session: SessionDep,
+ dag_id: str,
+) -> BulkResponse:
+ """Bulk delete Dag Runs."""
+ return BulkDagRunService(session=session, request=request,
dag_id=dag_id).handle_request()
+
+
@dag_run_router.get(
"/{dag_run_id}/upstreamAssetEvents",
responses=create_openapi_http_exception_doc(
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/security.py
b/airflow-core/src/airflow/api_fastapi/core_api/security.py
index a54425326d3..f139ebc8ce7 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/security.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/security.py
@@ -37,6 +37,7 @@ from airflow.api_fastapi.auth.managers.base_auth_manager
import (
from airflow.api_fastapi.auth.managers.models.base_user import BaseUser
from airflow.api_fastapi.auth.managers.models.batch_apis import (
IsAuthorizedConnectionRequest,
+ IsAuthorizedDagRequest,
IsAuthorizedPoolRequest,
IsAuthorizedVariableRequest,
)
@@ -62,6 +63,7 @@ from airflow.api_fastapi.core_api.datamodels.common import (
BulkUpdateAction,
)
from airflow.api_fastapi.core_api.datamodels.connections import ConnectionBody
+from airflow.api_fastapi.core_api.datamodels.dag_run import BulkDAGRunBody
from airflow.api_fastapi.core_api.datamodels.pools import PoolBody
from airflow.api_fastapi.core_api.datamodels.variables import VariableBody
from airflow.configuration import conf
@@ -725,6 +727,56 @@ def requires_access_variable_bulk() ->
Callable[[BulkBody[VariableBody], BaseUse
return inner
+def requires_access_dag_run_bulk() -> Callable[[BulkBody[BulkDAGRunBody],
BaseUser, str], None]:
+ def inner(
+ request: BulkBody[BulkDAGRunBody],
+ user: GetUserDep,
+ dag_id: str,
+ ) -> None:
+ resolved_dag_ids: set[str] = set()
+ for action in request.actions:
+ for entity in action.entities:
+ if isinstance(entity, str):
+ entity_dag_id: str | None = dag_id
+ else:
+ entity_dag_id = entity.dag_id or dag_id
+ if entity_dag_id and entity_dag_id != "~":
+ resolved_dag_ids.add(entity_dag_id)
+
+ dag_id_to_team = {d: DagModel.get_team_name(d) for d in
resolved_dag_ids}
+
+ requests: list[IsAuthorizedDagRequest] = []
+ for action in request.actions:
+ methods = _get_resource_methods_from_bulk_request(action)
+ for entity in action.entities:
+ if isinstance(entity, str):
+ entity_dag_id = dag_id
+ else:
+ entity_dag_id = entity.dag_id or dag_id
+ # Entities that can't be resolved are surfaced as 400 in the
service's BulkResponse.
+ if not entity_dag_id or entity_dag_id == "~":
+ continue
+ for method in methods:
+ requests.append(
+ {
+ "method": method,
+ "access_entity": DagAccessEntity.RUN,
+ "details": DagDetails(
+ id=entity_dag_id,
team_name=dag_id_to_team.get(entity_dag_id)
+ ),
+ }
+ )
+
+ _requires_access(
+ is_authorized_callback=lambda:
get_auth_manager().batch_is_authorized_dag(
+ requests=requests,
+ user=user,
+ )
+ )
+
+ return inner
+
+
def requires_access_asset(method: ResourceMethod) -> Callable[[Request,
BaseUser], None]:
def inner(
request: Request,
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
index e7d7cb98c93..4c5a5b090b6 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
@@ -24,8 +24,21 @@ import operator
from typing import TYPE_CHECKING, Any
import attrs
-from sqlalchemy import select
+import structlog
+from fastapi import status
+from sqlalchemy import select, tuple_
+from sqlalchemy.orm import Session
+from airflow.api_fastapi.core_api.datamodels.common import (
+ BulkActionNotOnExistence,
+ BulkActionResponse,
+ BulkBody,
+ BulkCreateAction,
+ BulkDeleteAction,
+ BulkUpdateAction,
+)
+from airflow.api_fastapi.core_api.datamodels.dag_run import BulkDAGRunBody,
DagRunMutableStates
+from airflow.api_fastapi.core_api.services.public.common import BulkService
from airflow.models.dagrun import DagRun
from airflow.models.xcom import XCOM_RETURN_KEY, XComModel
from airflow.utils.session import create_session_async
@@ -34,6 +47,8 @@ from airflow.utils.state import State
if TYPE_CHECKING:
from collections.abc import AsyncGenerator, Iterator
+log = structlog.get_logger(__name__)
+
@attrs.define
class DagRunWaiter:
@@ -86,3 +101,101 @@ class DagRunWaiter:
await asyncio.sleep(self.interval)
yield await self._serialize_response(dag_run := await
self._get_dag_run())
yield "\n"
+
+
+class BulkDagRunService(BulkService[BulkDAGRunBody]):
+ """Service for handling bulk operations on Dag Runs."""
+
+ def __init__(
+ self,
+ session: Session,
+ request: BulkBody[BulkDAGRunBody],
+ dag_id: str,
+ ):
+ super().__init__(session, request)
+ self.dag_id = dag_id
+
+ def handle_bulk_create(
+ self, action: BulkCreateAction[BulkDAGRunBody], results:
BulkActionResponse
+ ) -> None:
+ results.errors.append(
+ {
+ "error": "Dag Runs bulk create is not supported. Use the
trigger Dag Run endpoint instead.",
+ "status_code": status.HTTP_405_METHOD_NOT_ALLOWED,
+ }
+ )
+
+ def handle_bulk_update(
+ self, action: BulkUpdateAction[BulkDAGRunBody], results:
BulkActionResponse
+ ) -> None:
+ results.errors.append(
+ {
+ "error": "Dag Runs bulk update is not supported yet. Use the
patch Dag Run endpoint per run instead.",
+ "status_code": status.HTTP_405_METHOD_NOT_ALLOWED,
+ }
+ )
+
+ def handle_bulk_delete(
+ self, action: BulkDeleteAction[BulkDAGRunBody], results:
BulkActionResponse
+ ) -> None:
+ """Bulk delete Dag Runs."""
+ keys: set[tuple[str, str]] = set()
+
+ for entity in action.entities:
+ if isinstance(entity, str):
+ dag_id, dag_run_id = self.dag_id, entity
+ else:
+ dag_id = entity.dag_id or self.dag_id
+ dag_run_id = entity.dag_run_id
+
+ if dag_id == "~" or dag_run_id == "~":
+ if isinstance(entity, str):
+ error_msg = (
+ "When using wildcard in path, dag_id must be specified
in BulkDAGRunBody"
+ f" object, not as string for dag_run_id: {entity}"
+ )
+ else:
+ error_msg = (
+ "When using wildcard in path, dag_id must be specified
in request body for"
+ f" dag_run_id: {entity.dag_run_id}"
+ )
+ results.errors.append(
+ {"error": error_msg, "status_code":
status.HTTP_400_BAD_REQUEST},
+ )
+ continue
+
+ keys.add((dag_id, dag_run_id))
+
+ if not keys:
+ return
+
+ dag_runs = self.session.scalars(
+ select(DagRun).where(tuple_(DagRun.dag_id,
DagRun.run_id).in_(list(keys)))
+ ).all()
+ dag_run_map = {(dr.dag_id, dr.run_id): dr for dr in dag_runs}
+ not_found = keys - dag_run_map.keys()
+
+ if action.action_on_non_existence == BulkActionNotOnExistence.FAIL:
+ for dag_id, run_id in sorted(not_found):
+ results.errors.append(
+ {
+ "error": (f"The DagRun with dag_id: `{dag_id}` and
run_id: `{run_id}` was not found"),
+ "status_code": status.HTTP_404_NOT_FOUND,
+ }
+ )
+
+ deletable_states = {s.value for s in DagRunMutableStates}
+ for (dag_id, run_id), dag_run in dag_run_map.items():
+ if dag_run.state not in deletable_states:
+ results.errors.append(
+ {
+ "error": (
+ f"The DagRun with dag_id: `{dag_id}` and run_id:
`{run_id}` "
+ f"cannot be deleted in {dag_run.state} state"
+ ),
+ "status_code": status.HTTP_409_CONFLICT,
+ }
+ )
+ continue
+ self.session.delete(dag_run)
+ results.success.append(f"{dag_id}.{run_id}")
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
index 812a41d9d08..f8e3dbe9af6 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
@@ -138,13 +138,6 @@ export const UseDagRunServiceGetDagRunKeyFn = ({ dagId,
dagRunId }: {
dagId: string;
dagRunId: string;
}, queryKey?: Array<unknown>) => [useDagRunServiceGetDagRunKey, ...(queryKey
?? [{ dagId, dagRunId }])];
-export type DagRunServiceGetUpstreamAssetEventsDefaultResponse =
Awaited<ReturnType<typeof DagRunService.getUpstreamAssetEvents>>;
-export type DagRunServiceGetUpstreamAssetEventsQueryResult<TData =
DagRunServiceGetUpstreamAssetEventsDefaultResponse, TError = unknown> =
UseQueryResult<TData, TError>;
-export const useDagRunServiceGetUpstreamAssetEventsKey =
"DagRunServiceGetUpstreamAssetEvents";
-export const UseDagRunServiceGetUpstreamAssetEventsKeyFn = ({ dagId, dagRunId
}: {
- dagId: string;
- dagRunId: string;
-}, queryKey?: Array<unknown>) => [useDagRunServiceGetUpstreamAssetEventsKey,
...(queryKey ?? [{ dagId, dagRunId }])];
export type DagRunServiceGetDagRunsDefaultResponse = Awaited<ReturnType<typeof
DagRunService.getDagRuns>>;
export type DagRunServiceGetDagRunsQueryResult<TData =
DagRunServiceGetDagRunsDefaultResponse, TError = unknown> =
UseQueryResult<TData, TError>;
export const useDagRunServiceGetDagRunsKey = "DagRunServiceGetDagRuns";
@@ -193,6 +186,13 @@ export const UseDagRunServiceGetDagRunsKeyFn = ({
bundleVersion, confContains, c
updatedAtLt?: string;
updatedAtLte?: string;
}, queryKey?: Array<unknown>) => [useDagRunServiceGetDagRunsKey, ...(queryKey
?? [{ bundleVersion, confContains, consumingAssetPattern, cursor, dagId,
dagIdPattern, dagIdPrefixPattern, dagVersion, durationGt, durationGte,
durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit,
logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy,
partitionKeyPattern, partitionKeyPrefixPattern, runAfterGt, runAfterGte,
runAfterLt, runAfterLte, runIdPattern, r [...]
+export type DagRunServiceGetUpstreamAssetEventsDefaultResponse =
Awaited<ReturnType<typeof DagRunService.getUpstreamAssetEvents>>;
+export type DagRunServiceGetUpstreamAssetEventsQueryResult<TData =
DagRunServiceGetUpstreamAssetEventsDefaultResponse, TError = unknown> =
UseQueryResult<TData, TError>;
+export const useDagRunServiceGetUpstreamAssetEventsKey =
"DagRunServiceGetUpstreamAssetEvents";
+export const UseDagRunServiceGetUpstreamAssetEventsKeyFn = ({ dagId, dagRunId
}: {
+ dagId: string;
+ dagRunId: string;
+}, queryKey?: Array<unknown>) => [useDagRunServiceGetUpstreamAssetEventsKey,
...(queryKey ?? [{ dagId, dagRunId }])];
export type DagRunServiceWaitDagRunUntilFinishedDefaultResponse =
Awaited<ReturnType<typeof DagRunService.waitDagRunUntilFinished>>;
export type DagRunServiceWaitDagRunUntilFinishedQueryResult<TData =
DagRunServiceWaitDagRunUntilFinishedDefaultResponse, TError = unknown> =
UseQueryResult<TData, TError>;
export const useDagRunServiceWaitDagRunUntilFinishedKey =
"DagRunServiceWaitDagRunUntilFinished";
@@ -1034,8 +1034,8 @@ export type
BackfillServiceCreateBackfillDryRunMutationResult = Awaited<ReturnTy
export type ConnectionServicePostConnectionMutationResult =
Awaited<ReturnType<typeof ConnectionService.postConnection>>;
export type ConnectionServiceTestConnectionMutationResult =
Awaited<ReturnType<typeof ConnectionService.testConnection>>;
export type ConnectionServiceCreateDefaultConnectionsMutationResult =
Awaited<ReturnType<typeof ConnectionService.createDefaultConnections>>;
-export type DagRunServiceClearDagRunMutationResult = Awaited<ReturnType<typeof
DagRunService.clearDagRun>>;
export type DagRunServiceTriggerDagRunMutationResult =
Awaited<ReturnType<typeof DagRunService.triggerDagRun>>;
+export type DagRunServiceClearDagRunMutationResult = Awaited<ReturnType<typeof
DagRunService.clearDagRun>>;
export type DagRunServiceGetListDagRunsBatchMutationResult =
Awaited<ReturnType<typeof DagRunService.getListDagRunsBatch>>;
export type DagServiceFavoriteDagMutationResult = Awaited<ReturnType<typeof
DagService.favoriteDag>>;
export type DagServiceUnfavoriteDagMutationResult = Awaited<ReturnType<typeof
DagService.unfavoriteDag>>;
@@ -1054,6 +1054,7 @@ export type DagParsingServiceReparseDagFileMutationResult
= Awaited<ReturnType<t
export type ConnectionServicePatchConnectionMutationResult =
Awaited<ReturnType<typeof ConnectionService.patchConnection>>;
export type ConnectionServiceBulkConnectionsMutationResult =
Awaited<ReturnType<typeof ConnectionService.bulkConnections>>;
export type DagRunServicePatchDagRunMutationResult = Awaited<ReturnType<typeof
DagRunService.patchDagRun>>;
+export type DagRunServiceBulkDagRunsMutationResult = Awaited<ReturnType<typeof
DagRunService.bulkDagRuns>>;
export type DagServicePatchDagsMutationResult = Awaited<ReturnType<typeof
DagService.patchDags>>;
export type DagServicePatchDagMutationResult = Awaited<ReturnType<typeof
DagService.patchDag>>;
export type TaskInstanceServicePatchTaskInstanceMutationResult =
Awaited<ReturnType<typeof TaskInstanceService.patchTaskInstance>>;
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
index 32d74fed6e9..f124d321a1f 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
@@ -264,19 +264,6 @@ export const ensureUseDagRunServiceGetDagRunData =
(queryClient: QueryClient, {
dagRunId: string;
}) => queryClient.ensureQueryData({ queryKey:
Common.UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }), queryFn: () =>
DagRunService.getDagRun({ dagId, dagRunId }) });
/**
-* Get Upstream Asset Events
-* If dag run is asset-triggered, return the asset events that triggered it.
-* @param data The data for the request.
-* @param data.dagId
-* @param data.dagRunId
-* @returns AssetEventCollectionResponse Successful Response
-* @throws ApiError
-*/
-export const ensureUseDagRunServiceGetUpstreamAssetEventsData = (queryClient:
QueryClient, { dagId, dagRunId }: {
- dagId: string;
- dagRunId: string;
-}) => queryClient.ensureQueryData({ queryKey:
Common.UseDagRunServiceGetUpstreamAssetEventsKeyFn({ dagId, dagRunId }),
queryFn: () => DagRunService.getUpstreamAssetEvents({ dagId, dagRunId }) });
-/**
* Get Dag Runs
* Get all Dag Runs.
*
@@ -391,6 +378,19 @@ export const ensureUseDagRunServiceGetDagRunsData =
(queryClient: QueryClient, {
updatedAtLte?: string;
}) => queryClient.ensureQueryData({ queryKey:
Common.UseDagRunServiceGetDagRunsKeyFn({ bundleVersion, confContains,
consumingAssetPattern, cursor, dagId, dagIdPattern, dagIdPrefixPattern,
dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt,
endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte,
logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern,
partitionKeyPrefixPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte,
runIdPattern [...]
/**
+* Get Upstream Asset Events
+* If dag run is asset-triggered, return the asset events that triggered it.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.dagRunId
+* @returns AssetEventCollectionResponse Successful Response
+* @throws ApiError
+*/
+export const ensureUseDagRunServiceGetUpstreamAssetEventsData = (queryClient:
QueryClient, { dagId, dagRunId }: {
+ dagId: string;
+ dagRunId: string;
+}) => queryClient.ensureQueryData({ queryKey:
Common.UseDagRunServiceGetUpstreamAssetEventsKeyFn({ dagId, dagRunId }),
queryFn: () => DagRunService.getUpstreamAssetEvents({ dagId, dagRunId }) });
+/**
* Experimental: Wait for a dag run to complete, and return task results if
requested.
* 🚧 This is an experimental endpoint and may change or be removed without
notice.Successful response are streamed as newline-delimited JSON (NDJSON).
Each line is a JSON object representing the Dag run state.
* @param data The data for the request.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
index ae012539160..5bdcfe667b2 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -264,19 +264,6 @@ export const prefetchUseDagRunServiceGetDagRun =
(queryClient: QueryClient, { da
dagRunId: string;
}) => queryClient.prefetchQuery({ queryKey:
Common.UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }), queryFn: () =>
DagRunService.getDagRun({ dagId, dagRunId }) });
/**
-* Get Upstream Asset Events
-* If dag run is asset-triggered, return the asset events that triggered it.
-* @param data The data for the request.
-* @param data.dagId
-* @param data.dagRunId
-* @returns AssetEventCollectionResponse Successful Response
-* @throws ApiError
-*/
-export const prefetchUseDagRunServiceGetUpstreamAssetEvents = (queryClient:
QueryClient, { dagId, dagRunId }: {
- dagId: string;
- dagRunId: string;
-}) => queryClient.prefetchQuery({ queryKey:
Common.UseDagRunServiceGetUpstreamAssetEventsKeyFn({ dagId, dagRunId }),
queryFn: () => DagRunService.getUpstreamAssetEvents({ dagId, dagRunId }) });
-/**
* Get Dag Runs
* Get all Dag Runs.
*
@@ -391,6 +378,19 @@ export const prefetchUseDagRunServiceGetDagRuns =
(queryClient: QueryClient, { b
updatedAtLte?: string;
}) => queryClient.prefetchQuery({ queryKey:
Common.UseDagRunServiceGetDagRunsKeyFn({ bundleVersion, confContains,
consumingAssetPattern, cursor, dagId, dagIdPattern, dagIdPrefixPattern,
dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt,
endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte,
logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern,
partitionKeyPrefixPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte,
runIdPattern, [...]
/**
+* Get Upstream Asset Events
+* If dag run is asset-triggered, return the asset events that triggered it.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.dagRunId
+* @returns AssetEventCollectionResponse Successful Response
+* @throws ApiError
+*/
+export const prefetchUseDagRunServiceGetUpstreamAssetEvents = (queryClient:
QueryClient, { dagId, dagRunId }: {
+ dagId: string;
+ dagRunId: string;
+}) => queryClient.prefetchQuery({ queryKey:
Common.UseDagRunServiceGetUpstreamAssetEventsKeyFn({ dagId, dagRunId }),
queryFn: () => DagRunService.getUpstreamAssetEvents({ dagId, dagRunId }) });
+/**
* Experimental: Wait for a dag run to complete, and return task results if
requested.
* 🚧 This is an experimental endpoint and may change or be removed without
notice.Successful response are streamed as newline-delimited JSON (NDJSON).
Each line is a JSON object representing the Dag run state.
* @param data The data for the request.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
index 17ee1009ae0..8c0976ec328 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
@@ -2,7 +2,7 @@
import { UseMutationOptions, UseQueryOptions, useMutation, useQuery } from
"@tanstack/react-query";
import { AssetService, AssetStateService, AuthLinksService, BackfillService,
CalendarService, ConfigService, ConnectionService, DagParsingService,
DagRunService, DagService, DagSourceService, DagStatsService,
DagVersionService, DagWarningService, DashboardService, DeadlinesService,
DependenciesService, EventLogService, ExperimentalService, ExtraLinksService,
GanttService, GridService, ImportErrorService, JobService, LoginService,
MonitorService, PartitionedDagRunService, PluginService, P [...]
-import { AssetStateBody, BackfillPostBody, BulkBody_BulkTaskInstanceBody_,
BulkBody_ConnectionBody_, BulkBody_PoolBody_, BulkBody_VariableBody_,
ClearTaskInstancesBody, ConnectionBody, CreateAssetEventsBody, DAGPatchBody,
DAGRunClearBody, DAGRunPatchBody, DAGRunsBatchBody, DagRunState,
DagWarningType, GenerateTokenBody, MaterializeAssetBody, PatchTaskInstanceBody,
PoolBody, PoolPatchBody, TaskInstancesBatchBody, TaskStateBody,
TriggerDAGRunPostBody, UpdateHITLDetailPayload, VariableBody, [...]
+import { AssetStateBody, BackfillPostBody, BulkBody_BulkDAGRunBody_,
BulkBody_BulkTaskInstanceBody_, BulkBody_ConnectionBody_, BulkBody_PoolBody_,
BulkBody_VariableBody_, ClearTaskInstancesBody, ConnectionBody,
CreateAssetEventsBody, DAGPatchBody, DAGRunClearBody, DAGRunPatchBody,
DAGRunsBatchBody, DagRunState, DagWarningType, GenerateTokenBody,
MaterializeAssetBody, PatchTaskInstanceBody, PoolBody, PoolPatchBody,
TaskInstancesBatchBody, TaskStateBody, TriggerDAGRunPostBody, UpdateHITLDe [...]
import * as Common from "./common";
/**
* Get Assets
@@ -264,19 +264,6 @@ export const useDagRunServiceGetDagRun = <TData =
Common.DagRunServiceGetDagRunD
dagRunId: string;
}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey:
Common.UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }, queryKey), queryFn:
() => DagRunService.getDagRun({ dagId, dagRunId }) as TData, ...options });
/**
-* Get Upstream Asset Events
-* If dag run is asset-triggered, return the asset events that triggered it.
-* @param data The data for the request.
-* @param data.dagId
-* @param data.dagRunId
-* @returns AssetEventCollectionResponse Successful Response
-* @throws ApiError
-*/
-export const useDagRunServiceGetUpstreamAssetEvents = <TData =
Common.DagRunServiceGetUpstreamAssetEventsDefaultResponse, TError = unknown,
TQueryKey extends Array<unknown> = unknown[]>({ dagId, dagRunId }: {
- dagId: string;
- dagRunId: string;
-}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey:
Common.UseDagRunServiceGetUpstreamAssetEventsKeyFn({ dagId, dagRunId },
queryKey), queryFn: () => DagRunService.getUpstreamAssetEvents({ dagId,
dagRunId }) as TData, ...options });
-/**
* Get Dag Runs
* Get all Dag Runs.
*
@@ -391,6 +378,19 @@ export const useDagRunServiceGetDagRuns = <TData =
Common.DagRunServiceGetDagRun
updatedAtLte?: string;
}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey:
Common.UseDagRunServiceGetDagRunsKeyFn({ bundleVersion, confContains,
consumingAssetPattern, cursor, dagId, dagIdPattern, dagIdPrefixPattern,
dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt,
endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte,
logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, [...]
/**
+* Get Upstream Asset Events
+* If dag run is asset-triggered, return the asset events that triggered it.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.dagRunId
+* @returns AssetEventCollectionResponse Successful Response
+* @throws ApiError
+*/
+export const useDagRunServiceGetUpstreamAssetEvents = <TData =
Common.DagRunServiceGetUpstreamAssetEventsDefaultResponse, TError = unknown,
TQueryKey extends Array<unknown> = unknown[]>({ dagId, dagRunId }: {
+ dagId: string;
+ dagRunId: string;
+}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey:
Common.UseDagRunServiceGetUpstreamAssetEventsKeyFn({ dagId, dagRunId },
queryKey), queryFn: () => DagRunService.getUpstreamAssetEvents({ dagId,
dagRunId }) as TData, ...options });
+/**
* Experimental: Wait for a dag run to complete, and return task results if
requested.
* 🚧 This is an experimental endpoint and may change or be removed without
notice.Successful response are streamed as newline-delimited JSON (NDJSON).
Each line is a JSON object representing the Dag run state.
* @param data The data for the request.
@@ -2160,6 +2160,22 @@ export const useConnectionServiceTestConnection = <TData
= Common.ConnectionServ
*/
export const useConnectionServiceCreateDefaultConnections = <TData =
Common.ConnectionServiceCreateDefaultConnectionsMutationResult, TError =
unknown, TContext = unknown>(options?: Omit<UseMutationOptions<TData, TError,
void, TContext>, "mutationFn">) => useMutation<TData, TError, void, TContext>({
mutationFn: () => ConnectionService.createDefaultConnections() as unknown as
Promise<TData>, ...options });
/**
+* Trigger Dag Run
+* Trigger a Dag.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.requestBody
+* @returns DAGRunResponse Successful Response
+* @throws ApiError
+*/
+export const useDagRunServiceTriggerDagRun = <TData =
Common.DagRunServiceTriggerDagRunMutationResult, TError = unknown, TContext =
unknown>(options?: Omit<UseMutationOptions<TData, TError, {
+ dagId: unknown;
+ requestBody: TriggerDAGRunPostBody;
+}, TContext>, "mutationFn">) => useMutation<TData, TError, {
+ dagId: unknown;
+ requestBody: TriggerDAGRunPostBody;
+}, TContext>({ mutationFn: ({ dagId, requestBody }) =>
DagRunService.triggerDagRun({ dagId, requestBody }) as unknown as
Promise<TData>, ...options });
+/**
* Clear Dag Run
* @param data The data for the request.
* @param data.dagId
@@ -2178,22 +2194,6 @@ export const useDagRunServiceClearDagRun = <TData =
Common.DagRunServiceClearDag
requestBody: DAGRunClearBody;
}, TContext>({ mutationFn: ({ dagId, dagRunId, requestBody }) =>
DagRunService.clearDagRun({ dagId, dagRunId, requestBody }) as unknown as
Promise<TData>, ...options });
/**
-* Trigger Dag Run
-* Trigger a Dag.
-* @param data The data for the request.
-* @param data.dagId
-* @param data.requestBody
-* @returns DAGRunResponse Successful Response
-* @throws ApiError
-*/
-export const useDagRunServiceTriggerDagRun = <TData =
Common.DagRunServiceTriggerDagRunMutationResult, TError = unknown, TContext =
unknown>(options?: Omit<UseMutationOptions<TData, TError, {
- dagId: unknown;
- requestBody: TriggerDAGRunPostBody;
-}, TContext>, "mutationFn">) => useMutation<TData, TError, {
- dagId: unknown;
- requestBody: TriggerDAGRunPostBody;
-}, TContext>({ mutationFn: ({ dagId, requestBody }) =>
DagRunService.triggerDagRun({ dagId, requestBody }) as unknown as
Promise<TData>, ...options });
-/**
* Get List Dag Runs Batch
* Get a list of Dag Runs.
* @param data The data for the request.
@@ -2482,6 +2482,22 @@ export const useDagRunServicePatchDagRun = <TData =
Common.DagRunServicePatchDag
updateMask?: string[];
}, TContext>({ mutationFn: ({ dagId, dagRunId, requestBody, updateMask }) =>
DagRunService.patchDagRun({ dagId, dagRunId, requestBody, updateMask }) as
unknown as Promise<TData>, ...options });
/**
+* Bulk Dag Runs
+* Bulk delete Dag Runs.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.requestBody
+* @returns BulkResponse Successful Response
+* @throws ApiError
+*/
+export const useDagRunServiceBulkDagRuns = <TData =
Common.DagRunServiceBulkDagRunsMutationResult, TError = unknown, TContext =
unknown>(options?: Omit<UseMutationOptions<TData, TError, {
+ dagId: string;
+ requestBody: BulkBody_BulkDAGRunBody_;
+}, TContext>, "mutationFn">) => useMutation<TData, TError, {
+ dagId: string;
+ requestBody: BulkBody_BulkDAGRunBody_;
+}, TContext>({ mutationFn: ({ dagId, requestBody }) =>
DagRunService.bulkDagRuns({ dagId, requestBody }) as unknown as Promise<TData>,
...options });
+/**
* Patch Dags
* Patch multiple Dags.
*
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
index 8cdfee041b6..e11772395f0 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
@@ -264,19 +264,6 @@ export const useDagRunServiceGetDagRunSuspense = <TData =
Common.DagRunServiceGe
dagRunId: string;
}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey:
Common.UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }, queryKey), queryFn:
() => DagRunService.getDagRun({ dagId, dagRunId }) as TData, ...options });
/**
-* Get Upstream Asset Events
-* If dag run is asset-triggered, return the asset events that triggered it.
-* @param data The data for the request.
-* @param data.dagId
-* @param data.dagRunId
-* @returns AssetEventCollectionResponse Successful Response
-* @throws ApiError
-*/
-export const useDagRunServiceGetUpstreamAssetEventsSuspense = <TData =
Common.DagRunServiceGetUpstreamAssetEventsDefaultResponse, TError = unknown,
TQueryKey extends Array<unknown> = unknown[]>({ dagId, dagRunId }: {
- dagId: string;
- dagRunId: string;
-}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey:
Common.UseDagRunServiceGetUpstreamAssetEventsKeyFn({ dagId, dagRunId },
queryKey), queryFn: () => DagRunService.getUpstreamAssetEvents({ dagId,
dagRunId }) as TData, ...options });
-/**
* Get Dag Runs
* Get all Dag Runs.
*
@@ -391,6 +378,19 @@ export const useDagRunServiceGetDagRunsSuspense = <TData =
Common.DagRunServiceG
updatedAtLte?: string;
}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey:
Common.UseDagRunServiceGetDagRunsKeyFn({ bundleVersion, confContains,
consumingAssetPattern, cursor, dagId, dagIdPattern, dagIdPrefixPattern,
dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt,
endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte,
logicalDateLt, logicalDateLte, offset, orderBy, partitionKey [...]
/**
+* Get Upstream Asset Events
+* If dag run is asset-triggered, return the asset events that triggered it.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.dagRunId
+* @returns AssetEventCollectionResponse Successful Response
+* @throws ApiError
+*/
+export const useDagRunServiceGetUpstreamAssetEventsSuspense = <TData =
Common.DagRunServiceGetUpstreamAssetEventsDefaultResponse, TError = unknown,
TQueryKey extends Array<unknown> = unknown[]>({ dagId, dagRunId }: {
+ dagId: string;
+ dagRunId: string;
+}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey:
Common.UseDagRunServiceGetUpstreamAssetEventsKeyFn({ dagId, dagRunId },
queryKey), queryFn: () => DagRunService.getUpstreamAssetEvents({ dagId,
dagRunId }) as TData, ...options });
+/**
* Experimental: Wait for a dag run to complete, and return task results if
requested.
* 🚧 This is an experimental endpoint and may change or be removed without
notice.Successful response are streamed as newline-delimited JSON (NDJSON).
Each line is a JSON object representing the Dag run state.
* @param data The data for the request.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 108480b3dd1..4a4b95f1830 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -684,6 +684,32 @@ The response includes a list of successful keys and any
errors encountered durin
This structure helps users understand which key actions succeeded and which
failed.`
} as const;
+export const $BulkBody_BulkDAGRunBody_ = {
+ properties: {
+ actions: {
+ items: {
+ oneOf: [
+ {
+ '$ref':
'#/components/schemas/BulkCreateAction_BulkDAGRunBody_'
+ },
+ {
+ '$ref':
'#/components/schemas/BulkUpdateAction_BulkDAGRunBody_'
+ },
+ {
+ '$ref':
'#/components/schemas/BulkDeleteAction_BulkDAGRunBody_'
+ }
+ ]
+ },
+ type: 'array',
+ title: 'Actions'
+ }
+ },
+ additionalProperties: false,
+ type: 'object',
+ required: ['actions'],
+ title: 'BulkBody[BulkDAGRunBody]'
+} as const;
+
export const $BulkBody_BulkTaskInstanceBody_ = {
properties: {
actions: {
@@ -788,6 +814,33 @@ export const $BulkBody_VariableBody_ = {
title: 'BulkBody[VariableBody]'
} as const;
+export const $BulkCreateAction_BulkDAGRunBody_ = {
+ properties: {
+ action: {
+ type: 'string',
+ const: 'create',
+ title: 'Action',
+ description: 'The action to be performed on the entities.'
+ },
+ entities: {
+ items: {
+ '$ref': '#/components/schemas/BulkDAGRunBody'
+ },
+ type: 'array',
+ title: 'Entities',
+ description: 'A list of entities to be created.'
+ },
+ action_on_existence: {
+ '$ref': '#/components/schemas/BulkActionOnExistence',
+ default: 'fail'
+ }
+ },
+ additionalProperties: false,
+ type: 'object',
+ required: ['action', 'entities'],
+ title: 'BulkCreateAction[BulkDAGRunBody]'
+} as const;
+
export const $BulkCreateAction_BulkTaskInstanceBody_ = {
properties: {
action: {
@@ -896,6 +949,65 @@ export const $BulkCreateAction_VariableBody_ = {
title: 'BulkCreateAction[VariableBody]'
} as const;
+export const $BulkDAGRunBody = {
+ properties: {
+ dag_run_id: {
+ type: 'string',
+ title: 'Dag Run Id'
+ },
+ dag_id: {
+ anyOf: [
+ {
+ type: 'string'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Dag Id'
+ }
+ },
+ additionalProperties: false,
+ type: 'object',
+ required: ['dag_run_id'],
+ title: 'BulkDAGRunBody',
+ description: 'Request body for bulk delete operations on Dag Runs.'
+} as const;
+
+export const $BulkDeleteAction_BulkDAGRunBody_ = {
+ properties: {
+ action: {
+ type: 'string',
+ const: 'delete',
+ title: 'Action',
+ description: 'The action to be performed on the entities.'
+ },
+ entities: {
+ items: {
+ anyOf: [
+ {
+ type: 'string'
+ },
+ {
+ '$ref': '#/components/schemas/BulkDAGRunBody'
+ }
+ ]
+ },
+ type: 'array',
+ title: 'Entities',
+ description: 'A list of entity id/key or entity objects to be
deleted.'
+ },
+ action_on_non_existence: {
+ '$ref': '#/components/schemas/BulkActionNotOnExistence',
+ default: 'fail'
+ }
+ },
+ additionalProperties: false,
+ type: 'object',
+ required: ['action', 'entities'],
+ title: 'BulkDeleteAction[BulkDAGRunBody]'
+} as const;
+
export const $BulkDeleteAction_BulkTaskInstanceBody_ = {
properties: {
action: {
@@ -1166,6 +1278,48 @@ export const $BulkTaskInstanceBody = {
description: 'Request body for bulk update, and delete task instances.'
} as const;
+export const $BulkUpdateAction_BulkDAGRunBody_ = {
+ properties: {
+ action: {
+ type: 'string',
+ const: 'update',
+ title: 'Action',
+ description: 'The action to be performed on the entities.'
+ },
+ entities: {
+ items: {
+ '$ref': '#/components/schemas/BulkDAGRunBody'
+ },
+ type: 'array',
+ title: 'Entities',
+ description: 'A list of entities to be updated.'
+ },
+ update_mask: {
+ anyOf: [
+ {
+ items: {
+ type: 'string'
+ },
+ type: 'array'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Update Mask',
+ description: 'A list of field names to update for each entity.Only
these fields will be applied from the request body to the database model.Any
extra fields provided will be ignored.'
+ },
+ action_on_non_existence: {
+ '$ref': '#/components/schemas/BulkActionNotOnExistence',
+ default: 'fail'
+ }
+ },
+ additionalProperties: false,
+ type: 'object',
+ required: ['action', 'entities'],
+ title: 'BulkUpdateAction[BulkDAGRunBody]'
+} as const;
+
export const $BulkUpdateAction_BulkTaskInstanceBody_ = {
properties: {
action: {
@@ -2696,7 +2850,7 @@ export const $DAGRunPatchBody = {
state: {
anyOf: [
{
- '$ref': '#/components/schemas/DAGRunPatchStates'
+ '$ref': '#/components/schemas/DagRunMutableStates'
},
{
type: 'null'
@@ -2722,13 +2876,6 @@ export const $DAGRunPatchBody = {
description: 'Dag Run Serializer for PATCH requests.'
} as const;
-export const $DAGRunPatchStates = {
- type: 'string',
- enum: ['queued', 'success', 'failed'],
- title: 'DAGRunPatchStates',
- description: 'Enum for Dag Run states when updating a Dag Run.'
-} as const;
-
export const $DAGRunResponse = {
properties: {
dag_run_id: {
@@ -3488,6 +3635,13 @@ export const $DagRunAssetReference = {
description: 'DagRun serializer for asset responses.'
} as const;
+export const $DagRunMutableStates = {
+ type: 'string',
+ enum: ['queued', 'success', 'failed'],
+ title: 'DagRunMutableStates',
+ description: 'Dag Run states from which the run may be mutated (patched,
deleted).'
+} as const;
+
export const $DagRunState = {
type: 'string',
enum: ['queued', 'running', 'success', 'failed'],
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
index bc1f4471470..75468184977 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -3,7 +3,7 @@
import type { CancelablePromise } from './core/CancelablePromise';
import { OpenAPI } from './core/OpenAPI';
import { request as __request } from './core/request';
-import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData,
GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse,
GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData,
CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse,
GetAssetQueuedEventsData, GetAssetQueuedEventsResponse,
DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData,
GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse,
Dele [...]
+import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData,
GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse,
GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData,
CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse,
GetAssetQueuedEventsData, GetAssetQueuedEventsResponse,
DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData,
GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse,
Dele [...]
export class AssetService {
/**
@@ -937,54 +937,26 @@ export class DagRunService {
}
/**
- * Get Upstream Asset Events
- * If dag run is asset-triggered, return the asset events that triggered
it.
+ * Bulk Dag Runs
+ * Bulk delete Dag Runs.
* @param data The data for the request.
* @param data.dagId
- * @param data.dagRunId
- * @returns AssetEventCollectionResponse Successful Response
- * @throws ApiError
- */
- public static getUpstreamAssetEvents(data: GetUpstreamAssetEventsData):
CancelablePromise<GetUpstreamAssetEventsResponse> {
- return __request(OpenAPI, {
- method: 'GET',
- url:
'/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/upstreamAssetEvents',
- path: {
- dag_id: data.dagId,
- dag_run_id: data.dagRunId
- },
- errors: {
- 401: 'Unauthorized',
- 403: 'Forbidden',
- 404: 'Not Found',
- 422: 'Validation Error'
- }
- });
- }
-
- /**
- * Clear Dag Run
- * @param data The data for the request.
- * @param data.dagId
- * @param data.dagRunId
* @param data.requestBody
- * @returns unknown Successful Response
+ * @returns BulkResponse Successful Response
* @throws ApiError
*/
- public static clearDagRun(data: ClearDagRunData):
CancelablePromise<ClearDagRunResponse> {
+ public static bulkDagRuns(data: BulkDagRunsData):
CancelablePromise<BulkDagRunsResponse> {
return __request(OpenAPI, {
- method: 'POST',
- url: '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/clear',
+ method: 'PATCH',
+ url: '/api/v2/dags/{dag_id}/dagRuns',
path: {
- dag_id: data.dagId,
- dag_run_id: data.dagRunId
+ dag_id: data.dagId
},
body: data.requestBody,
mediaType: 'application/json',
errors: {
401: 'Unauthorized',
403: 'Forbidden',
- 404: 'Not Found',
422: 'Validation Error'
}
});
@@ -1148,6 +1120,60 @@ export class DagRunService {
});
}
+ /**
+ * Get Upstream Asset Events
+ * If dag run is asset-triggered, return the asset events that triggered
it.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @returns AssetEventCollectionResponse Successful Response
+ * @throws ApiError
+ */
+ public static getUpstreamAssetEvents(data: GetUpstreamAssetEventsData):
CancelablePromise<GetUpstreamAssetEventsResponse> {
+ return __request(OpenAPI, {
+ method: 'GET',
+ url:
'/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/upstreamAssetEvents',
+ path: {
+ dag_id: data.dagId,
+ dag_run_id: data.dagRunId
+ },
+ errors: {
+ 401: 'Unauthorized',
+ 403: 'Forbidden',
+ 404: 'Not Found',
+ 422: 'Validation Error'
+ }
+ });
+ }
+
+ /**
+ * Clear Dag Run
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @param data.requestBody
+ * @returns unknown Successful Response
+ * @throws ApiError
+ */
+ public static clearDagRun(data: ClearDagRunData):
CancelablePromise<ClearDagRunResponse> {
+ return __request(OpenAPI, {
+ method: 'POST',
+ url: '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/clear',
+ path: {
+ dag_id: data.dagId,
+ dag_run_id: data.dagRunId
+ },
+ body: data.requestBody,
+ mediaType: 'application/json',
+ errors: {
+ 401: 'Unauthorized',
+ 403: 'Forbidden',
+ 404: 'Not Found',
+ 422: 'Validation Error'
+ }
+ });
+ }
+
/**
* Experimental: Wait for a dag run to complete, and return task results
if requested.
* 🚧 This is an experimental endpoint and may change or be removed without
notice.Successful response are streamed as newline-delimited JSON (NDJSON).
Each line is a JSON object representing the Dag run state.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 1ea5961b919..70c9fa131c1 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -213,6 +213,10 @@ export type BulkActionResponse = {
}>;
};
+export type BulkBody_BulkDAGRunBody_ = {
+ actions: Array<(BulkCreateAction_BulkDAGRunBody_ |
BulkUpdateAction_BulkDAGRunBody_ | BulkDeleteAction_BulkDAGRunBody_)>;
+};
+
export type BulkBody_BulkTaskInstanceBody_ = {
actions: Array<(BulkCreateAction_BulkTaskInstanceBody_ |
BulkUpdateAction_BulkTaskInstanceBody_ |
BulkDeleteAction_BulkTaskInstanceBody_)>;
};
@@ -229,6 +233,18 @@ export type BulkBody_VariableBody_ = {
actions: Array<(BulkCreateAction_VariableBody_ |
BulkUpdateAction_VariableBody_ | BulkDeleteAction_VariableBody_)>;
};
+export type BulkCreateAction_BulkDAGRunBody_ = {
+ /**
+ * The action to be performed on the entities.
+ */
+ action: "create";
+ /**
+ * A list of entities to be created.
+ */
+ entities: Array<BulkDAGRunBody>;
+ action_on_existence?: BulkActionOnExistence;
+};
+
export type BulkCreateAction_BulkTaskInstanceBody_ = {
/**
* The action to be performed on the entities.
@@ -277,6 +293,26 @@ export type BulkCreateAction_VariableBody_ = {
action_on_existence?: BulkActionOnExistence;
};
+/**
+ * Request body for bulk delete operations on Dag Runs.
+ */
+export type BulkDAGRunBody = {
+ dag_run_id: string;
+ dag_id?: string | null;
+};
+
+export type BulkDeleteAction_BulkDAGRunBody_ = {
+ /**
+ * The action to be performed on the entities.
+ */
+ action: "delete";
+ /**
+ * A list of entity id/key or entity objects to be deleted.
+ */
+ entities: Array<(string | BulkDAGRunBody)>;
+ action_on_non_existence?: BulkActionNotOnExistence;
+};
+
export type BulkDeleteAction_BulkTaskInstanceBody_ = {
/**
* The action to be performed on the entities.
@@ -363,6 +399,22 @@ export type BulkTaskInstanceBody = {
dag_run_id?: string | null;
};
+export type BulkUpdateAction_BulkDAGRunBody_ = {
+ /**
+ * The action to be performed on the entities.
+ */
+ action: "update";
+ /**
+ * A list of entities to be updated.
+ */
+ entities: Array<BulkDAGRunBody>;
+ /**
+ * A list of field names to update for each entity.Only these fields will
be applied from the request body to the database model.Any extra fields
provided will be ignored.
+ */
+ update_mask?: Array<(string)> | null;
+ action_on_non_existence?: BulkActionNotOnExistence;
+};
+
export type BulkUpdateAction_BulkTaskInstanceBody_ = {
/**
* The action to be performed on the entities.
@@ -731,15 +783,10 @@ export type DAGRunCollectionResponse = {
* Dag Run Serializer for PATCH requests.
*/
export type DAGRunPatchBody = {
- state?: DAGRunPatchStates | null;
+ state?: DagRunMutableStates | null;
note?: string | null;
};
-/**
- * Enum for Dag Run states when updating a Dag Run.
- */
-export type DAGRunPatchStates = 'queued' | 'success' | 'failed';
-
/**
* Dag Run serializer for responses.
*/
@@ -869,6 +916,11 @@ export type DagRunAssetReference = {
partition_key: string | null;
};
+/**
+ * Dag Run states from which the run may be mutated (patched, deleted).
+ */
+export type DagRunMutableStates = 'queued' | 'success' | 'failed';
+
/**
* All possible states that a DagRun can be in.
*
@@ -2746,20 +2798,12 @@ export type PatchDagRunData = {
export type PatchDagRunResponse = DAGRunResponse;
-export type GetUpstreamAssetEventsData = {
+export type BulkDagRunsData = {
dagId: string;
- dagRunId: string;
+ requestBody: BulkBody_BulkDAGRunBody_;
};
-export type GetUpstreamAssetEventsResponse = AssetEventCollectionResponse;
-
-export type ClearDagRunData = {
- dagId: string;
- dagRunId: string;
- requestBody: DAGRunClearBody;
-};
-
-export type ClearDagRunResponse = ClearTaskInstanceCollectionResponse |
DAGRunResponse;
+export type BulkDagRunsResponse = BulkResponse;
export type GetDagRunsData = {
bundleVersion?: string | null;
@@ -2857,6 +2901,21 @@ export type TriggerDagRunData = {
export type TriggerDagRunResponse = DAGRunResponse;
+export type GetUpstreamAssetEventsData = {
+ dagId: string;
+ dagRunId: string;
+};
+
+export type GetUpstreamAssetEventsResponse = AssetEventCollectionResponse;
+
+export type ClearDagRunData = {
+ dagId: string;
+ dagRunId: string;
+ requestBody: DAGRunClearBody;
+};
+
+export type ClearDagRunResponse = ClearTaskInstanceCollectionResponse |
DAGRunResponse;
+
export type WaitDagRunUntilFinishedData = {
dagId: string;
dagRunId: string;
@@ -5201,14 +5260,35 @@ export type $OpenApiTs = {
};
};
};
- '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/upstreamAssetEvents': {
+ '/api/v2/dags/{dag_id}/dagRuns': {
+ patch: {
+ req: BulkDagRunsData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 200: BulkResponse;
+ /**
+ * Unauthorized
+ */
+ 401: HTTPExceptionResponse;
+ /**
+ * Forbidden
+ */
+ 403: HTTPExceptionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
get: {
- req: GetUpstreamAssetEventsData;
+ req: GetDagRunsData;
res: {
/**
* Successful Response
*/
- 200: AssetEventCollectionResponse;
+ 200: DAGRunCollectionResponse;
/**
* Unauthorized
*/
@@ -5227,15 +5307,17 @@ export type $OpenApiTs = {
422: HTTPValidationError;
};
};
- };
- '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/clear': {
post: {
- req: ClearDagRunData;
+ req: TriggerDagRunData;
res: {
/**
* Successful Response
*/
- 200: ClearTaskInstanceCollectionResponse | DAGRunResponse;
+ 200: DAGRunResponse;
+ /**
+ * Bad Request
+ */
+ 400: HTTPExceptionResponse;
/**
* Unauthorized
*/
@@ -5248,6 +5330,10 @@ export type $OpenApiTs = {
* Not Found
*/
404: HTTPExceptionResponse;
+ /**
+ * Conflict
+ */
+ 409: HTTPExceptionResponse;
/**
* Validation Error
*/
@@ -5255,14 +5341,14 @@ export type $OpenApiTs = {
};
};
};
- '/api/v2/dags/{dag_id}/dagRuns': {
+ '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/upstreamAssetEvents': {
get: {
- req: GetDagRunsData;
+ req: GetUpstreamAssetEventsData;
res: {
/**
* Successful Response
*/
- 200: DAGRunCollectionResponse;
+ 200: AssetEventCollectionResponse;
/**
* Unauthorized
*/
@@ -5281,17 +5367,15 @@ export type $OpenApiTs = {
422: HTTPValidationError;
};
};
+ };
+ '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/clear': {
post: {
- req: TriggerDagRunData;
+ req: ClearDagRunData;
res: {
/**
* Successful Response
*/
- 200: DAGRunResponse;
- /**
- * Bad Request
- */
- 400: HTTPExceptionResponse;
+ 200: ClearTaskInstanceCollectionResponse | DAGRunResponse;
/**
* Unauthorized
*/
@@ -5304,10 +5388,6 @@ export type $OpenApiTs = {
* Not Found
*/
404: HTTPExceptionResponse;
- /**
- * Conflict
- */
- 409: HTTPExceptionResponse;
/**
* Validation Error
*/
diff --git
a/airflow-core/src/airflow/ui/src/components/MarkAs/Run/MarkRunAsButton.tsx
b/airflow-core/src/airflow/ui/src/components/MarkAs/Run/MarkRunAsButton.tsx
index 975691c9657..7268e837b71 100644
--- a/airflow-core/src/airflow/ui/src/components/MarkAs/Run/MarkRunAsButton.tsx
+++ b/airflow-core/src/airflow/ui/src/components/MarkAs/Run/MarkRunAsButton.tsx
@@ -23,7 +23,7 @@ import { useTranslation } from "react-i18next";
import { FiX } from "react-icons/fi";
import { LuCheck } from "react-icons/lu";
-import type { DAGRunPatchStates, DAGRunResponse } from
"openapi/requests/types.gen";
+import type { DagRunMutableStates, DAGRunResponse } from
"openapi/requests/types.gen";
import { StateBadge } from "src/components/StateBadge";
import { IconButton, Menu, Tooltip } from "src/components/ui";
@@ -37,7 +37,7 @@ type Props = {
const MarkRunAsButton = ({ dagRun, isHotkeyEnabled = false }: Props) => {
const { onClose, onOpen, open } = useDisclosure();
- const [state, setState] = useState<DAGRunPatchStates>("success");
+ const [state, setState] = useState<DagRunMutableStates>("success");
const { t: translate } = useTranslation();
useHotkeys(
diff --git
a/airflow-core/src/airflow/ui/src/components/MarkAs/Run/MarkRunAsDialog.tsx
b/airflow-core/src/airflow/ui/src/components/MarkAs/Run/MarkRunAsDialog.tsx
index bff16fcde0a..0c01cb01981 100644
--- a/airflow-core/src/airflow/ui/src/components/MarkAs/Run/MarkRunAsDialog.tsx
+++ b/airflow-core/src/airflow/ui/src/components/MarkAs/Run/MarkRunAsDialog.tsx
@@ -20,7 +20,7 @@ import { Button, Flex, Heading, VStack } from
"@chakra-ui/react";
import { useState } from "react";
import { useTranslation } from "react-i18next";
-import type { DAGRunPatchStates, DAGRunResponse } from
"openapi/requests/types.gen";
+import type { DagRunMutableStates, DAGRunResponse } from
"openapi/requests/types.gen";
import { ActionAccordion } from "src/components/ActionAccordion";
import { StateBadge } from "src/components/StateBadge";
import { Dialog } from "src/components/ui";
@@ -30,7 +30,7 @@ type Props = {
readonly dagRun: DAGRunResponse;
readonly onClose: () => void;
readonly open: boolean;
- readonly state: DAGRunPatchStates;
+ readonly state: DagRunMutableStates;
};
const MarkRunAsDialog = ({ dagRun, onClose, open, state }: Props) => {
diff --git a/airflow-core/src/airflow/ui/src/components/MarkAs/utils.ts
b/airflow-core/src/airflow/ui/src/components/MarkAs/utils.ts
index 14245602bfb..5d62fd27b94 100644
--- a/airflow-core/src/airflow/ui/src/components/MarkAs/utils.ts
+++ b/airflow-core/src/airflow/ui/src/components/MarkAs/utils.ts
@@ -16,6 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
-import type { DAGRunPatchStates } from "openapi/requests/types.gen";
+import type { DagRunMutableStates } from "openapi/requests/types.gen";
-export const allowedStates: Array<DAGRunPatchStates> = ["success", "failed"];
+export const allowedStates: Array<DagRunMutableStates> = ["success", "failed"];
diff --git
a/airflow-core/src/airflow/ui/src/pages/DagRuns/BulkDeleteDagRunsButton.tsx
b/airflow-core/src/airflow/ui/src/pages/DagRuns/BulkDeleteDagRunsButton.tsx
new file mode 100644
index 00000000000..e1a4b9c5c35
--- /dev/null
+++ b/airflow-core/src/airflow/ui/src/pages/DagRuns/BulkDeleteDagRunsButton.tsx
@@ -0,0 +1,176 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { Box, Button, Flex, Heading, Text, useDisclosure, VStack } from
"@chakra-ui/react";
+import type { ColumnDef } from "@tanstack/react-table";
+import type { TFunction } from "i18next";
+import { useTranslation } from "react-i18next";
+import { FiTrash2 } from "react-icons/fi";
+
+import type { DAGRunResponse } from "openapi/requests/types.gen";
+import { DataTable } from "src/components/DataTable";
+import { ErrorAlert } from "src/components/ErrorAlert";
+import { StateBadge } from "src/components/StateBadge";
+import Time from "src/components/Time";
+import { Accordion, Dialog } from "src/components/ui";
+import { useBulkDeleteDagRuns } from "src/queries/useBulkDeleteDagRuns";
+
+type Props = {
+ readonly clearSelections: VoidFunction;
+ readonly selectedDagRuns: Array<DAGRunResponse>;
+};
+
+const getColumns = (translate: TFunction): Array<ColumnDef<DAGRunResponse>> =>
[
+ {
+ accessorKey: "dag_run_id",
+ cell: ({ row: { original } }) => <Text>{original.dag_run_id}</Text>,
+ enableSorting: false,
+ header: translate("dagRunId"),
+ },
+ {
+ accessorKey: "state",
+ cell: ({ row: { original } }) => (
+ <StateBadge
state={original.state}>{translate(`common:states.${original.state}`)}</StateBadge>
+ ),
+ enableSorting: false,
+ header: translate("state"),
+ },
+ {
+ accessorKey: "run_after",
+ cell: ({ row: { original } }) => <Time datetime={original.run_after} />,
+ enableSorting: false,
+ header: translate("dagRun.runAfter"),
+ },
+];
+
+const BulkDeleteDagRunsButton = ({ clearSelections, selectedDagRuns }: Props)
=> {
+ const { t: translate } = useTranslation(["common", "dags"]);
+ const { onClose, onOpen, open } = useDisclosure();
+ const { bulkAction, error, isPending } = useBulkDeleteDagRuns({
+ clearSelections,
+ onSuccessConfirm: onClose,
+ });
+
+ const columns = getColumns(translate);
+
+ const byDagId = new Map<string, Array<DAGRunResponse>>();
+
+ for (const dagRun of selectedDagRuns) {
+ const group = byDagId.get(dagRun.dag_id) ?? [];
+
+ group.push(dagRun);
+ byDagId.set(dagRun.dag_id, group);
+ }
+
+ const isGrouped = byDagId.size > 1;
+
+ return (
+ <>
+ <Button colorPalette="danger" onClick={onOpen} size="sm"
variant="outline">
+ <FiTrash2 />
+ {translate("dags:runAndTaskActions.delete.button", { type:
translate("dagRun_other") })}
+ </Button>
+
+ <Dialog.Root onOpenChange={onClose} open={open} size="xl">
+ <Dialog.Content backdrop>
+ <Dialog.Header>
+ <VStack align="start" gap={4}>
+ <Heading size="xl">
+ {translate("dags:runAndTaskActions.delete.dialog.title", {
+ type: translate("dagRun_other"),
+ })}
+ </Heading>
+ </VStack>
+ </Dialog.Header>
+
+ <Dialog.CloseTrigger />
+ <Dialog.Body width="full">
+ <Text color="fg.subtle" fontSize="sm" mb={4}>
+ {translate("dags:runAndTaskActions.delete.dialog.warning", {
+ type: translate("dagRun_other"),
+ })}
+ </Text>
+
+ <Box maxH="400px" overflowY="auto">
+ {isGrouped ? (
+ <Accordion.Root collapsible multiple variant="enclosed">
+ {[...byDagId.entries()].map(([dagId, dagRuns]) => (
+ <Accordion.Item key={dagId} value={dagId}>
+ <Accordion.ItemTrigger>
+ <Text fontSize="sm" fontWeight="semibold">
+ {translate("dagId")}: {dagId}{" "}
+ <Text as="span" color="fg.subtle"
fontWeight="normal">
+ ({dagRuns.length})
+ </Text>
+ </Text>
+ </Accordion.ItemTrigger>
+ <Accordion.ItemContent>
+ <DataTable
+ columns={columns}
+ data={dagRuns}
+ displayMode="table"
+ modelName="common:dagRun"
+ total={dagRuns.length}
+ />
+ </Accordion.ItemContent>
+ </Accordion.Item>
+ ))}
+ </Accordion.Root>
+ ) : (
+ <DataTable
+ columns={columns}
+ data={selectedDagRuns}
+ displayMode="table"
+ modelName="common:dagRun"
+ total={selectedDagRuns.length}
+ />
+ )}
+ </Box>
+
+ <ErrorAlert error={error} />
+ <Flex justifyContent="end" mt={3}>
+ <Button
+ colorPalette="danger"
+ loading={isPending}
+ onClick={() => {
+ bulkAction({
+ actions: [
+ {
+ action: "delete" as const,
+ action_on_non_existence: "skip",
+ entities: selectedDagRuns.map((dagRun) => ({
+ dag_id: dagRun.dag_id,
+ dag_run_id: dagRun.dag_run_id,
+ })),
+ },
+ ],
+ });
+ }}
+ >
+ <FiTrash2 />
+ <Text fontWeight="bold">{translate("modal.confirm")}</Text>
+ </Button>
+ </Flex>
+ </Dialog.Body>
+ </Dialog.Content>
+ </Dialog.Root>
+ </>
+ );
+};
+
+export default BulkDeleteDagRunsButton;
diff --git a/airflow-core/src/airflow/ui/src/pages/DagRuns.test.tsx
b/airflow-core/src/airflow/ui/src/pages/DagRuns/DagRuns.test.tsx
similarity index 100%
rename from airflow-core/src/airflow/ui/src/pages/DagRuns.test.tsx
rename to airflow-core/src/airflow/ui/src/pages/DagRuns/DagRuns.test.tsx
diff --git a/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx
b/airflow-core/src/airflow/ui/src/pages/DagRuns/DagRuns.tsx
similarity index 82%
rename from airflow-core/src/airflow/ui/src/pages/DagRuns.tsx
rename to airflow-core/src/airflow/ui/src/pages/DagRuns/DagRuns.tsx
index bc1aa2f3d14..217a9e62d98 100644
--- a/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/DagRuns/DagRuns.tsx
@@ -27,6 +27,7 @@ import type { DAGRunResponse } from
"openapi/requests/types.gen";
import { ClearRunButton } from "src/components/Clear";
import { DagVersion } from "src/components/DagVersion";
import { DataTable } from "src/components/DataTable";
+import { useRowSelection, type GetColumnsParams } from
"src/components/DataTable/useRowSelection";
import { useTableURLState } from "src/components/DataTable/useTableUrlState";
import { ErrorAlert } from "src/components/ErrorAlert";
import { LimitedItemsList } from "src/components/LimitedItemsList";
@@ -37,12 +38,18 @@ import { StateBadge } from "src/components/StateBadge";
import Time from "src/components/Time";
import { TruncatedText } from "src/components/TruncatedText";
import { RouterLink } from "src/components/ui";
+import { ActionBar } from "src/components/ui/ActionBar";
+import { Checkbox } from "src/components/ui/Checkbox";
import { SearchParamsKeys, type SearchParamsKeysType } from
"src/constants/searchParams";
import { useAdvancedSearchArg } from "src/hooks/useAdvancedSearch";
-import { DagRunsFilters } from "src/pages/DagRunsFilters";
-import DeleteRunButton from "src/pages/DeleteRunButton";
import { renderDuration, useAutoRefresh, isStatePending } from "src/utils";
+import BulkDeleteDagRunsButton from "./BulkDeleteDagRunsButton";
+import { DagRunsFilters } from "./DagRunsFilters";
+import DeleteRunButton from "./DeleteRunButton";
+
+const getRowKey = (dagRun: DAGRunResponse) =>
`${dagRun.dag_id}:${dagRun.dag_run_id}`;
+
type DagRunRow = { row: { original: DAGRunResponse } };
const {
BUNDLE_VERSION: BUNDLE_VERSION_PARAM,
@@ -67,7 +74,43 @@ const {
TRIGGERING_USER_NAME_PATTERN: TRIGGERING_USER_NAME_PATTERN_PARAM,
}: SearchParamsKeysType = SearchParamsKeys;
-const runColumns = (translate: TFunction, dagId?: string):
Array<ColumnDef<DAGRunResponse>> => [
+type ColumnProps = {
+ readonly dagId?: string;
+ readonly translate: TFunction;
+} & GetColumnsParams;
+
+const runColumns = ({
+ allRowsSelected,
+ dagId,
+ onRowSelect,
+ onSelectAll,
+ selectedRows,
+ translate,
+}: ColumnProps): Array<ColumnDef<DAGRunResponse>> => [
+ {
+ accessorKey: "select",
+ cell: ({ row }) => (
+ <Checkbox
+ borderWidth={1}
+ checked={selectedRows.get(getRowKey(row.original))}
+ colorPalette="brand"
+ onCheckedChange={(event) => onRowSelect(getRowKey(row.original),
Boolean(event.checked))}
+ />
+ ),
+ enableHiding: false,
+ enableSorting: false,
+ header: () => (
+ <Checkbox
+ borderWidth={1}
+ checked={allRowsSelected}
+ colorPalette="brand"
+ onCheckedChange={(event) => onSelectAll(Boolean(event.checked))}
+ />
+ ),
+ meta: {
+ skeletonWidth: 10,
+ },
+ },
...(Boolean(dagId)
? []
: [
@@ -287,11 +330,27 @@ export const DagRuns = () => {
},
);
- const columns = runColumns(translate, dagId);
-
const nextCursor = data?.next_cursor ?? undefined;
const previousCursor = data?.previous_cursor ?? undefined;
+ const { allRowsSelected, clearSelections, handleRowSelect, handleSelectAll,
selectedRows } =
+ useRowSelection({
+ data: data?.dag_runs,
+ getKey: getRowKey,
+ });
+
+ const selectedDagRuns = (data?.dag_runs ?? []).filter((dagRun) =>
selectedRows.has(getRowKey(dagRun)));
+
+ const columns = runColumns({
+ allRowsSelected,
+ dagId,
+ multiTeam: false,
+ onRowSelect: handleRowSelect,
+ onSelectAll: handleSelectAll,
+ selectedRows,
+ translate,
+ });
+
return (
<>
<DagRunsFilters dagId={dagId} />
@@ -306,6 +365,16 @@ export const DagRuns = () => {
onStateChange={setTableURLState}
previousCursor={previousCursor}
/>
+ <ActionBar.Root closeOnInteractOutside={false}
open={Boolean(selectedRows.size)}>
+ <ActionBar.Content>
+ <ActionBar.SelectionTrigger>
+ {selectedRows.size} {translate("selected")}
+ </ActionBar.SelectionTrigger>
+ <ActionBar.Separator />
+ <BulkDeleteDagRunsButton clearSelections={clearSelections}
selectedDagRuns={selectedDagRuns} />
+ <ActionBar.CloseTrigger onClick={clearSelections} />
+ </ActionBar.Content>
+ </ActionBar.Root>
</>
);
};
diff --git a/airflow-core/src/airflow/ui/src/pages/DagRunsFilters.tsx
b/airflow-core/src/airflow/ui/src/pages/DagRuns/DagRunsFilters.tsx
similarity index 100%
rename from airflow-core/src/airflow/ui/src/pages/DagRunsFilters.tsx
rename to airflow-core/src/airflow/ui/src/pages/DagRuns/DagRunsFilters.tsx
diff --git a/airflow-core/src/airflow/ui/src/pages/DeleteRunButton.tsx
b/airflow-core/src/airflow/ui/src/pages/DagRuns/DeleteRunButton.tsx
similarity index 100%
rename from airflow-core/src/airflow/ui/src/pages/DeleteRunButton.tsx
rename to airflow-core/src/airflow/ui/src/pages/DagRuns/DeleteRunButton.tsx
diff --git a/airflow-core/src/airflow/ui/src/components/MarkAs/utils.ts
b/airflow-core/src/airflow/ui/src/pages/DagRuns/index.ts
similarity index 84%
copy from airflow-core/src/airflow/ui/src/components/MarkAs/utils.ts
copy to airflow-core/src/airflow/ui/src/pages/DagRuns/index.ts
index 14245602bfb..26ed3a4da89 100644
--- a/airflow-core/src/airflow/ui/src/components/MarkAs/utils.ts
+++ b/airflow-core/src/airflow/ui/src/pages/DagRuns/index.ts
@@ -16,6 +16,4 @@
* specific language governing permissions and limitations
* under the License.
*/
-import type { DAGRunPatchStates } from "openapi/requests/types.gen";
-
-export const allowedStates: Array<DAGRunPatchStates> = ["success", "failed"];
+export { DagRuns } from "./DagRuns";
diff --git a/airflow-core/src/airflow/ui/src/pages/Run/Header.tsx
b/airflow-core/src/airflow/ui/src/pages/Run/Header.tsx
index 3f83146927c..18c0e927426 100644
--- a/airflow-core/src/airflow/ui/src/pages/Run/Header.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/Run/Header.tsx
@@ -33,7 +33,7 @@ import { RunTypeIcon } from "src/components/RunTypeIcon";
import Time from "src/components/Time";
import { RouterLink } from "src/components/ui";
import { SearchParamsKeys } from "src/constants/searchParams";
-import DeleteRunButton from "src/pages/DeleteRunButton";
+import DeleteRunButton from "src/pages/DagRuns/DeleteRunButton";
import { usePatchDagRun } from "src/queries/usePatchDagRun";
import { getDuration } from "src/utils";
diff --git a/airflow-core/src/airflow/ui/src/queries/useBulkDeleteDagRuns.ts
b/airflow-core/src/airflow/ui/src/queries/useBulkDeleteDagRuns.ts
new file mode 100644
index 00000000000..d0a3fcff978
--- /dev/null
+++ b/airflow-core/src/airflow/ui/src/queries/useBulkDeleteDagRuns.ts
@@ -0,0 +1,114 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { useQueryClient } from "@tanstack/react-query";
+import { useRef, useState } from "react";
+import { useTranslation } from "react-i18next";
+
+import {
+ useDagRunServiceBulkDagRuns,
+ useDagRunServiceGetDagRunsKey,
+ useTaskInstanceServiceGetTaskInstancesKey,
+} from "openapi/queries";
+import type { BulkActionResponse, BulkBody_BulkDAGRunBody_, BulkResponse }
from "openapi/requests/types.gen";
+import { toaster } from "src/components/ui";
+
+import { gridQueryKeys, tiPerAttemptQueryKeys } from "./gridViewQueryKeys";
+
+type Props = {
+ readonly clearSelections: VoidFunction;
+ readonly onSuccessConfirm: VoidFunction;
+};
+
+const handleActionResult = (
+ actionResult: BulkActionResponse,
+ setError: (error: unknown) => void,
+ onSuccess: (count: number, keys: Array<string>) => void,
+) => {
+ const { errors, success } = actionResult;
+
+ if (Array.isArray(errors) && errors.length > 0) {
+ const apiError = errors[0] as { error: string };
+
+ setError({ body: { detail: apiError.error } });
+ } else if (Array.isArray(success) && success.length > 0) {
+ onSuccess(success.length, success);
+ }
+};
+
+export const useBulkDeleteDagRuns = ({ clearSelections, onSuccessConfirm }:
Props) => {
+ const queryClient = useQueryClient();
+ const [error, setError] = useState<unknown>(undefined);
+ const affectedDagIds = useRef<Set<string>>(new Set());
+ const { t: translate } = useTranslation(["common", "dags"]);
+
+ const onSuccess = async (responseData: BulkResponse) => {
+ await Promise.all([
+ queryClient.invalidateQueries({ queryKey:
[useDagRunServiceGetDagRunsKey] }),
+ queryClient.invalidateQueries({ queryKey:
[useTaskInstanceServiceGetTaskInstancesKey] }),
+ ...tiPerAttemptQueryKeys.map((key) => queryClient.invalidateQueries({
queryKey: key })),
+ ...[...affectedDagIds.current].flatMap((dagId) =>
+ gridQueryKeys(dagId).map((key) => queryClient.invalidateQueries({
queryKey: key })),
+ ),
+ ]);
+
+ if (responseData.delete) {
+ handleActionResult(responseData.delete, setError, (count, keys) => {
+ toaster.create({
+ description: translate("toaster.bulkDelete.success.description", {
+ count,
+ keys: keys.join(", "),
+ resourceName: translate("dagRun_other"),
+ }),
+ title: translate("toaster.bulkDelete.success.title", {
+ resourceName: translate("dagRun_other"),
+ }),
+ type: "success",
+ });
+ clearSelections();
+ onSuccessConfirm();
+ });
+ }
+ };
+
+ const onError = (_error: unknown) => {
+ setError(_error);
+ };
+
+ const { isPending, mutate } = useDagRunServiceBulkDagRuns({
+ onError,
+ onSuccess,
+ });
+
+ const bulkAction = (requestBody: BulkBody_BulkDAGRunBody_) => {
+ setError(undefined);
+ const dagIds = new Set<string>();
+
+ for (const action of requestBody.actions) {
+ for (const entity of action.entities) {
+ if (typeof entity !== "string" && entity.dag_id !== null &&
entity.dag_id !== undefined) {
+ dagIds.add(entity.dag_id);
+ }
+ }
+ }
+ affectedDagIds.current = dagIds;
+ mutate({ dagId: "~", requestBody });
+ };
+
+ return { bulkAction, error, isPending, setError };
+};
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
index 6dcd2b41219..bc59d204378 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -24,14 +24,18 @@ from unittest import mock
import pytest
import time_machine
-from sqlalchemy import func, select
+from fastapi.testclient import TestClient
+from sqlalchemy import func, select, update
from airflow._shared.timezones import timezone
+from airflow.api_fastapi.auth.managers.simple.user import SimpleAuthManagerUser
from airflow.api_fastapi.core_api.datamodels.dag_versions import
DagVersionResponse
from airflow.api_fastapi.core_api.services.public.common import
resolve_run_on_latest_version
from airflow.models import DagModel, DagRun, Log
from airflow.models.asset import AssetEvent, AssetModel
+from airflow.models.dagbundle import DagBundleModel
from airflow.models.taskinstance import TaskInstance
+from airflow.models.team import Team
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk.definitions.asset import Asset
from airflow.sdk.definitions.param import Param
@@ -2627,3 +2631,271 @@ class TestWaitDagRun:
assert response.status_code == 200
data = response.json()
assert data == {"state": DagRunState.SUCCESS}
+
+
+class TestBulkDagRuns:
+ ENDPOINT_URL = f"/dags/{DAG1_ID}/dagRuns"
+ WILDCARD_ENDPOINT = "/dags/~/dagRuns"
+
+ def test_bulk_delete(self, test_client, session):
+ response = test_client.patch(
+ self.ENDPOINT_URL,
+ json={
+ "actions": [
+ {
+ "action": "delete",
+ "entities": [DAG1_RUN1_ID, DAG1_RUN2_ID],
+ }
+ ]
+ },
+ )
+ assert response.status_code == 200
+ body = response.json()
+ assert sorted(body["delete"]["success"]) == sorted(
+ [f"{DAG1_ID}.{DAG1_RUN1_ID}", f"{DAG1_ID}.{DAG1_RUN2_ID}"]
+ )
+ session.expire_all()
+ remaining = session.scalars(select(DagRun).where(DagRun.dag_id ==
DAG1_ID)).all()
+ assert remaining == []
+
+ def test_bulk_delete_with_entity_object(self, test_client, session):
+ response = test_client.patch(
+ self.ENDPOINT_URL,
+ json={
+ "actions": [
+ {
+ "action": "delete",
+ "entities": [{"dag_run_id": DAG1_RUN1_ID}],
+ }
+ ]
+ },
+ )
+ assert response.status_code == 200
+ body = response.json()
+ assert body["delete"]["success"] == [f"{DAG1_ID}.{DAG1_RUN1_ID}"]
+ session.expire_all()
+ dr = session.scalar(select(DagRun).where(DagRun.run_id ==
DAG1_RUN1_ID))
+ assert dr is None
+
+ def test_bulk_delete_rejects_running_state(self, test_client, dag_maker,
session):
+ """Mirror the single-run DELETE: a RUNNING Dag Run can't be
bulk-deleted (409)."""
+ with dag_maker(dag_id="test_running_bulk_dag"):
+ EmptyOperator(task_id="t1")
+ dag_maker.create_dagrun(run_id="running_run",
state=DagRunState.RUNNING)
+ session.commit()
+
+ response = test_client.patch(
+ "/dags/test_running_bulk_dag/dagRuns",
+ json={
+ "actions": [
+ {
+ "action": "delete",
+ "entities": ["running_run"],
+ }
+ ]
+ },
+ )
+ assert response.status_code == 200
+ body = response.json()
+ assert body["delete"]["success"] == []
+ assert body["delete"]["errors"] == [
+ {
+ "error": (
+ "The DagRun with dag_id: `test_running_bulk_dag` and
run_id: `running_run` "
+ "cannot be deleted in running state"
+ ),
+ "status_code": 409,
+ }
+ ]
+ session.expire_all()
+ assert session.scalar(select(DagRun).where(DagRun.run_id ==
"running_run")) is not None
+
+ def test_bulk_delete_not_found_fails(self, test_client, session):
+ """When FAIL semantics, each missing run is reported individually and
matched runs still get deleted."""
+ response = test_client.patch(
+ self.ENDPOINT_URL,
+ json={
+ "actions": [
+ {
+ "action": "delete",
+ "entities": [DAG1_RUN1_ID, "non_existent_run",
"another_missing_run"],
+ }
+ ]
+ },
+ )
+ assert response.status_code == 200
+ body = response.json()
+ assert body["delete"]["success"] == [f"{DAG1_ID}.{DAG1_RUN1_ID}"]
+ errors = body["delete"]["errors"]
+ assert len(errors) == 2
+ assert all(err["status_code"] == 404 for err in errors)
+ assert {err["error"] for err in errors} == {
+ f"The DagRun with dag_id: `{DAG1_ID}` and run_id:
`another_missing_run` was not found",
+ f"The DagRun with dag_id: `{DAG1_ID}` and run_id:
`non_existent_run` was not found",
+ }
+ session.expire_all()
+ assert session.scalar(select(DagRun).where(DagRun.run_id ==
DAG1_RUN1_ID)) is None
+
+ def test_bulk_delete_not_found_skip(self, test_client, session):
+ response = test_client.patch(
+ self.ENDPOINT_URL,
+ json={
+ "actions": [
+ {
+ "action": "delete",
+ "action_on_non_existence": "skip",
+ "entities": [DAG1_RUN1_ID, "non_existent_run"],
+ }
+ ]
+ },
+ )
+ assert response.status_code == 200
+ body = response.json()
+ assert body["delete"]["success"] == [f"{DAG1_ID}.{DAG1_RUN1_ID}"]
+ assert body["delete"]["errors"] == []
+
+ def test_bulk_delete_across_dags_with_wildcard(self, test_client, session):
+ response = test_client.patch(
+ self.WILDCARD_ENDPOINT,
+ json={
+ "actions": [
+ {
+ "action": "delete",
+ "entities": [
+ {"dag_id": DAG1_ID, "dag_run_id": DAG1_RUN1_ID},
+ {"dag_id": DAG2_ID, "dag_run_id": DAG2_RUN1_ID},
+ ],
+ }
+ ]
+ },
+ )
+ assert response.status_code == 200
+ body = response.json()
+ assert sorted(body["delete"]["success"]) == sorted(
+ [f"{DAG1_ID}.{DAG1_RUN1_ID}", f"{DAG2_ID}.{DAG2_RUN1_ID}"]
+ )
+ session.expire_all()
+ assert session.scalar(select(DagRun).where(DagRun.run_id ==
DAG1_RUN1_ID)) is None
+ assert session.scalar(select(DagRun).where(DagRun.run_id ==
DAG2_RUN1_ID)) is None
+
+ def test_bulk_delete_wildcard_requires_dag_id_in_body(self, test_client):
+ response = test_client.patch(
+ self.WILDCARD_ENDPOINT,
+ json={
+ "actions": [
+ {
+ "action": "delete",
+ "entities": [DAG1_RUN1_ID],
+ }
+ ]
+ },
+ )
+ assert response.status_code == 200
+ body = response.json()
+ assert body["delete"]["success"] == []
+ assert len(body["delete"]["errors"]) == 1
+ assert body["delete"]["errors"][0]["status_code"] == 400
+
+ def test_bulk_create_not_supported(self, test_client):
+ response = test_client.patch(
+ self.ENDPOINT_URL,
+ json={
+ "actions": [
+ {
+ "action": "create",
+ "entities": [{"dag_run_id": "brand_new_run"}],
+ }
+ ]
+ },
+ )
+ assert response.status_code == 200
+ body = response.json()
+ assert body["create"]["success"] == []
+ assert len(body["create"]["errors"]) == 1
+ assert body["create"]["errors"][0]["status_code"] == 405
+
+ def test_bulk_update_not_supported(self, test_client):
+ response = test_client.patch(
+ self.ENDPOINT_URL,
+ json={
+ "actions": [
+ {
+ "action": "update",
+ "entities": [{"dag_run_id": DAG1_RUN1_ID}],
+ }
+ ]
+ },
+ )
+ assert response.status_code == 200
+ body = response.json()
+ assert body["update"]["success"] == []
+ assert len(body["update"]["errors"]) == 1
+ assert body["update"]["errors"][0]["status_code"] == 405
+
+ def test_bulk_delete_rejects_unauthorized_dag_ids_from_request_body(self,
test_client, session):
+ """A 403 at the route level if any entity references a Dag the user
can't access."""
+ restricted_bundle_name = "restricted-bundle-delete"
+ restricted_team_name = "restricted-team-delete"
+ restricted_bundle = DagBundleModel(name=restricted_bundle_name)
+ restricted_team = Team(name=restricted_team_name)
+ restricted_bundle.teams.append(restricted_team)
+ session.add_all([restricted_bundle, restricted_team])
+ session.flush()
+ # Restrict DAG2 by attaching it to a team-scoped bundle the limited
user has no access to.
+ session.execute(
+ update(DagModel).where(DagModel.dag_id ==
DAG2_ID).values(bundle_name=restricted_bundle_name)
+ )
+ session.commit()
+
+ auth_manager = test_client.app.state.auth_manager
+ token = auth_manager._get_token_signer().generate(
+ auth_manager.serialize_user(
+ SimpleAuthManagerUser(username="limited-user", role="user",
teams=[]),
+ )
+ )
+ with (
+ mock.patch("airflow.models.revoked_token.RevokedToken.is_revoked",
return_value=False),
+ TestClient(
+ test_client.app,
+ headers={"Authorization": f"Bearer {token}"},
+ base_url=str(test_client.base_url),
+ ) as limited_test_client,
+ ):
+ response = limited_test_client.patch(
+ self.WILDCARD_ENDPOINT,
+ json={
+ "actions": [
+ {
+ "action": "delete",
+ "entities": [
+ {"dag_id": DAG1_ID, "dag_run_id":
DAG1_RUN1_ID},
+ {"dag_id": DAG2_ID, "dag_run_id":
DAG2_RUN1_ID},
+ ],
+ }
+ ]
+ },
+ )
+
+ assert response.status_code == 403
+ session.expire_all()
+ assert session.scalar(select(DagRun).where(DagRun.run_id ==
DAG1_RUN1_ID)) is not None
+ assert session.scalar(select(DagRun).where(DagRun.run_id ==
DAG2_RUN1_ID)) is not None
+
+ def test_bulk_should_respond_401(self, unauthenticated_test_client):
+ response = unauthenticated_test_client.patch(self.ENDPOINT_URL,
json={"actions": []})
+ assert response.status_code == 401
+
+ def test_bulk_should_respond_403(self, unauthorized_test_client):
+ """An authenticated user with no Dag permissions gets a 403 at the
route level."""
+ response = unauthorized_test_client.patch(
+ self.ENDPOINT_URL,
+ json={
+ "actions": [
+ {
+ "action": "delete",
+ "entities": [DAG1_RUN1_ID],
+ }
+ ]
+ },
+ )
+ assert response.status_code == 403
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index 2990901b71f..f05fa65cf56 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -129,6 +129,32 @@ class BulkActionResponse(BaseModel):
] = []
+class BulkDAGRunBody(BaseModel):
+ """
+ Request body for bulk delete operations on Dag Runs.
+ """
+
+ model_config = ConfigDict(
+ extra="forbid",
+ )
+ dag_run_id: Annotated[str, Field(title="Dag Run Id")]
+ dag_id: Annotated[str | None, Field(title="Dag Id")] = None
+
+
+class BulkDeleteActionBulkDAGRunBody(BaseModel):
+ model_config = ConfigDict(
+ extra="forbid",
+ )
+ action: Annotated[
+ Literal["delete"], Field(description="The action to be performed on
the entities.", title="Action")
+ ]
+ entities: Annotated[
+ list[str | BulkDAGRunBody],
+ Field(description="A list of entity id/key or entity objects to be
deleted.", title="Entities"),
+ ]
+ action_on_non_existence: BulkActionNotOnExistence | None = "fail"
+
+
class BulkResponse(BaseModel):
"""
Serializer for responses to bulk entity operations.
@@ -156,6 +182,26 @@ class Note(RootModel[str]):
root: Annotated[str, Field(max_length=1000, title="Note")]
+class BulkUpdateActionBulkDAGRunBody(BaseModel):
+ model_config = ConfigDict(
+ extra="forbid",
+ )
+ action: Annotated[
+ Literal["update"], Field(description="The action to be performed on
the entities.", title="Action")
+ ]
+ entities: Annotated[
+ list[BulkDAGRunBody], Field(description="A list of entities to be
updated.", title="Entities")
+ ]
+ update_mask: Annotated[
+ list[str] | None,
+ Field(
+ description="A list of field names to update for each entity.Only
these fields will be applied from the request body to the database model.Any
extra fields provided will be ignored.",
+ title="Update Mask",
+ ),
+ ] = None
+ action_on_non_existence: BulkActionNotOnExistence | None = "fail"
+
+
class TaskIds(RootModel[list]):
root: Annotated[list, Field(max_length=2, min_length=2)]
@@ -325,16 +371,6 @@ class DAGRunClearBody(BaseModel):
] = None
-class DAGRunPatchStates(str, Enum):
- """
- Enum for Dag Run states when updating a Dag Run.
- """
-
- QUEUED = "queued"
- SUCCESS = "success"
- FAILED = "failed"
-
-
class DAGSourceResponse(BaseModel):
"""
Dag Source serializer for responses.
@@ -385,6 +421,16 @@ class DagRunAssetReference(BaseModel):
partition_key: Annotated[str | None, Field(title="Partition Key")] = None
+class DagRunMutableStates(str, Enum):
+ """
+ Dag Run states from which the run may be mutated (patched, deleted).
+ """
+
+ QUEUED = "queued"
+ SUCCESS = "success"
+ FAILED = "failed"
+
+
class DagRunState(str, Enum):
"""
All possible states that a DagRun can be in.
@@ -1231,6 +1277,19 @@ class BackfillResponse(BaseModel):
dag_display_name: Annotated[str, Field(title="Dag Display Name")]
+class BulkCreateActionBulkDAGRunBody(BaseModel):
+ model_config = ConfigDict(
+ extra="forbid",
+ )
+ action: Annotated[
+ Literal["create"], Field(description="The action to be performed on
the entities.", title="Action")
+ ]
+ entities: Annotated[
+ list[BulkDAGRunBody], Field(description="A list of entities to be
created.", title="Entities")
+ ]
+ action_on_existence: BulkActionOnExistence | None = "fail"
+
+
class BulkCreateActionConnectionBody(BaseModel):
model_config = ConfigDict(
extra="forbid",
@@ -1553,7 +1612,7 @@ class DAGRunPatchBody(BaseModel):
model_config = ConfigDict(
extra="forbid",
)
- state: DAGRunPatchStates | None = None
+ state: DagRunMutableStates | None = None
note: Annotated[Note | None, Field(title="Note")] = None
@@ -1967,6 +2026,18 @@ class BackfillCollectionResponse(BaseModel):
total_entries: Annotated[int, Field(title="Total Entries")]
+class BulkBodyBulkDAGRunBody(BaseModel):
+ model_config = ConfigDict(
+ extra="forbid",
+ )
+ actions: Annotated[
+ list[
+ BulkCreateActionBulkDAGRunBody | BulkUpdateActionBulkDAGRunBody |
BulkDeleteActionBulkDAGRunBody
+ ],
+ Field(title="Actions"),
+ ]
+
+
class BulkBodyConnectionBody(BaseModel):
model_config = ConfigDict(
extra="forbid",