This is an automated email from the ASF dual-hosted git repository.
guanmingchiu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e0e4ab451ce Add UI for AssetPartitionDagRun (#61398)
e0e4ab451ce is described below
commit e0e4ab451ce084a860044cbf53ae41a863ae16f6
Author: Guan-Ming (Wesley) Chiu <[email protected]>
AuthorDate: Tue Feb 24 12:31:04 2026 +0800
Add UI for AssetPartitionDagRun (#61398)
* Add AssetPartitionDagRun UI
* Replace dag with Dag
* Refactor backend
Signed-off-by: Guan-Ming (Wesley) Chiu
<[email protected]>
* Fix small changes
Signed-off-by: Guan-Ming (Wesley) Chiu
<[email protected]>
* Remove PartitionedDagRun components and related routes from the UI
Signed-off-by: Guan-Ming (Wesley) Chiu
<[email protected]>
* Fix assets test
Signed-off-by: Guan-Ming (Wesley) Chiu
<[email protected]>
* Update translation key for received asset events
Signed-off-by: Guan-Ming (Wesley) Chiu
<[email protected]>
* Fix query for pending APDR (AssetPartitionDagRun)
Signed-off-by: Guan-Ming (Wesley) Chiu
<[email protected]>
* Fix wording
Co-authored-by: Wei Lee <[email protected]>
Signed-off-by: Guan-Ming (Wesley) Chiu
<[email protected]>
---------
Signed-off-by: Guan-Ming (Wesley) Chiu
<[email protected]>
Co-authored-by: Wei Lee <[email protected]>
---
.../src/airflow/api_fastapi/common/parameters.py | 23 ++
.../core_api/datamodels/ui/partitioned_dag_runs.py | 64 +++++
.../api_fastapi/core_api/openapi/_private_ui.yaml | 219 ++++++++++++++++
.../api_fastapi/core_api/routes/ui/__init__.py | 2 +
.../api_fastapi/core_api/routes/ui/assets.py | 89 ++++---
.../core_api/routes/ui/partitioned_dag_runs.py | 239 ++++++++++++++++++
.../src/airflow/ui/openapi-gen/queries/common.ts | 16 +-
.../ui/openapi-gen/queries/ensureQueryData.ts | 28 ++-
.../src/airflow/ui/openapi-gen/queries/prefetch.ts | 28 ++-
.../src/airflow/ui/openapi-gen/queries/queries.ts | 28 ++-
.../src/airflow/ui/openapi-gen/queries/suspense.ts | 28 ++-
.../airflow/ui/openapi-gen/requests/schemas.gen.ts | 217 ++++++++++++++++
.../ui/openapi-gen/requests/services.gen.ts | 51 +++-
.../airflow/ui/openapi-gen/requests/types.gen.ts | 99 ++++++++
.../airflow/ui/public/i18n/locales/en/common.json | 7 +
.../components/AssetExpression/AssetExpression.tsx | 4 +-
.../ui/src/components/AssetProgressCell.tsx | 67 +++++
.../ui/src/pages/DagsList/AssetSchedule.tsx | 56 ++++-
.../src/pages/DagsList/PartitionScheduleModal.tsx | 110 ++++++++
.../api_fastapi/core_api/routes/ui/test_assets.py | 64 ++++-
.../routes/ui/test_partitioned_dag_runs.py | 276 +++++++++++++++++++++
21 files changed, 1670 insertions(+), 45 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/common/parameters.py b/airflow-core/src/airflow/api_fastapi/common/parameters.py
index b6b1d255264..e963cdbff55 100644
--- a/airflow-core/src/airflow/api_fastapi/common/parameters.py
+++ b/airflow-core/src/airflow/api_fastapi/common/parameters.py
@@ -47,6 +47,7 @@ from airflow.models import Base
from airflow.models.asset import (
AssetAliasModel,
AssetModel,
+ AssetPartitionDagRun,
DagScheduleAssetReference,
TaskInletAssetReference,
TaskOutletAssetReference,
@@ -1040,6 +1041,28 @@ QueryAssetAliasNamePatternSearch = Annotated[
QueryAssetDagIdPatternSearch = Annotated[
_DagIdAssetReferenceFilter, Depends(_DagIdAssetReferenceFilter.depends)
]
+QueryPartitionedDagRunHasCreatedDagRunIdFilter = Annotated[
+ FilterParam[bool | None],
+ Depends(
+ filter_param_factory(
+ AssetPartitionDagRun.created_dag_run_id,
+ bool | None,
+ FilterOptionEnum.IS_NONE,
+ filter_name="has_created_dag_run_id",
+ transform_callable=lambda v: not v if v is not None else None,
+ )
+ ),
+]
+QueryPartitionedDagRunDagIdFilter = Annotated[
+ FilterParam[str | None],
+ Depends(
+ filter_param_factory(
+ AssetPartitionDagRun.target_dag_id,
+ str | None,
+ filter_name="dag_id",
+ )
+ ),
+]
# Variables
QueryVariableKeyPatternSearch = Annotated[
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/partitioned_dag_runs.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/partitioned_dag_runs.py
new file mode 100644
index 00000000000..628f8560e96
--- /dev/null
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/partitioned_dag_runs.py
@@ -0,0 +1,64 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from airflow.api_fastapi.core_api.base import BaseModel
+
+
+class PartitionedDagRunResponse(BaseModel):
+ """Single partitioned Dag run item."""
+
+ id: int
+ partition_key: str
+ created_at: str | None = None
+ total_received: int
+ total_required: int
+ dag_id: str | None = None
+ state: str | None = None
+ created_dag_run_id: str | None = None
+
+
+class PartitionedDagRunCollectionResponse(BaseModel):
+ """Collection of partitioned Dag runs."""
+
+ partitioned_dag_runs: list[PartitionedDagRunResponse]
+ total: int
+ asset_expressions: dict[str, dict | None] | None = None
+
+
+class PartitionedDagRunAssetResponse(BaseModel):
+ """Asset info within a partitioned Dag run detail."""
+
+ asset_id: int
+ asset_name: str
+ asset_uri: str
+ received: bool
+
+
+class PartitionedDagRunDetailResponse(BaseModel):
+ """Detail of a single partitioned Dag run."""
+
+ id: int
+ dag_id: str
+ partition_key: str
+ created_at: str | None = None
+ updated_at: str | None = None
+ created_dag_run_id: str | None = None
+ assets: list[PartitionedDagRunAssetResponse]
+ total_required: int
+ total_received: int
+ asset_expression: dict | None = None
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
index c3f307afbe7..5fbf898c512 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
+++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
@@ -71,6 +71,82 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
+ /ui/partitioned_dag_runs:
+ get:
+ tags:
+ - PartitionedDagRun
+ summary: Get Partitioned Dag Runs
+ description: Return PartitionedDagRuns. Filter by dag_id and/or has_created_dag_run_id.
+ operationId: get_partitioned_dag_runs
+ security:
+ - OAuth2PasswordBearer: []
+ - HTTPBearer: []
+ parameters:
+ - name: dag_id
+ in: query
+ required: false
+ schema:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Dag Id
+ - name: has_created_dag_run_id
+ in: query
+ required: false
+ schema:
+ anyOf:
+ - type: boolean
+ - type: 'null'
+ title: Has Created Dag Run Id
+ responses:
+ '200':
+ description: Successful Response
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/PartitionedDagRunCollectionResponse'
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
+ /ui/pending_partitioned_dag_run/{dag_id}/{partition_key}:
+ get:
+ tags:
+ - PartitionedDagRun
+ summary: Get Pending Partitioned Dag Run
+ description: Return full details for pending PartitionedDagRun.
+ operationId: get_pending_partitioned_dag_run
+ security:
+ - OAuth2PasswordBearer: []
+ - HTTPBearer: []
+ parameters:
+ - name: dag_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Id
+ - name: partition_key
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Partition Key
+ responses:
+ '200':
+ description: Successful Response
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/PartitionedDagRunDetailResponse'
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
/ui/config:
get:
tags:
@@ -2568,6 +2644,149 @@ components:
description: Node serializer for responses.
OklchColor:
type: string
+ PartitionedDagRunAssetResponse:
+ properties:
+ asset_id:
+ type: integer
+ title: Asset Id
+ asset_name:
+ type: string
+ title: Asset Name
+ asset_uri:
+ type: string
+ title: Asset Uri
+ received:
+ type: boolean
+ title: Received
+ type: object
+ required:
+ - asset_id
+ - asset_name
+ - asset_uri
+ - received
+ title: PartitionedDagRunAssetResponse
+ description: Asset info within a partitioned Dag run detail.
+ PartitionedDagRunCollectionResponse:
+ properties:
+ partitioned_dag_runs:
+ items:
+ $ref: '#/components/schemas/PartitionedDagRunResponse'
+ type: array
+ title: Partitioned Dag Runs
+ total:
+ type: integer
+ title: Total
+ asset_expressions:
+ anyOf:
+ - additionalProperties:
+ anyOf:
+ - additionalProperties: true
+ type: object
+ - type: 'null'
+ type: object
+ - type: 'null'
+ title: Asset Expressions
+ type: object
+ required:
+ - partitioned_dag_runs
+ - total
+ title: PartitionedDagRunCollectionResponse
+ description: Collection of partitioned Dag runs.
+ PartitionedDagRunDetailResponse:
+ properties:
+ id:
+ type: integer
+ title: Id
+ dag_id:
+ type: string
+ title: Dag Id
+ partition_key:
+ type: string
+ title: Partition Key
+ created_at:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Created At
+ updated_at:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Updated At
+ created_dag_run_id:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Created Dag Run Id
+ assets:
+ items:
+ $ref: '#/components/schemas/PartitionedDagRunAssetResponse'
+ type: array
+ title: Assets
+ total_required:
+ type: integer
+ title: Total Required
+ total_received:
+ type: integer
+ title: Total Received
+ asset_expression:
+ anyOf:
+ - additionalProperties: true
+ type: object
+ - type: 'null'
+ title: Asset Expression
+ type: object
+ required:
+ - id
+ - dag_id
+ - partition_key
+ - assets
+ - total_required
+ - total_received
+ title: PartitionedDagRunDetailResponse
+ description: Detail of a single partitioned Dag run.
+ PartitionedDagRunResponse:
+ properties:
+ id:
+ type: integer
+ title: Id
+ partition_key:
+ type: string
+ title: Partition Key
+ created_at:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Created At
+ total_received:
+ type: integer
+ title: Total Received
+ total_required:
+ type: integer
+ title: Total Required
+ dag_id:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Dag Id
+ state:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: State
+ created_dag_run_id:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Created Dag Run Id
+ type: object
+ required:
+ - id
+ - partition_key
+ - total_received
+ - total_required
+ title: PartitionedDagRunResponse
+ description: Single partitioned Dag run item.
ReprocessBehavior:
type: string
enum:
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/__init__.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/__init__.py
index ce8744785cd..6bce499e38c 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/__init__.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/__init__.py
@@ -29,6 +29,7 @@ from airflow.api_fastapi.core_api.routes.ui.deadlines import
deadlines_router
from airflow.api_fastapi.core_api.routes.ui.dependencies import
dependencies_router
from airflow.api_fastapi.core_api.routes.ui.gantt import gantt_router
from airflow.api_fastapi.core_api.routes.ui.grid import grid_router
+from airflow.api_fastapi.core_api.routes.ui.partitioned_dag_runs import
partitioned_dag_runs_router
from airflow.api_fastapi.core_api.routes.ui.structure import structure_router
from airflow.api_fastapi.core_api.routes.ui.teams import teams_router
@@ -36,6 +37,7 @@ ui_router = AirflowRouter(prefix="/ui",
include_in_schema=False)
ui_router.include_router(auth_router)
ui_router.include_router(assets_router)
+ui_router.include_router(partitioned_dag_runs_router)
ui_router.include_router(config_router)
ui_router.include_router(connections_router)
ui_router.include_router(dags_router)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/assets.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/assets.py
index ce3639e1d8b..155b49726f6 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/assets.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/assets.py
@@ -14,18 +14,23 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
from __future__ import annotations
from fastapi import Depends, HTTPException, status
-from sqlalchemy import and_, case, func, select, true
+from sqlalchemy import ColumnElement, and_, case, exists, func, select, true
-from airflow.api_fastapi.common.dagbag import DagBagDep
from airflow.api_fastapi.common.db.common import SessionDep
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.security import requires_access_asset,
requires_access_dag
from airflow.models import DagModel
-from airflow.models.asset import AssetDagRunQueue, AssetEvent, AssetModel,
DagScheduleAssetReference
+from airflow.models.asset import (
+ AssetDagRunQueue,
+ AssetEvent,
+ AssetModel,
+ AssetPartitionDagRun,
+ DagScheduleAssetReference,
+ PartitionedAssetKeyLog,
+)
assets_router = AirflowRouter(tags=["Asset"])
@@ -36,35 +41,67 @@ assets_router = AirflowRouter(tags=["Asset"])
)
def next_run_assets(
dag_id: str,
- dag_bag: DagBagDep,
session: SessionDep,
) -> dict:
dag_model = DagModel.get_dagmodel(dag_id, session=session)
if dag_model is None:
- raise HTTPException(status.HTTP_404_NOT_FOUND, f"can't find associated dag_model {dag_id}")
+ raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} was not found")
latest_run = dag_model.get_last_dagrun(session=session)
+ event_filter = (
+ AssetEvent.timestamp >= latest_run.logical_date if latest_run and
latest_run.logical_date else true()
+ )
- if latest_run and latest_run.logical_date:
- on_clause = AssetEvent.timestamp >= latest_run.logical_date
+ pending_partition_count: int | None = None
+
+ queued_expr: ColumnElement[int]
+ if is_partitioned := dag_model.timetable_summary == "Partitioned Asset":
+ pending_partition_count = session.scalar(
+ select(func.count())
+ .select_from(AssetPartitionDagRun)
+ .where(
+ AssetPartitionDagRun.target_dag_id == dag_id,
+ AssetPartitionDagRun.created_dag_run_id.is_(None),
+ )
+ )
+ queued_expr = case(
+ (
+ exists(
+ select(PartitionedAssetKeyLog.id)
+ .join(
+ AssetPartitionDagRun,
+ PartitionedAssetKeyLog.asset_partition_dag_run_id ==
AssetPartitionDagRun.id,
+ )
+ .where(
+ PartitionedAssetKeyLog.asset_id == AssetModel.id,
+ PartitionedAssetKeyLog.target_dag_id == dag_id,
+ AssetPartitionDagRun.created_dag_run_id.is_(None),
+ )
+ ),
+ 1,
+ ),
+ else_=0,
+ )
else:
- on_clause = true()
+ queued_expr = func.max(case((AssetDagRunQueue.asset_id.is_not(None),
1), else_=0))
- query_result = session.execute(
+ query = (
select(
AssetModel.id,
AssetModel.uri,
AssetModel.name,
func.max(AssetEvent.timestamp).label("lastUpdate"),
- func.max(
- case(
- (AssetDagRunQueue.asset_id.is_not(None), 1),
- else_=0,
- )
- ).label("queued"),
+ queued_expr.label("queued"),
)
.join(DagScheduleAssetReference, DagScheduleAssetReference.asset_id ==
AssetModel.id)
- .join(
+ .join(AssetEvent, and_(AssetEvent.asset_id == AssetModel.id,
event_filter), isouter=True)
+ .where(DagScheduleAssetReference.dag_id == dag_id,
AssetModel.active.has())
+ .group_by(AssetModel.id, AssetModel.uri, AssetModel.name)
+ .order_by(AssetModel.uri)
+ )
+
+ if not is_partitioned:
+ query = query.join(
AssetDagRunQueue,
and_(
AssetDagRunQueue.asset_id == AssetModel.id,
@@ -72,23 +109,13 @@ def next_run_assets(
),
isouter=True,
)
- .join(
- AssetEvent,
- and_(
- AssetEvent.asset_id == AssetModel.id,
- on_clause,
- ),
- isouter=True,
- )
- .where(DagScheduleAssetReference.dag_id == dag_id,
AssetModel.active.has())
- .group_by(AssetModel.id, AssetModel.uri, AssetModel.name)
- .order_by(AssetModel.uri)
- )
- events = [dict(info._mapping) for info in query_result]
+ events = [dict(info._mapping) for info in session.execute(query)]
for event in events:
if not event.pop("queued", None):
event["lastUpdate"] = None
- data = {"asset_expression": dag_model.asset_expression, "events": events}
+ data: dict = {"asset_expression": dag_model.asset_expression, "events":
events}
+ if pending_partition_count is not None:
+ data["pending_partition_count"] = pending_partition_count
return data
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/partitioned_dag_runs.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/partitioned_dag_runs.py
new file mode 100644
index 00000000000..fd67b0e48c6
--- /dev/null
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/partitioned_dag_runs.py
@@ -0,0 +1,239 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from fastapi import Depends, HTTPException, status
+from sqlalchemy import exists, func, select
+
+from airflow.api_fastapi.common.db.common import SessionDep,
apply_filters_to_select
+from airflow.api_fastapi.common.parameters import (
+ QueryPartitionedDagRunDagIdFilter,
+ QueryPartitionedDagRunHasCreatedDagRunIdFilter,
+)
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.ui.partitioned_dag_runs import (
+ PartitionedDagRunAssetResponse,
+ PartitionedDagRunCollectionResponse,
+ PartitionedDagRunDetailResponse,
+ PartitionedDagRunResponse,
+)
+from airflow.api_fastapi.core_api.security import requires_access_asset
+from airflow.models import DagModel
+from airflow.models.asset import (
+ AssetModel,
+ AssetPartitionDagRun,
+ DagScheduleAssetReference,
+ PartitionedAssetKeyLog,
+)
+from airflow.models.dagrun import DagRun
+
+partitioned_dag_runs_router = AirflowRouter(tags=["PartitionedDagRun"])
+
+
+def _build_response(row, required_count: int) -> PartitionedDagRunResponse:
+ return PartitionedDagRunResponse(
+ id=row.id,
+ dag_id=row.target_dag_id,
+ partition_key=row.partition_key,
+ created_at=row.created_at.isoformat() if row.created_at else None,
+ total_received=row.total_received or 0,
+ total_required=required_count,
+ state=row.dag_run_state if row.created_dag_run_id else "pending",
+ created_dag_run_id=row.dag_run_id,
+ )
+
+
+@partitioned_dag_runs_router.get(
+ "/partitioned_dag_runs",
+ dependencies=[Depends(requires_access_asset(method="GET"))],
+)
+def get_partitioned_dag_runs(
+ session: SessionDep,
+ dag_id: QueryPartitionedDagRunDagIdFilter,
+ has_created_dag_run_id: QueryPartitionedDagRunHasCreatedDagRunIdFilter,
+) -> PartitionedDagRunCollectionResponse:
+ """Return PartitionedDagRuns. Filter by dag_id and/or has_created_dag_run_id."""
+ if dag_id.value is not None:
+ # Single query: validate Dag + get required count
+ dag_info = session.execute(
+ select(
+ DagModel.timetable_summary,
+ func.count(DagScheduleAssetReference.asset_id).label("required_count"),
+ )
+ .outerjoin(
+ DagScheduleAssetReference,
+ (DagScheduleAssetReference.dag_id == DagModel.dag_id)
+ & DagScheduleAssetReference.asset_id.in_(
+ select(AssetModel.id).where(AssetModel.active.has())
+ ),
+ )
+ .where(DagModel.dag_id == dag_id.value)
+ .group_by(DagModel.dag_id)
+ ).one_or_none()
+
+ if dag_info is None:
+ raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id.value} was not found")
+ if dag_info.timetable_summary != "Partitioned Asset":
+ return PartitionedDagRunCollectionResponse(partitioned_dag_runs=[], total=0)
+
+ required_count = dag_info.required_count
+
+ # Subquery for received count per partition (only count required assets)
+ required_assets_subq = (
+ select(DagScheduleAssetReference.asset_id)
+ .join(AssetModel, AssetModel.id == DagScheduleAssetReference.asset_id)
+ .where(
+ DagScheduleAssetReference.dag_id ==
AssetPartitionDagRun.target_dag_id,
+ AssetModel.active.has(),
+ )
+ )
+ received_subq = (
+ select(func.count(func.distinct(PartitionedAssetKeyLog.asset_id)))
+ .where(
+ PartitionedAssetKeyLog.asset_partition_dag_run_id ==
AssetPartitionDagRun.id,
+ PartitionedAssetKeyLog.asset_id.in_(required_assets_subq),
+ )
+ .correlate(AssetPartitionDagRun)
+ .scalar_subquery()
+ )
+
+ query = select(
+ AssetPartitionDagRun.id,
+ AssetPartitionDagRun.target_dag_id,
+ AssetPartitionDagRun.partition_key,
+ AssetPartitionDagRun.created_at,
+ AssetPartitionDagRun.created_dag_run_id,
+ DagRun.run_id.label("dag_run_id"),
+ DagRun.state.label("dag_run_state"),
+ received_subq.label("total_received"),
+ ).outerjoin(DagRun, AssetPartitionDagRun.created_dag_run_id == DagRun.id)
+ query = apply_filters_to_select(statement=query, filters=[dag_id,
has_created_dag_run_id])
+ query = query.order_by(AssetPartitionDagRun.created_at.desc())
+
+ if not (rows := session.execute(query).all()):
+ return PartitionedDagRunCollectionResponse(partitioned_dag_runs=[],
total=0)
+
+ if dag_id.value is not None:
+ results = [_build_response(row, required_count) for row in rows]
+ return PartitionedDagRunCollectionResponse(partitioned_dag_runs=results, total=len(results))
+
+ # No dag_id: need to get required counts and expressions per dag
+ dag_ids = list({row.target_dag_id for row in rows})
+ dag_rows = session.execute(
+ select(
+ DagModel.dag_id,
+ DagModel.asset_expression,
+ func.count(DagScheduleAssetReference.asset_id).label("required_count"),
+ )
+ .outerjoin(
+ DagScheduleAssetReference,
+ (DagScheduleAssetReference.dag_id == DagModel.dag_id)
+ &
DagScheduleAssetReference.asset_id.in_(select(AssetModel.id).where(AssetModel.active.has())),
+ )
+ .where(DagModel.dag_id.in_(dag_ids))
+ .group_by(DagModel.dag_id)
+ ).all()
+
+ required_counts = {r.dag_id: r.required_count for r in dag_rows}
+ asset_expressions = {r.dag_id: r.asset_expression for r in dag_rows}
+ results = [_build_response(row, required_counts.get(row.target_dag_id, 0))
for row in rows]
+
+ return PartitionedDagRunCollectionResponse(
+ partitioned_dag_runs=results,
+ total=len(results),
+ asset_expressions=asset_expressions,
+ )
+
+
+@partitioned_dag_runs_router.get(
+ "/pending_partitioned_dag_run/{dag_id}/{partition_key}",
+ dependencies=[Depends(requires_access_asset(method="GET"))],
+)
+def get_pending_partitioned_dag_run(
+ dag_id: str,
+ partition_key: str,
+ session: SessionDep,
+) -> PartitionedDagRunDetailResponse:
+ """Return full details for pending PartitionedDagRun."""
+ partitioned_dag_run = session.execute(
+ select(
+ AssetPartitionDagRun.id,
+ AssetPartitionDagRun.target_dag_id,
+ AssetPartitionDagRun.partition_key,
+ AssetPartitionDagRun.created_at,
+ AssetPartitionDagRun.updated_at,
+ DagRun.run_id.label("created_dag_run_id"),
+ )
+ .outerjoin(DagRun, AssetPartitionDagRun.created_dag_run_id ==
DagRun.id)
+ .where(
+ AssetPartitionDagRun.target_dag_id == dag_id,
+ AssetPartitionDagRun.partition_key == partition_key,
+ AssetPartitionDagRun.created_dag_run_id.is_(None),
+ )
+ ).one_or_none()
+
+ if partitioned_dag_run is None:
+ raise HTTPException(
+ status.HTTP_404_NOT_FOUND,
+ f"No PartitionedDagRun for dag={dag_id} partition={partition_key}",
+ )
+
+ received_subq = (
+ select(PartitionedAssetKeyLog.asset_id).where(
+ PartitionedAssetKeyLog.asset_partition_dag_run_id ==
partitioned_dag_run.id
+ )
+ ).correlate(AssetModel)
+
+ received_expr = exists(received_subq.where(PartitionedAssetKeyLog.asset_id
== AssetModel.id))
+
+ asset_expression_subq = (
+ select(DagModel.asset_expression).where(DagModel.dag_id ==
dag_id).scalar_subquery()
+ )
+ asset_rows = session.execute(
+ select(
+ AssetModel.id,
+ AssetModel.uri,
+ AssetModel.name,
+ received_expr.label("received"),
+ asset_expression_subq.label("asset_expression"),
+ )
+ .join(DagScheduleAssetReference, DagScheduleAssetReference.asset_id ==
AssetModel.id)
+ .where(DagScheduleAssetReference.dag_id == dag_id,
AssetModel.active.has())
+ .order_by(received_expr.asc(), AssetModel.uri)
+ ).all()
+
+ assets = [
+ PartitionedDagRunAssetResponse(
+ asset_id=row.id, asset_name=row.name, asset_uri=row.uri,
received=row.received
+ )
+ for row in asset_rows
+ ]
+ total_received = sum(1 for a in assets if a.received)
+ asset_expression = asset_rows[0].asset_expression if asset_rows else None
+
+ return PartitionedDagRunDetailResponse(
+ id=partitioned_dag_run.id,
+ dag_id=dag_id,
+ partition_key=partition_key,
+ created_at=partitioned_dag_run.created_at.isoformat() if
partitioned_dag_run.created_at else None,
+ updated_at=partitioned_dag_run.updated_at.isoformat() if
partitioned_dag_run.updated_at else None,
+ created_dag_run_id=partitioned_dag_run.created_dag_run_id,
+ assets=assets,
+ total_required=len(assets),
+ total_received=total_received,
+ asset_expression=asset_expression,
+ )
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
index f660990ec61..705a35f4cca 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
@@ -1,7 +1,7 @@
// generated with @7nohe/[email protected]
import { UseQueryResult } from "@tanstack/react-query";
-import { AssetService, AuthLinksService, BackfillService, CalendarService,
ConfigService, ConnectionService, DagParsingService, DagRunService, DagService,
DagSourceService, DagStatsService, DagVersionService, DagWarningService,
DashboardService, DeadlinesService, DependenciesService, EventLogService,
ExperimentalService, ExtraLinksService, GanttService, GridService,
ImportErrorService, JobService, LoginService, MonitorService, PluginService,
PoolService, ProviderService, StructureService [...]
+import { AssetService, AuthLinksService, BackfillService, CalendarService,
ConfigService, ConnectionService, DagParsingService, DagRunService, DagService,
DagSourceService, DagStatsService, DagVersionService, DagWarningService,
DashboardService, DeadlinesService, DependenciesService, EventLogService,
ExperimentalService, ExtraLinksService, GanttService, GridService,
ImportErrorService, JobService, LoginService, MonitorService,
PartitionedDagRunService, PluginService, PoolService, Provide [...]
import { DagRunState, DagWarningType } from "../requests/types.gen";
export type AssetServiceGetAssetsDefaultResponse = Awaited<ReturnType<typeof
AssetService.getAssets>>;
export type AssetServiceGetAssetsQueryResult<TData =
AssetServiceGetAssetsDefaultResponse, TError = unknown> = UseQueryResult<TData,
TError>;
@@ -789,6 +789,20 @@ export type
AuthLinksServiceGetCurrentUserInfoDefaultResponse = Awaited<ReturnTy
export type AuthLinksServiceGetCurrentUserInfoQueryResult<TData =
AuthLinksServiceGetCurrentUserInfoDefaultResponse, TError = unknown> =
UseQueryResult<TData, TError>;
export const useAuthLinksServiceGetCurrentUserInfoKey =
"AuthLinksServiceGetCurrentUserInfo";
export const UseAuthLinksServiceGetCurrentUserInfoKeyFn = (queryKey?:
Array<unknown>) => [useAuthLinksServiceGetCurrentUserInfoKey, ...(queryKey ??
[])];
+export type PartitionedDagRunServiceGetPartitionedDagRunsDefaultResponse =
Awaited<ReturnType<typeof PartitionedDagRunService.getPartitionedDagRuns>>;
+export type PartitionedDagRunServiceGetPartitionedDagRunsQueryResult<TData =
PartitionedDagRunServiceGetPartitionedDagRunsDefaultResponse, TError = unknown>
= UseQueryResult<TData, TError>;
+export const usePartitionedDagRunServiceGetPartitionedDagRunsKey =
"PartitionedDagRunServiceGetPartitionedDagRuns";
+export const UsePartitionedDagRunServiceGetPartitionedDagRunsKeyFn = ({ dagId,
hasCreatedDagRunId }: {
+ dagId?: string;
+ hasCreatedDagRunId?: boolean;
+} = {}, queryKey?: Array<unknown>) =>
[usePartitionedDagRunServiceGetPartitionedDagRunsKey, ...(queryKey ?? [{ dagId,
hasCreatedDagRunId }])];
+export type PartitionedDagRunServiceGetPendingPartitionedDagRunDefaultResponse
= Awaited<ReturnType<typeof
PartitionedDagRunService.getPendingPartitionedDagRun>>;
+export type
PartitionedDagRunServiceGetPendingPartitionedDagRunQueryResult<TData =
PartitionedDagRunServiceGetPendingPartitionedDagRunDefaultResponse, TError =
unknown> = UseQueryResult<TData, TError>;
+export const usePartitionedDagRunServiceGetPendingPartitionedDagRunKey =
"PartitionedDagRunServiceGetPendingPartitionedDagRun";
+export const UsePartitionedDagRunServiceGetPendingPartitionedDagRunKeyFn = ({
dagId, partitionKey }: {
+ dagId: string;
+ partitionKey: string;
+}, queryKey?: Array<unknown>) =>
[usePartitionedDagRunServiceGetPendingPartitionedDagRunKey, ...(queryKey ?? [{
dagId, partitionKey }])];
export type DependenciesServiceGetDependenciesDefaultResponse =
Awaited<ReturnType<typeof DependenciesService.getDependencies>>;
export type DependenciesServiceGetDependenciesQueryResult<TData =
DependenciesServiceGetDependenciesDefaultResponse, TError = unknown> =
UseQueryResult<TData, TError>;
export const useDependenciesServiceGetDependenciesKey =
"DependenciesServiceGetDependencies";
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
index 20d67fcd1aa..a16531feb37 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
@@ -1,7 +1,7 @@
// generated with @7nohe/[email protected]
import { type QueryClient } from "@tanstack/react-query";
-import { AssetService, AuthLinksService, BackfillService, CalendarService,
ConfigService, ConnectionService, DagRunService, DagService, DagSourceService,
DagStatsService, DagVersionService, DagWarningService, DashboardService,
DeadlinesService, DependenciesService, EventLogService, ExperimentalService,
ExtraLinksService, GanttService, GridService, ImportErrorService, JobService,
LoginService, MonitorService, PluginService, PoolService, ProviderService,
StructureService, TaskInstanceServi [...]
+import { AssetService, AuthLinksService, BackfillService, CalendarService,
ConfigService, ConnectionService, DagRunService, DagService, DagSourceService,
DagStatsService, DagVersionService, DagWarningService, DashboardService,
DeadlinesService, DependenciesService, EventLogService, ExperimentalService,
ExtraLinksService, GanttService, GridService, ImportErrorService, JobService,
LoginService, MonitorService, PartitionedDagRunService, PluginService,
PoolService, ProviderService, Structure [...]
import { DagRunState, DagWarningType } from "../requests/types.gen";
import * as Common from "./common";
/**
@@ -1499,6 +1499,32 @@ export const ensureUseAuthLinksServiceGetAuthMenusData =
(queryClient: QueryClie
*/
export const ensureUseAuthLinksServiceGetCurrentUserInfoData = (queryClient:
QueryClient) => queryClient.ensureQueryData({ queryKey:
Common.UseAuthLinksServiceGetCurrentUserInfoKeyFn(), queryFn: () =>
AuthLinksService.getCurrentUserInfo() });
/**
+* Get Partitioned Dag Runs
+* Return PartitionedDagRuns. Filter by dag_id and/or has_created_dag_run_id.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.hasCreatedDagRunId
+* @returns PartitionedDagRunCollectionResponse Successful Response
+* @throws ApiError
+*/
+export const ensureUsePartitionedDagRunServiceGetPartitionedDagRunsData =
(queryClient: QueryClient, { dagId, hasCreatedDagRunId }: {
+ dagId?: string;
+ hasCreatedDagRunId?: boolean;
+} = {}) => queryClient.ensureQueryData({ queryKey:
Common.UsePartitionedDagRunServiceGetPartitionedDagRunsKeyFn({ dagId,
hasCreatedDagRunId }), queryFn: () =>
PartitionedDagRunService.getPartitionedDagRuns({ dagId, hasCreatedDagRunId })
});
+/**
+* Get Pending Partitioned Dag Run
+* Return full details for pending PartitionedDagRun.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.partitionKey
+* @returns PartitionedDagRunDetailResponse Successful Response
+* @throws ApiError
+*/
+export const ensureUsePartitionedDagRunServiceGetPendingPartitionedDagRunData
= (queryClient: QueryClient, { dagId, partitionKey }: {
+ dagId: string;
+ partitionKey: string;
+}) => queryClient.ensureQueryData({ queryKey:
Common.UsePartitionedDagRunServiceGetPendingPartitionedDagRunKeyFn({ dagId,
partitionKey }), queryFn: () =>
PartitionedDagRunService.getPendingPartitionedDagRun({ dagId, partitionKey })
});
+/**
* Get Dependencies
* Dependencies graph.
* @param data The data for the request.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
index bed863f93cd..a4b5afb54df 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -1,7 +1,7 @@
// generated with @7nohe/[email protected]
import { type QueryClient } from "@tanstack/react-query";
-import { AssetService, AuthLinksService, BackfillService, CalendarService,
ConfigService, ConnectionService, DagRunService, DagService, DagSourceService,
DagStatsService, DagVersionService, DagWarningService, DashboardService,
DeadlinesService, DependenciesService, EventLogService, ExperimentalService,
ExtraLinksService, GanttService, GridService, ImportErrorService, JobService,
LoginService, MonitorService, PluginService, PoolService, ProviderService,
StructureService, TaskInstanceServi [...]
+import { AssetService, AuthLinksService, BackfillService, CalendarService,
ConfigService, ConnectionService, DagRunService, DagService, DagSourceService,
DagStatsService, DagVersionService, DagWarningService, DashboardService,
DeadlinesService, DependenciesService, EventLogService, ExperimentalService,
ExtraLinksService, GanttService, GridService, ImportErrorService, JobService,
LoginService, MonitorService, PartitionedDagRunService, PluginService,
PoolService, ProviderService, Structure [...]
import { DagRunState, DagWarningType } from "../requests/types.gen";
import * as Common from "./common";
/**
@@ -1499,6 +1499,32 @@ export const prefetchUseAuthLinksServiceGetAuthMenus =
(queryClient: QueryClient
*/
export const prefetchUseAuthLinksServiceGetCurrentUserInfo = (queryClient:
QueryClient) => queryClient.prefetchQuery({ queryKey:
Common.UseAuthLinksServiceGetCurrentUserInfoKeyFn(), queryFn: () =>
AuthLinksService.getCurrentUserInfo() });
/**
+* Get Partitioned Dag Runs
+* Return PartitionedDagRuns. Filter by dag_id and/or has_created_dag_run_id.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.hasCreatedDagRunId
+* @returns PartitionedDagRunCollectionResponse Successful Response
+* @throws ApiError
+*/
+export const prefetchUsePartitionedDagRunServiceGetPartitionedDagRuns =
(queryClient: QueryClient, { dagId, hasCreatedDagRunId }: {
+ dagId?: string;
+ hasCreatedDagRunId?: boolean;
+} = {}) => queryClient.prefetchQuery({ queryKey:
Common.UsePartitionedDagRunServiceGetPartitionedDagRunsKeyFn({ dagId,
hasCreatedDagRunId }), queryFn: () =>
PartitionedDagRunService.getPartitionedDagRuns({ dagId, hasCreatedDagRunId })
});
+/**
+* Get Pending Partitioned Dag Run
+* Return full details for pending PartitionedDagRun.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.partitionKey
+* @returns PartitionedDagRunDetailResponse Successful Response
+* @throws ApiError
+*/
+export const prefetchUsePartitionedDagRunServiceGetPendingPartitionedDagRun =
(queryClient: QueryClient, { dagId, partitionKey }: {
+ dagId: string;
+ partitionKey: string;
+}) => queryClient.prefetchQuery({ queryKey:
Common.UsePartitionedDagRunServiceGetPendingPartitionedDagRunKeyFn({ dagId,
partitionKey }), queryFn: () =>
PartitionedDagRunService.getPendingPartitionedDagRun({ dagId, partitionKey })
});
+/**
* Get Dependencies
* Dependencies graph.
* @param data The data for the request.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
index 434f0af6dcd..7bafcd6b706 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
@@ -1,7 +1,7 @@
// generated with @7nohe/[email protected]
import { UseMutationOptions, UseQueryOptions, useMutation, useQuery } from
"@tanstack/react-query";
-import { AssetService, AuthLinksService, BackfillService, CalendarService,
ConfigService, ConnectionService, DagParsingService, DagRunService, DagService,
DagSourceService, DagStatsService, DagVersionService, DagWarningService,
DashboardService, DeadlinesService, DependenciesService, EventLogService,
ExperimentalService, ExtraLinksService, GanttService, GridService,
ImportErrorService, JobService, LoginService, MonitorService, PluginService,
PoolService, ProviderService, StructureService [...]
+import { AssetService, AuthLinksService, BackfillService, CalendarService,
ConfigService, ConnectionService, DagParsingService, DagRunService, DagService,
DagSourceService, DagStatsService, DagVersionService, DagWarningService,
DashboardService, DeadlinesService, DependenciesService, EventLogService,
ExperimentalService, ExtraLinksService, GanttService, GridService,
ImportErrorService, JobService, LoginService, MonitorService,
PartitionedDagRunService, PluginService, PoolService, Provide [...]
import { BackfillPostBody, BulkBody_BulkTaskInstanceBody_,
BulkBody_ConnectionBody_, BulkBody_PoolBody_, BulkBody_VariableBody_,
ClearTaskInstancesBody, ConnectionBody, CreateAssetEventsBody, DAGPatchBody,
DAGRunClearBody, DAGRunPatchBody, DAGRunsBatchBody, DagRunState,
DagWarningType, PatchTaskInstanceBody, PoolBody, PoolPatchBody,
TaskInstancesBatchBody, TriggerDAGRunPostBody, UpdateHITLDetailPayload,
VariableBody, XComCreateBody, XComUpdateBody } from "../requests/types.gen";
import * as Common from "./common";
/**
@@ -1499,6 +1499,32 @@ export const useAuthLinksServiceGetAuthMenus = <TData =
Common.AuthLinksServiceG
*/
export const useAuthLinksServiceGetCurrentUserInfo = <TData =
Common.AuthLinksServiceGetCurrentUserInfoDefaultResponse, TError = unknown,
TQueryKey extends Array<unknown> = unknown[]>(queryKey?: TQueryKey, options?:
Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) =>
useQuery<TData, TError>({ queryKey:
Common.UseAuthLinksServiceGetCurrentUserInfoKeyFn(queryKey), queryFn: () =>
AuthLinksService.getCurrentUserInfo() as TData, ...options });
/**
+* Get Partitioned Dag Runs
+* Return PartitionedDagRuns. Filter by dag_id and/or has_created_dag_run_id.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.hasCreatedDagRunId
+* @returns PartitionedDagRunCollectionResponse Successful Response
+* @throws ApiError
+*/
+export const usePartitionedDagRunServiceGetPartitionedDagRuns = <TData =
Common.PartitionedDagRunServiceGetPartitionedDagRunsDefaultResponse, TError =
unknown, TQueryKey extends Array<unknown> = unknown[]>({ dagId,
hasCreatedDagRunId }: {
+ dagId?: string;
+ hasCreatedDagRunId?: boolean;
+} = {}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey:
Common.UsePartitionedDagRunServiceGetPartitionedDagRunsKeyFn({ dagId,
hasCreatedDagRunId }, queryKey), queryFn: () =>
PartitionedDagRunService.getPartitionedDagRuns({ dagId, hasCreatedDagRunId })
as TData, ...options });
+/**
+* Get Pending Partitioned Dag Run
+* Return full details for pending PartitionedDagRun.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.partitionKey
+* @returns PartitionedDagRunDetailResponse Successful Response
+* @throws ApiError
+*/
+export const usePartitionedDagRunServiceGetPendingPartitionedDagRun = <TData =
Common.PartitionedDagRunServiceGetPendingPartitionedDagRunDefaultResponse,
TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ dagId,
partitionKey }: {
+ dagId: string;
+ partitionKey: string;
+}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey:
Common.UsePartitionedDagRunServiceGetPendingPartitionedDagRunKeyFn({ dagId,
partitionKey }, queryKey), queryFn: () =>
PartitionedDagRunService.getPendingPartitionedDagRun({ dagId, partitionKey })
as TData, ...options });
+/**
* Get Dependencies
* Dependencies graph.
* @param data The data for the request.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
index 05a2d3941ab..867f08383cf 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
@@ -1,7 +1,7 @@
// generated with @7nohe/[email protected]
import { UseQueryOptions, useSuspenseQuery } from "@tanstack/react-query";
-import { AssetService, AuthLinksService, BackfillService, CalendarService,
ConfigService, ConnectionService, DagRunService, DagService, DagSourceService,
DagStatsService, DagVersionService, DagWarningService, DashboardService,
DeadlinesService, DependenciesService, EventLogService, ExperimentalService,
ExtraLinksService, GanttService, GridService, ImportErrorService, JobService,
LoginService, MonitorService, PluginService, PoolService, ProviderService,
StructureService, TaskInstanceServi [...]
+import { AssetService, AuthLinksService, BackfillService, CalendarService,
ConfigService, ConnectionService, DagRunService, DagService, DagSourceService,
DagStatsService, DagVersionService, DagWarningService, DashboardService,
DeadlinesService, DependenciesService, EventLogService, ExperimentalService,
ExtraLinksService, GanttService, GridService, ImportErrorService, JobService,
LoginService, MonitorService, PartitionedDagRunService, PluginService,
PoolService, ProviderService, Structure [...]
import { DagRunState, DagWarningType } from "../requests/types.gen";
import * as Common from "./common";
/**
@@ -1499,6 +1499,32 @@ export const useAuthLinksServiceGetAuthMenusSuspense =
<TData = Common.AuthLinks
*/
export const useAuthLinksServiceGetCurrentUserInfoSuspense = <TData =
Common.AuthLinksServiceGetCurrentUserInfoDefaultResponse, TError = unknown,
TQueryKey extends Array<unknown> = unknown[]>(queryKey?: TQueryKey, options?:
Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) =>
useSuspenseQuery<TData, TError>({ queryKey:
Common.UseAuthLinksServiceGetCurrentUserInfoKeyFn(queryKey), queryFn: () =>
AuthLinksService.getCurrentUserInfo() as TData, ...options });
/**
+* Get Partitioned Dag Runs
+* Return PartitionedDagRuns. Filter by dag_id and/or has_created_dag_run_id.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.hasCreatedDagRunId
+* @returns PartitionedDagRunCollectionResponse Successful Response
+* @throws ApiError
+*/
+export const usePartitionedDagRunServiceGetPartitionedDagRunsSuspense = <TData
= Common.PartitionedDagRunServiceGetPartitionedDagRunsDefaultResponse, TError =
unknown, TQueryKey extends Array<unknown> = unknown[]>({ dagId,
hasCreatedDagRunId }: {
+ dagId?: string;
+ hasCreatedDagRunId?: boolean;
+} = {}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey:
Common.UsePartitionedDagRunServiceGetPartitionedDagRunsKeyFn({ dagId,
hasCreatedDagRunId }, queryKey), queryFn: () =>
PartitionedDagRunService.getPartitionedDagRuns({ dagId, hasCreatedDagRunId })
as TData, ...options });
+/**
+* Get Pending Partitioned Dag Run
+* Return full details for pending PartitionedDagRun.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.partitionKey
+* @returns PartitionedDagRunDetailResponse Successful Response
+* @throws ApiError
+*/
+export const usePartitionedDagRunServiceGetPendingPartitionedDagRunSuspense =
<TData =
Common.PartitionedDagRunServiceGetPendingPartitionedDagRunDefaultResponse,
TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ dagId,
partitionKey }: {
+ dagId: string;
+ partitionKey: string;
+}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey:
Common.UsePartitionedDagRunServiceGetPendingPartitionedDagRunKeyFn({ dagId,
partitionKey }, queryKey), queryFn: () =>
PartitionedDagRunService.getPendingPartitionedDagRun({ dagId, partitionKey })
as TData, ...options });
+/**
* Get Dependencies
* Dependencies graph.
* @param data The data for the request.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index b9e305f07c9..88c516866e4 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -8393,6 +8393,223 @@ export const $OklchColor = {
type: 'string'
} as const;
+export const $PartitionedDagRunAssetResponse = {
+ properties: {
+ asset_id: {
+ type: 'integer',
+ title: 'Asset Id'
+ },
+ asset_name: {
+ type: 'string',
+ title: 'Asset Name'
+ },
+ asset_uri: {
+ type: 'string',
+ title: 'Asset Uri'
+ },
+ received: {
+ type: 'boolean',
+ title: 'Received'
+ }
+ },
+ type: 'object',
+ required: ['asset_id', 'asset_name', 'asset_uri', 'received'],
+ title: 'PartitionedDagRunAssetResponse',
+ description: 'Asset info within a partitioned Dag run detail.'
+} as const;
+
+export const $PartitionedDagRunCollectionResponse = {
+ properties: {
+ partitioned_dag_runs: {
+ items: {
+ '$ref': '#/components/schemas/PartitionedDagRunResponse'
+ },
+ type: 'array',
+ title: 'Partitioned Dag Runs'
+ },
+ total: {
+ type: 'integer',
+ title: 'Total'
+ },
+ asset_expressions: {
+ anyOf: [
+ {
+ additionalProperties: {
+ anyOf: [
+ {
+ additionalProperties: true,
+ type: 'object'
+ },
+ {
+ type: 'null'
+ }
+ ]
+ },
+ type: 'object'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Asset Expressions'
+ }
+ },
+ type: 'object',
+ required: ['partitioned_dag_runs', 'total'],
+ title: 'PartitionedDagRunCollectionResponse',
+ description: 'Collection of partitioned Dag runs.'
+} as const;
+
+export const $PartitionedDagRunDetailResponse = {
+ properties: {
+ id: {
+ type: 'integer',
+ title: 'Id'
+ },
+ dag_id: {
+ type: 'string',
+ title: 'Dag Id'
+ },
+ partition_key: {
+ type: 'string',
+ title: 'Partition Key'
+ },
+ created_at: {
+ anyOf: [
+ {
+ type: 'string'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Created At'
+ },
+ updated_at: {
+ anyOf: [
+ {
+ type: 'string'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Updated At'
+ },
+ created_dag_run_id: {
+ anyOf: [
+ {
+ type: 'string'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Created Dag Run Id'
+ },
+ assets: {
+ items: {
+ '$ref': '#/components/schemas/PartitionedDagRunAssetResponse'
+ },
+ type: 'array',
+ title: 'Assets'
+ },
+ total_required: {
+ type: 'integer',
+ title: 'Total Required'
+ },
+ total_received: {
+ type: 'integer',
+ title: 'Total Received'
+ },
+ asset_expression: {
+ anyOf: [
+ {
+ additionalProperties: true,
+ type: 'object'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Asset Expression'
+ }
+ },
+ type: 'object',
+ required: ['id', 'dag_id', 'partition_key', 'assets', 'total_required',
'total_received'],
+ title: 'PartitionedDagRunDetailResponse',
+ description: 'Detail of a single partitioned Dag run.'
+} as const;
+
+export const $PartitionedDagRunResponse = {
+ properties: {
+ id: {
+ type: 'integer',
+ title: 'Id'
+ },
+ partition_key: {
+ type: 'string',
+ title: 'Partition Key'
+ },
+ created_at: {
+ anyOf: [
+ {
+ type: 'string'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Created At'
+ },
+ total_received: {
+ type: 'integer',
+ title: 'Total Received'
+ },
+ total_required: {
+ type: 'integer',
+ title: 'Total Required'
+ },
+ dag_id: {
+ anyOf: [
+ {
+ type: 'string'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Dag Id'
+ },
+ state: {
+ anyOf: [
+ {
+ type: 'string'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'State'
+ },
+ created_dag_run_id: {
+ anyOf: [
+ {
+ type: 'string'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Created Dag Run Id'
+ }
+ },
+ type: 'object',
+ required: ['id', 'partition_key', 'total_received', 'total_required'],
+ title: 'PartitionedDagRunResponse',
+ description: 'Single partitioned Dag run item.'
+} as const;
+
export const $StandardHookFields = {
properties: {
description: {
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
index 63107cb97b6..99a9f0d0e23 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -3,7 +3,7 @@
import type { CancelablePromise } from './core/CancelablePromise';
import { OpenAPI } from './core/OpenAPI';
import { request as __request } from './core/request';
-import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData,
GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse,
GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData,
CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse,
GetAssetQueuedEventsData, GetAssetQueuedEventsResponse,
DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData,
GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse,
Dele [...]
+import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData,
GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse,
GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData,
CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse,
GetAssetQueuedEventsData, GetAssetQueuedEventsResponse,
DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData,
GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse,
Dele [...]
export class AssetService {
/**
@@ -3850,6 +3850,55 @@ export class AuthLinksService {
}
+export class PartitionedDagRunService {
+ /**
+ * Get Partitioned Dag Runs
+ * Return PartitionedDagRuns. Filter by dag_id and/or
has_created_dag_run_id.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.hasCreatedDagRunId
+ * @returns PartitionedDagRunCollectionResponse Successful Response
+ * @throws ApiError
+ */
+ public static getPartitionedDagRuns(data: GetPartitionedDagRunsData = {}):
CancelablePromise<GetPartitionedDagRunsResponse> {
+ return __request(OpenAPI, {
+ method: 'GET',
+ url: '/ui/partitioned_dag_runs',
+ query: {
+ dag_id: data.dagId,
+ has_created_dag_run_id: data.hasCreatedDagRunId
+ },
+ errors: {
+ 422: 'Validation Error'
+ }
+ });
+ }
+
+ /**
+ * Get Pending Partitioned Dag Run
+ * Return full details for pending PartitionedDagRun.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.partitionKey
+ * @returns PartitionedDagRunDetailResponse Successful Response
+ * @throws ApiError
+ */
+ public static getPendingPartitionedDagRun(data:
GetPendingPartitionedDagRunData):
CancelablePromise<GetPendingPartitionedDagRunResponse> {
+ return __request(OpenAPI, {
+ method: 'GET',
+ url: '/ui/pending_partitioned_dag_run/{dag_id}/{partition_key}',
+ path: {
+ dag_id: data.dagId,
+ partition_key: data.partitionKey
+ },
+ errors: {
+ 422: 'Validation Error'
+ }
+ });
+ }
+
+}
+
export class DependenciesService {
/**
* Get Dependencies
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index aa596d50253..7b334da247d 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -2065,6 +2065,61 @@ export type NodeResponse = {
export type OklchColor = string;
+/**
+ * Asset info within a partitioned Dag run detail.
+ */
+export type PartitionedDagRunAssetResponse = {
+ asset_id: number;
+ asset_name: string;
+ asset_uri: string;
+ received: boolean;
+};
+
+/**
+ * Collection of partitioned Dag runs.
+ */
+export type PartitionedDagRunCollectionResponse = {
+ partitioned_dag_runs: Array<PartitionedDagRunResponse>;
+ total: number;
+ asset_expressions?: {
+ [key: string]: ({
+ [key: string]: unknown;
+} | null);
+} | null;
+};
+
+/**
+ * Detail of a single partitioned Dag run.
+ */
+export type PartitionedDagRunDetailResponse = {
+ id: number;
+ dag_id: string;
+ partition_key: string;
+ created_at?: string | null;
+ updated_at?: string | null;
+ created_dag_run_id?: string | null;
+ assets: Array<PartitionedDagRunAssetResponse>;
+ total_required: number;
+ total_received: number;
+ asset_expression?: {
+ [key: string]: unknown;
+} | null;
+};
+
+/**
+ * Single partitioned Dag run item.
+ */
+export type PartitionedDagRunResponse = {
+ id: number;
+ partition_key: string;
+ created_at?: string | null;
+ total_received: number;
+ total_required: number;
+ dag_id?: string | null;
+ state?: string | null;
+ created_dag_run_id?: string | null;
+};
+
/**
* Standard fields of a Hook that a form will render.
*/
@@ -3460,6 +3515,20 @@ export type GetAuthMenusResponse =
MenuItemCollectionResponse;
export type GetCurrentUserInfoResponse = AuthenticatedMeResponse;
+export type GetPartitionedDagRunsData = {
+ dagId?: string | null;
+ hasCreatedDagRunId?: boolean | null;
+};
+
+export type GetPartitionedDagRunsResponse =
PartitionedDagRunCollectionResponse;
+
+export type GetPendingPartitionedDagRunData = {
+ dagId: string;
+ partitionKey: string;
+};
+
+export type GetPendingPartitionedDagRunResponse =
PartitionedDagRunDetailResponse;
+
export type GetDependenciesData = {
dependencyType?: 'scheduling' | 'data';
nodeId?: string | null;
@@ -6649,6 +6718,36 @@ export type $OpenApiTs = {
};
};
};
+ '/ui/partitioned_dag_runs': {
+ get: {
+ req: GetPartitionedDagRunsData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 200: PartitionedDagRunCollectionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
+ };
+ '/ui/pending_partitioned_dag_run/{dag_id}/{partition_key}': {
+ get: {
+ req: GetPendingPartitionedDagRunData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 200: PartitionedDagRunDetailResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
+ };
'/ui/dependencies': {
get: {
req: GetDependenciesData;
diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json
b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json
index 396359107e7..1846f214024 100644
--- a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json
+++ b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json
@@ -161,6 +161,13 @@
"placeholder": "Add a note...",
"taskInstance": "Task Instance Note"
},
+ "partitionedDagRun": "Partitioned Dag Run",
+ "partitionedDagRun_other": "Partitioned Dag Runs",
+ "partitionedDagRunDetail": {
+ "receivedAssetEvents": "Received Asset Events"
+ },
+ "pendingDagRun": "{{count}} Pending Dag Run",
+ "pendingDagRun_other": "{{count}} Pending Dag Runs",
"reset": "Reset",
"runId": "Run ID",
"runTypes": {
diff --git
a/airflow-core/src/airflow/ui/src/components/AssetExpression/AssetExpression.tsx
b/airflow-core/src/airflow/ui/src/components/AssetExpression/AssetExpression.tsx
index 9b49fd8b947..9b2a8e3b4f2 100644
---
a/airflow-core/src/airflow/ui/src/components/AssetExpression/AssetExpression.tsx
+++
b/airflow-core/src/airflow/ui/src/components/AssetExpression/AssetExpression.tsx
@@ -31,11 +31,11 @@ export const AssetExpression = ({
expression,
}: {
readonly events?: Array<NextRunEvent>;
- readonly expression: ExpressionType | null;
+ readonly expression: ExpressionType | undefined;
}) => {
const { t: translate } = useTranslation("common");
- if (expression === null) {
+ if (expression === undefined) {
return undefined;
}
diff --git a/airflow-core/src/airflow/ui/src/components/AssetProgressCell.tsx
b/airflow-core/src/airflow/ui/src/components/AssetProgressCell.tsx
new file mode 100644
index 00000000000..7d305a03fb7
--- /dev/null
+++ b/airflow-core/src/airflow/ui/src/components/AssetProgressCell.tsx
@@ -0,0 +1,67 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { Button } from "@chakra-ui/react";
+import { FiDatabase } from "react-icons/fi";
+
+import { usePartitionedDagRunServiceGetPendingPartitionedDagRun } from
"openapi/queries";
+import type { PartitionedDagRunAssetResponse } from
"openapi/requests/types.gen";
+import { AssetExpression, type ExpressionType } from
"src/components/AssetExpression";
+import type { NextRunEvent } from "src/components/AssetExpression/types";
+import { Popover } from "src/components/ui";
+
+type Props = {
+ readonly dagId: string;
+ readonly partitionKey: string;
+ readonly totalReceived: number;
+ readonly totalRequired: number;
+};
+
+/**
+ * Cell showing the asset progress of one pending partitioned Dag run as
+ * "received / required", with a popover that renders the asset expression.
+ *
+ * The detail for the (dagId, partitionKey) pair is fetched with
+ * usePartitionedDagRunServiceGetPendingPartitionedDagRun; the button shows a
+ * loading state while that query is in flight.
+ *
+ * @param dagId - Dag id passed to the pending partitioned Dag run query.
+ * @param partitionKey - Partition key passed to the same query.
+ * @param totalReceived - Number rendered on the left of the button label.
+ * @param totalRequired - Number rendered on the right of the button label.
+ */
+export const AssetProgressCell = ({ dagId, partitionKey, totalReceived, totalRequired }: Props) => {
+  const { data, isLoading } = usePartitionedDagRunServiceGetPendingPartitionedDagRun({ dagId, partitionKey });
+
+  // The API types asset_expression as `object | null`, whereas AssetExpression
+  // only treats `undefined` as "nothing to render". Normalize null to undefined
+  // so a null payload is never handed over as if it were an expression.
+  const assetExpression = (data?.asset_expression ?? undefined) as ExpressionType | undefined;
+  const assets: Array<PartitionedDagRunAssetResponse> = data?.assets ?? [];
+
+  // Only assets flagged as received are forwarded as events.
+  const events: Array<NextRunEvent> = assets
+    .filter((ak: PartitionedDagRunAssetResponse) => ak.received)
+    .map((ak: PartitionedDagRunAssetResponse) => ({
+      id: ak.asset_id,
+      // NOTE(review): placeholder value; presumably AssetExpression only needs
+      // lastUpdate to be non-null to mark the asset as received - confirm.
+      lastUpdate: "received",
+      name: ak.asset_name,
+      uri: ak.asset_uri,
+    }));
+
+  return (
+    // eslint-disable-next-line jsx-a11y/no-autofocus
+    <Popover.Root autoFocus={false} lazyMount positioning={{ placement: "bottom-end" }} unmountOnExit>
+      <Popover.Trigger asChild>
+        <Button loading={isLoading} paddingInline={0} size="sm" variant="ghost">
+          <FiDatabase style={{ display: "inline" }} />
+          {`${String(totalReceived)} / ${String(totalRequired)}`}
+        </Button>
+      </Popover.Trigger>
+      <Popover.Content css={{ "--popover-bg": "colors.bg.emphasized" }} width="fit-content">
+        <Popover.Arrow />
+        <Popover.Body>
+          <AssetExpression events={events} expression={assetExpression} />
+        </Popover.Body>
+      </Popover.Content>
+    </Popover.Root>
+  );
+};
diff --git a/airflow-core/src/airflow/ui/src/pages/DagsList/AssetSchedule.tsx
b/airflow-core/src/airflow/ui/src/pages/DagsList/AssetSchedule.tsx
index 2a801f473f0..dd7a7a45d3b 100644
--- a/airflow-core/src/airflow/ui/src/pages/DagsList/AssetSchedule.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/DagsList/AssetSchedule.tsx
@@ -18,6 +18,7 @@
*/
import { Button, HStack, Link, Text } from "@chakra-ui/react";
import dayjs from "dayjs";
+import { useState } from "react";
import { useTranslation } from "react-i18next";
import { FiDatabase } from "react-icons/fi";
import { Link as RouterLink } from "react-router-dom";
@@ -28,6 +29,8 @@ import type { NextRunEvent } from
"src/components/AssetExpression/types";
import { TruncatedText } from "src/components/TruncatedText";
import { Popover } from "src/components/ui";
+import { PartitionScheduleModal } from "./PartitionScheduleModal";
+
type Props = {
readonly assetExpression?: ExpressionType | null;
readonly dagId: string;
@@ -35,18 +38,44 @@ type Props = {
readonly timetableSummary: string | null;
};
+type PartitionScheduleProps = {
+ readonly dagId: string;
+ readonly isLoading: boolean;
+ readonly pendingCount: number;
+};
+
+/**
+ * Ghost button labelled with the pending Dag run count for a Dag; clicking it
+ * opens PartitionScheduleModal for the same Dag, and the modal's onClose
+ * callback hides it again.
+ */
+const PartitionSchedule = ({ dagId, isLoading, pendingCount }: PartitionScheduleProps) => {
+  const { t: translate } = useTranslation("common");
+  const [isModalOpen, setIsModalOpen] = useState(false);
+
+  // Named handlers instead of inline arrows; both ignore their arguments.
+  const showModal = () => setIsModalOpen(true);
+  const hideModal = () => setIsModalOpen(false);
+
+  // Pluralized via the i18n "pendingDagRun" key using the pending count.
+  const buttonLabel = translate("pendingDagRun", { count: pendingCount });
+
+  return (
+    <>
+      <Button loading={isLoading} onClick={showModal} paddingInline={0} size="sm" variant="ghost">
+        <FiDatabase style={{ display: "inline" }} />
+        {buttonLabel}
+      </Button>
+      <PartitionScheduleModal dagId={dagId} onClose={hideModal} open={isModalOpen} />
+    </>
+  );
+};
+
export const AssetSchedule = ({ assetExpression, dagId, latestRunAfter,
timetableSummary }: Props) => {
- const { t: translate } = useTranslation("dags");
+ const { t: translate } = useTranslation(["dags", "common"]);
const { data: nextRun, isLoading } = useAssetServiceNextRunAssets({ dagId });
+ const isPartitioned = timetableSummary === "Partitioned Asset";
+
const nextRunEvents = (nextRun?.events ?? []) as Array<NextRunEvent>;
const pendingEvents = nextRunEvents.filter((ev) => {
- if (ev.lastUpdate !== null && latestRunAfter !== undefined) {
- return dayjs(ev.lastUpdate).isAfter(latestRunAfter);
+ if (ev.lastUpdate === null) {
+ return false;
+ }
+ if (isPartitioned) {
+ return true;
}
- return false;
+ return latestRunAfter !== undefined &&
dayjs(ev.lastUpdate).isAfter(latestRunAfter);
});
if (!nextRunEvents.length) {
@@ -58,6 +87,25 @@ export const AssetSchedule = ({ assetExpression, dagId,
latestRunAfter, timetabl
);
}
+ if (isPartitioned) {
+ const pendingCount = (nextRun?.pending_partition_count as number |
undefined) ?? 0;
+
+ if (pendingCount === 0) {
+ return (
+ <HStack>
+ <FiDatabase style={{ display: "inline", flexShrink: 0 }} />
+ <Text>{translate("common:runTypes.asset_triggered")}</Text>
+ </HStack>
+ );
+ }
+
+ if (pendingCount > 1) {
+ return <PartitionSchedule dagId={dagId} isLoading={isLoading}
pendingCount={pendingCount} />;
+ }
+
+ // pendingCount === 1: fall through to the standard asset popover below
+ }
+
const [asset] = nextRunEvents;
if (nextRunEvents.length === 1 && asset !== undefined) {
diff --git
a/airflow-core/src/airflow/ui/src/pages/DagsList/PartitionScheduleModal.tsx
b/airflow-core/src/airflow/ui/src/pages/DagsList/PartitionScheduleModal.tsx
new file mode 100644
index 00000000000..9a5e2dcb0d8
--- /dev/null
+++ b/airflow-core/src/airflow/ui/src/pages/DagsList/PartitionScheduleModal.tsx
@@ -0,0 +1,110 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { Heading, HStack, Text } from "@chakra-ui/react";
+import type { ColumnDef } from "@tanstack/react-table";
+import { useTranslation } from "react-i18next";
+import { FiDatabase } from "react-icons/fi";
+
+import { usePartitionedDagRunServiceGetPartitionedDagRuns } from "openapi/queries";
+import type { PartitionedDagRunResponse } from "openapi/requests/types.gen";
+import { AssetProgressCell } from "src/components/AssetProgressCell";
+import { DataTable } from "src/components/DataTable";
+import { ErrorAlert } from "src/components/ErrorAlert";
+import Time from "src/components/Time";
+import { Dialog } from "src/components/ui";
+
+type PartitionScheduleModalProps = {
+ readonly dagId: string;
+ readonly onClose: () => void;
+ readonly open: boolean;
+};
+
+const getColumns = (
+ translate: (key: string) => string,
+ dagId: string,
+): Array<ColumnDef<PartitionedDagRunResponse>> => [
+ {
+ accessorKey: "partition_key",
+ enableSorting: false,
+ header: translate("dagRun.mappedPartitionKey"),
+ },
+ {
+ accessorKey: "created_at",
+ cell: ({ row }) => (
+ <Text>
+ <Time datetime={row.original.created_at} />
+ </Text>
+ ),
+ enableSorting: false,
+ header: translate("table.createdAt"),
+ },
+ {
+ accessorKey: "total_received",
+ cell: ({ row }) => (
+ <AssetProgressCell
+ dagId={dagId}
+ partitionKey={row.original.partition_key}
+ totalReceived={row.original.total_received}
+ totalRequired={row.original.total_required}
+ />
+ ),
+ enableSorting: false,
+ header: translate("partitionedDagRunDetail.receivedAssetEvents"),
+ },
+];
+
+export const PartitionScheduleModal = ({ dagId, onClose, open }: PartitionScheduleModalProps) => {
+ const { t: translate } = useTranslation("common");
+
+ const { data, error, isFetching, isLoading } = usePartitionedDagRunServiceGetPartitionedDagRuns(
+ { dagId, hasCreatedDagRunId: false },
+ undefined,
+ { enabled: open },
+ );
+
+ const partitionedDagRuns = data?.partitioned_dag_runs ?? [];
+ const total = data?.total ?? 0;
+ const columns = getColumns(translate, dagId);
+
+ return (
+ <Dialog.Root lazyMount onOpenChange={onClose} open={open} scrollBehavior="inside" size="xl" unmountOnExit>
+ <Dialog.Content backdrop>
+ <Dialog.Header>
+ <HStack>
+ <FiDatabase />
+ <Heading size="md">{translate("pendingDagRun", { count: total })}</Heading>
+ </HStack>
+ </Dialog.Header>
+ <Dialog.CloseTrigger />
+ <Dialog.Body>
+ <ErrorAlert error={error} />
+ <DataTable
+ columns={columns}
+ data={partitionedDagRuns}
+ isFetching={isFetching}
+ isLoading={isLoading}
+ modelName="partitionedDagRun"
+ showRowCountHeading={false}
+ total={total}
+ />
+ </Dialog.Body>
+ </Dialog.Content>
+ </Dialog.Root>
+ );
+};
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_assets.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_assets.py
index 3e9dd95b308..eaf3e5031fc 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_assets.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_assets.py
@@ -22,12 +22,19 @@ import pendulum
import pytest
from sqlalchemy import select
-from airflow.models.asset import AssetDagRunQueue, AssetEvent, AssetModel
+from airflow.models.asset import (
+ AssetDagRunQueue,
+ AssetEvent,
+ AssetModel,
+ AssetPartitionDagRun,
+ PartitionedAssetKeyLog,
+)
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk.definitions.asset import Asset
+from airflow.sdk.definitions.timetables.assets import PartitionedAssetTimetable
from tests_common.test_utils.asserts import assert_queries_count
-from tests_common.test_utils.db import clear_db_dags, clear_db_serialized_dags
+from tests_common.test_utils.db import clear_db_apdr, clear_db_dags, clear_db_pakl, clear_db_serialized_dags
pytestmark = pytest.mark.db_test
@@ -36,6 +43,8 @@ pytestmark = pytest.mark.db_test
def cleanup():
clear_db_dags()
clear_db_serialized_dags()
+ clear_db_apdr()
+ clear_db_pakl()
class TestNextRunAssets:
@@ -162,3 +171,54 @@ class TestNextRunAssets:
ev = resp.json()["events"][0]
assert ev["lastUpdate"] is not None
assert "queued" not in ev
+
+ @pytest.mark.parametrize(
+ ("fulfilled", "expect_last_update"),
+ [(False, True), (True, False)],
+ ids=["pending", "fulfilled"],
+ )
+ def test_partitioned_dag_last_update(
+ self, test_client, dag_maker, session, fulfilled, expect_last_update
+ ):
+ asset = Asset(uri="s3://bucket/part", name="part")
+ with dag_maker(
+ dag_id="part_dag",
+ schedule=PartitionedAssetTimetable(assets=asset),
+ serialized=True,
+ ):
+ EmptyOperator(task_id="t")
+
+ dr = dag_maker.create_dagrun()
+ dag_maker.sync_dagbag_to_db()
+
+ asset_model = session.scalars(select(AssetModel).where(AssetModel.uri == "s3://bucket/part")).one()
+ event = AssetEvent(
+ asset_id=asset_model.id, timestamp=(dr.logical_date or pendulum.now()).add(minutes=5)
+ )
+ session.add(event)
+ session.flush()
+
+ pdr = AssetPartitionDagRun(
+ target_dag_id="part_dag",
+ partition_key="2024-01-01",
+ created_dag_run_id=dr.id if fulfilled else None,
+ )
+ session.add(pdr)
+ session.flush()
+
+ session.add(
+ PartitionedAssetKeyLog(
+ asset_id=asset_model.id,
+ asset_event_id=event.id,
+ asset_partition_dag_run_id=pdr.id,
+ source_partition_key="2024-01-01",
+ target_dag_id="part_dag",
+ target_partition_key="2024-01-01",
+ )
+ )
+ session.commit()
+
+ resp = test_client.get("/next_run_assets/part_dag")
+ assert resp.status_code == 200
+ ev = resp.json()["events"][0]
+ assert (ev["lastUpdate"] is not None) == expect_last_update
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_partitioned_dag_runs.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_partitioned_dag_runs.py
new file mode 100644
index 00000000000..52f0ec1e265
--- /dev/null
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_partitioned_dag_runs.py
@@ -0,0 +1,276 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pendulum
+import pytest
+from sqlalchemy import select
+
+from airflow.models.asset import AssetEvent, AssetModel, AssetPartitionDagRun, PartitionedAssetKeyLog
+from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.sdk.definitions.asset import Asset
+from airflow.sdk.definitions.timetables.assets import PartitionedAssetTimetable
+
+from tests_common.test_utils.asserts import assert_queries_count
+from tests_common.test_utils.db import clear_db_apdr, clear_db_dags, clear_db_pakl, clear_db_serialized_dags
+
+pytestmark = pytest.mark.db_test
+
+
+@pytest.fixture(autouse=True)
+def cleanup():
+ clear_db_dags()
+ clear_db_serialized_dags()
+ clear_db_apdr()
+ clear_db_pakl()
+
+
+class TestGetPartitionedDagRuns:
+ def test_should_response_401(self, unauthenticated_test_client):
+ response = unauthenticated_test_client.get("/partitioned_dag_runs?dag_id=any")
+ assert response.status_code == 401
+
+ def test_should_response_403(self, unauthorized_test_client):
+ response = unauthorized_test_client.get("/partitioned_dag_runs?dag_id=any")
+ assert response.status_code == 403
+
+ def test_should_response_404(self, test_client):
+ assert test_client.get("/partitioned_dag_runs?dag_id=no_such_dag").status_code == 404
+
+ def test_should_response_200_non_partitioned_dag_returns_empty(self, test_client, dag_maker):
+ with dag_maker(dag_id="normal", schedule=[Asset(uri="s3://bucket/a", name="a")], serialized=True):
+ EmptyOperator(task_id="t")
+ dag_maker.create_dagrun()
+ dag_maker.sync_dagbag_to_db()
+
+ with assert_queries_count(2):
+ resp = test_client.get("/partitioned_dag_runs?dag_id=normal&has_created_dag_run_id=false")
+ assert resp.status_code == 200
+ assert resp.json() == {"partitioned_dag_runs": [], "total": 0, "asset_expressions": None}
+
+ @pytest.mark.parametrize(
+ (
+ "num_assets",
+ "received_count",
+ "fulfilled",
+ "has_created_dag_run_id",
+ "expected_total",
+ "expected_state",
+ ),
+ [
+ (1, 1, False, False, 1, "pending"),
+ (1, 1, True, False, 0, None),
+ (1, 1, True, True, 1, "running"),
+ (3, 0, False, False, 1, "pending"),
+ (3, 1, False, False, 1, "pending"),
+ (3, 2, False, False, 1, "pending"),
+ (3, 3, False, False, 1, "pending"),
+ ],
+ ids=[
+ "filter-pending-included",
+ "filter-fulfilled-excluded",
+ "filter-fulfilled-included",
+ "received-0/3",
+ "received-1/3",
+ "received-2/3",
+ "received-3/3",
+ ],
+ )
+ def test_should_response_200(
+ self,
+ test_client,
+ dag_maker,
+ session,
+ num_assets,
+ received_count,
+ fulfilled,
+ has_created_dag_run_id,
+ expected_total,
+ expected_state,
+ ):
+ uris = [f"s3://bucket/lr{i}" for i in range(num_assets)]
+ asset_defs = [Asset(uri=uri, name=f"lr{i}") for i, uri in enumerate(uris)]
+ schedule = asset_defs[0]
+ for a in asset_defs[1:]:
+ schedule = schedule & a
+
+ with dag_maker(
+ dag_id="list_dag",
+ schedule=PartitionedAssetTimetable(assets=schedule),
+ serialized=True,
+ ):
+ EmptyOperator(task_id="t")
+
+ dr = dag_maker.create_dagrun()
+ dag_maker.sync_dagbag_to_db()
+
+ assets = {a.uri: a for a in session.scalars(select(AssetModel).where(AssetModel.uri.in_(uris)))}
+
+ pdr = AssetPartitionDagRun(
+ target_dag_id="list_dag",
+ partition_key="2024-06-01",
+ created_dag_run_id=dr.id if fulfilled else None,
+ )
+ session.add(pdr)
+ session.flush()
+
+ for uri in uris[:received_count]:
+ event = AssetEvent(asset_id=assets[uri].id, timestamp=pendulum.now())
+ session.add(event)
+ session.flush()
+ session.add(
+ PartitionedAssetKeyLog(
+ asset_id=assets[uri].id,
+ asset_event_id=event.id,
+ asset_partition_dag_run_id=pdr.id,
+ source_partition_key="2024-06-01",
+ target_dag_id="list_dag",
+ target_partition_key="2024-06-01",
+ )
+ )
+ session.commit()
+
+ with assert_queries_count(2):
+ resp = test_client.get(
+ f"/partitioned_dag_runs?dag_id=list_dag"
+ f"&has_created_dag_run_id={str(has_created_dag_run_id).lower()}"
+ )
+ assert resp.status_code == 200
+ body = resp.json()
+ assert body["total"] == expected_total
+ if expected_total > 0:
+ pdr_resp = body["partitioned_dag_runs"][0]
+ assert pdr_resp["state"] == expected_state
+ assert pdr_resp["total_received"] == received_count
+ assert pdr_resp["total_required"] == num_assets
+
+
+class TestGetPendingPartitionedDagRun:
+ def test_should_response_401(self, unauthenticated_test_client):
+ response = unauthenticated_test_client.get("/pending_partitioned_dag_run/any_dag/any_key")
+ assert response.status_code == 401
+
+ def test_should_response_403(self, unauthorized_test_client):
+ response = unauthorized_test_client.get("/pending_partitioned_dag_run/any_dag/any_key")
+ assert response.status_code == 403
+
+ @pytest.mark.parametrize(
+ ("dag_id", "partition_key", "fulfilled"),
+ [
+ ("no_dag", "no_key", False),
+ ("fulfilled_dag", "2024-07-01", True),
+ ],
+ ids=[
+ "not-found",
+ "fulfilled-excluded",
+ ],
+ )
+ def test_should_response_404(self, test_client, dag_maker, session, dag_id, partition_key, fulfilled):
+ if fulfilled:
+ with dag_maker(
+ dag_id="fulfilled_dag",
+ schedule=PartitionedAssetTimetable(assets=Asset(uri="s3://bucket/ful0", name="ful0")),
+ serialized=True,
+ ):
+ EmptyOperator(task_id="t")
+
+ dr = dag_maker.create_dagrun()
+ dag_maker.sync_dagbag_to_db()
+
+ session.add(
+ AssetPartitionDagRun(
+ target_dag_id="fulfilled_dag",
+ partition_key="2024-07-01",
+ created_dag_run_id=dr.id,
+ )
+ )
+ session.commit()
+
+ resp = test_client.get(f"/pending_partitioned_dag_run/{dag_id}/{partition_key}")
+ assert resp.status_code == 404
+
+ @pytest.mark.parametrize(
+ ("num_assets", "received_count"),
+ [
+ (1, 1),
+ (1, 0),
+ (2, 1),
+ (2, 2),
+ (2, 0),
+ ],
+ ids=[
+ "1-asset-received-pending",
+ "1-asset-none-received-pending",
+ "2-assets-partial-pending",
+ "2-assets-all-received-pending",
+ "2-assets-none-received-pending",
+ ],
+ )
+ def test_should_response_200(self, test_client, dag_maker, session, num_assets, received_count):
+ uris = [f"s3://bucket/dt{i}" for i in range(num_assets)]
+ asset_defs = [Asset(uri=uri, name=f"dt{i}") for i, uri in enumerate(uris)]
+ schedule = asset_defs[0] if num_assets == 1 else asset_defs[0] & asset_defs[1]
+
+ with dag_maker(
+ dag_id="detail_dag",
+ schedule=PartitionedAssetTimetable(assets=schedule),
+ serialized=True,
+ ):
+ EmptyOperator(task_id="t")
+
+ dag_maker.create_dagrun()
+ dag_maker.sync_dagbag_to_db()
+
+ assets = {a.uri: a for a in session.scalars(select(AssetModel).where(AssetModel.uri.in_(uris)))}
+
+ pdr = AssetPartitionDagRun(
+ target_dag_id="detail_dag",
+ partition_key="2024-07-01",
+ created_dag_run_id=None,
+ )
+ session.add(pdr)
+ session.flush()
+
+ for uri in uris[:received_count]:
+ event = AssetEvent(asset_id=assets[uri].id, timestamp=pendulum.now())
+ session.add(event)
+ session.flush()
+ session.add(
+ PartitionedAssetKeyLog(
+ asset_id=assets[uri].id,
+ asset_event_id=event.id,
+ asset_partition_dag_run_id=pdr.id,
+ source_partition_key="2024-07-01",
+ target_dag_id="detail_dag",
+ target_partition_key="2024-07-01",
+ )
+ )
+ session.commit()
+
+ resp = test_client.get("/pending_partitioned_dag_run/detail_dag/2024-07-01")
+ assert resp.status_code == 200
+ body = resp.json()
+ assert body["dag_id"] == "detail_dag"
+ assert body["partition_key"] == "2024-07-01"
+ assert body["total_required"] == num_assets
+ assert body["total_received"] == received_count
+ assert len(body["assets"]) == num_assets
+ assert body["asset_expression"] is not None
+ assert body["created_dag_run_id"] is None
+
+ received_uris = {a["asset_uri"] for a in body["assets"] if a["received"]}
+ assert received_uris == set(uris[:received_count])