This is an automated email from the ASF dual-hosted git repository.

guanmingchiu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e0e4ab451ce Add UI for AssetPartitionDagRun (#61398)
e0e4ab451ce is described below

commit e0e4ab451ce084a860044cbf53ae41a863ae16f6
Author: Guan-Ming (Wesley) Chiu <[email protected]>
AuthorDate: Tue Feb 24 12:31:04 2026 +0800

    Add UI for AssetPartitionDagRun (#61398)
    
    * Add AssetPartitionDagRun ui
    
    * Replace dag with Dag
    
    * Refactor backend
    
    Signed-off-by: Guan-Ming (Wesley) Chiu 
<[email protected]>
    
    * Fix small changes
    
    Signed-off-by: Guan-Ming (Wesley) Chiu 
<[email protected]>
    
    * Remove PartitionedDagRun components and related routes from the UI
    
    Signed-off-by: Guan-Ming (Wesley) Chiu 
<[email protected]>
    
    * Fix assets test
    
    Signed-off-by: Guan-Ming (Wesley) Chiu 
<[email protected]>
    
    * Update translation key for received asset events
    
    Signed-off-by: Guan-Ming (Wesley) Chiu 
<[email protected]>
    
    * Fix query for pending apdr
    
    Signed-off-by: Guan-Ming (Wesley) Chiu 
<[email protected]>
    
    * Fix wording
    
    Co-authored-by: Wei Lee <[email protected]>
    Signed-off-by: Guan-Ming (Wesley) Chiu 
<[email protected]>
    
    ---------
    
    Signed-off-by: Guan-Ming (Wesley) Chiu 
<[email protected]>
    Co-authored-by: Wei Lee <[email protected]>
---
 .../src/airflow/api_fastapi/common/parameters.py   |  23 ++
 .../core_api/datamodels/ui/partitioned_dag_runs.py |  64 +++++
 .../api_fastapi/core_api/openapi/_private_ui.yaml  | 219 ++++++++++++++++
 .../api_fastapi/core_api/routes/ui/__init__.py     |   2 +
 .../api_fastapi/core_api/routes/ui/assets.py       |  89 ++++---
 .../core_api/routes/ui/partitioned_dag_runs.py     | 239 ++++++++++++++++++
 .../src/airflow/ui/openapi-gen/queries/common.ts   |  16 +-
 .../ui/openapi-gen/queries/ensureQueryData.ts      |  28 ++-
 .../src/airflow/ui/openapi-gen/queries/prefetch.ts |  28 ++-
 .../src/airflow/ui/openapi-gen/queries/queries.ts  |  28 ++-
 .../src/airflow/ui/openapi-gen/queries/suspense.ts |  28 ++-
 .../airflow/ui/openapi-gen/requests/schemas.gen.ts | 217 ++++++++++++++++
 .../ui/openapi-gen/requests/services.gen.ts        |  51 +++-
 .../airflow/ui/openapi-gen/requests/types.gen.ts   |  99 ++++++++
 .../airflow/ui/public/i18n/locales/en/common.json  |   7 +
 .../components/AssetExpression/AssetExpression.tsx |   4 +-
 .../ui/src/components/AssetProgressCell.tsx        |  67 +++++
 .../ui/src/pages/DagsList/AssetSchedule.tsx        |  56 ++++-
 .../src/pages/DagsList/PartitionScheduleModal.tsx  | 110 ++++++++
 .../api_fastapi/core_api/routes/ui/test_assets.py  |  64 ++++-
 .../routes/ui/test_partitioned_dag_runs.py         | 276 +++++++++++++++++++++
 21 files changed, 1670 insertions(+), 45 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/common/parameters.py 
b/airflow-core/src/airflow/api_fastapi/common/parameters.py
index b6b1d255264..e963cdbff55 100644
--- a/airflow-core/src/airflow/api_fastapi/common/parameters.py
+++ b/airflow-core/src/airflow/api_fastapi/common/parameters.py
@@ -47,6 +47,7 @@ from airflow.models import Base
 from airflow.models.asset import (
     AssetAliasModel,
     AssetModel,
+    AssetPartitionDagRun,
     DagScheduleAssetReference,
     TaskInletAssetReference,
     TaskOutletAssetReference,
@@ -1040,6 +1041,28 @@ QueryAssetAliasNamePatternSearch = Annotated[
 QueryAssetDagIdPatternSearch = Annotated[
     _DagIdAssetReferenceFilter, Depends(_DagIdAssetReferenceFilter.depends)
 ]
+QueryPartitionedDagRunHasCreatedDagRunIdFilter = Annotated[
+    FilterParam[bool | None],
+    Depends(
+        filter_param_factory(
+            AssetPartitionDagRun.created_dag_run_id,
+            bool | None,
+            FilterOptionEnum.IS_NONE,
+            filter_name="has_created_dag_run_id",
+            transform_callable=lambda v: not v if v is not None else None,
+        )
+    ),
+]
+QueryPartitionedDagRunDagIdFilter = Annotated[
+    FilterParam[str | None],
+    Depends(
+        filter_param_factory(
+            AssetPartitionDagRun.target_dag_id,
+            str | None,
+            filter_name="dag_id",
+        )
+    ),
+]
 
 # Variables
 QueryVariableKeyPatternSearch = Annotated[
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/partitioned_dag_runs.py
 
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/partitioned_dag_runs.py
new file mode 100644
index 00000000000..628f8560e96
--- /dev/null
+++ 
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/partitioned_dag_runs.py
@@ -0,0 +1,64 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from airflow.api_fastapi.core_api.base import BaseModel
+
+
+class PartitionedDagRunResponse(BaseModel):
+    """Single partitioned Dag run item."""
+
+    id: int
+    partition_key: str
+    created_at: str | None = None
+    total_received: int
+    total_required: int
+    dag_id: str | None = None
+    state: str | None = None
+    created_dag_run_id: str | None = None
+
+
+class PartitionedDagRunCollectionResponse(BaseModel):
+    """Collection of partitioned Dag runs."""
+
+    partitioned_dag_runs: list[PartitionedDagRunResponse]
+    total: int
+    asset_expressions: dict[str, dict | None] | None = None
+
+
+class PartitionedDagRunAssetResponse(BaseModel):
+    """Asset info within a partitioned Dag run detail."""
+
+    asset_id: int
+    asset_name: str
+    asset_uri: str
+    received: bool
+
+
+class PartitionedDagRunDetailResponse(BaseModel):
+    """Detail of a single partitioned Dag run."""
+
+    id: int
+    dag_id: str
+    partition_key: str
+    created_at: str | None = None
+    updated_at: str | None = None
+    created_dag_run_id: str | None = None
+    assets: list[PartitionedDagRunAssetResponse]
+    total_required: int
+    total_received: int
+    asset_expression: dict | None = None
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
index c3f307afbe7..5fbf898c512 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
+++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
@@ -71,6 +71,82 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/HTTPValidationError'
+  /ui/partitioned_dag_runs:
+    get:
+      tags:
+      - PartitionedDagRun
+      summary: Get Partitioned Dag Runs
+      description: Return PartitionedDagRuns. Filter by dag_id and/or 
has_created_dag_run_id.
+      operationId: get_partitioned_dag_runs
+      security:
+      - OAuth2PasswordBearer: []
+      - HTTPBearer: []
+      parameters:
+      - name: dag_id
+        in: query
+        required: false
+        schema:
+          anyOf:
+          - type: string
+          - type: 'null'
+          title: Dag Id
+      - name: has_created_dag_run_id
+        in: query
+        required: false
+        schema:
+          anyOf:
+          - type: boolean
+          - type: 'null'
+          title: Has Created Dag Run Id
+      responses:
+        '200':
+          description: Successful Response
+          content:
+            application/json:
+              schema:
+                $ref: 
'#/components/schemas/PartitionedDagRunCollectionResponse'
+        '422':
+          description: Validation Error
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPValidationError'
+  /ui/pending_partitioned_dag_run/{dag_id}/{partition_key}:
+    get:
+      tags:
+      - PartitionedDagRun
+      summary: Get Pending Partitioned Dag Run
+      description: Return full details for pending PartitionedDagRun.
+      operationId: get_pending_partitioned_dag_run
+      security:
+      - OAuth2PasswordBearer: []
+      - HTTPBearer: []
+      parameters:
+      - name: dag_id
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Dag Id
+      - name: partition_key
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Partition Key
+      responses:
+        '200':
+          description: Successful Response
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/PartitionedDagRunDetailResponse'
+        '422':
+          description: Validation Error
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPValidationError'
   /ui/config:
     get:
       tags:
@@ -2568,6 +2644,149 @@ components:
       description: Node serializer for responses.
     OklchColor:
       type: string
+    PartitionedDagRunAssetResponse:
+      properties:
+        asset_id:
+          type: integer
+          title: Asset Id
+        asset_name:
+          type: string
+          title: Asset Name
+        asset_uri:
+          type: string
+          title: Asset Uri
+        received:
+          type: boolean
+          title: Received
+      type: object
+      required:
+      - asset_id
+      - asset_name
+      - asset_uri
+      - received
+      title: PartitionedDagRunAssetResponse
+      description: Asset info within a partitioned Dag run detail.
+    PartitionedDagRunCollectionResponse:
+      properties:
+        partitioned_dag_runs:
+          items:
+            $ref: '#/components/schemas/PartitionedDagRunResponse'
+          type: array
+          title: Partitioned Dag Runs
+        total:
+          type: integer
+          title: Total
+        asset_expressions:
+          anyOf:
+          - additionalProperties:
+              anyOf:
+              - additionalProperties: true
+                type: object
+              - type: 'null'
+            type: object
+          - type: 'null'
+          title: Asset Expressions
+      type: object
+      required:
+      - partitioned_dag_runs
+      - total
+      title: PartitionedDagRunCollectionResponse
+      description: Collection of partitioned Dag runs.
+    PartitionedDagRunDetailResponse:
+      properties:
+        id:
+          type: integer
+          title: Id
+        dag_id:
+          type: string
+          title: Dag Id
+        partition_key:
+          type: string
+          title: Partition Key
+        created_at:
+          anyOf:
+          - type: string
+          - type: 'null'
+          title: Created At
+        updated_at:
+          anyOf:
+          - type: string
+          - type: 'null'
+          title: Updated At
+        created_dag_run_id:
+          anyOf:
+          - type: string
+          - type: 'null'
+          title: Created Dag Run Id
+        assets:
+          items:
+            $ref: '#/components/schemas/PartitionedDagRunAssetResponse'
+          type: array
+          title: Assets
+        total_required:
+          type: integer
+          title: Total Required
+        total_received:
+          type: integer
+          title: Total Received
+        asset_expression:
+          anyOf:
+          - additionalProperties: true
+            type: object
+          - type: 'null'
+          title: Asset Expression
+      type: object
+      required:
+      - id
+      - dag_id
+      - partition_key
+      - assets
+      - total_required
+      - total_received
+      title: PartitionedDagRunDetailResponse
+      description: Detail of a single partitioned Dag run.
+    PartitionedDagRunResponse:
+      properties:
+        id:
+          type: integer
+          title: Id
+        partition_key:
+          type: string
+          title: Partition Key
+        created_at:
+          anyOf:
+          - type: string
+          - type: 'null'
+          title: Created At
+        total_received:
+          type: integer
+          title: Total Received
+        total_required:
+          type: integer
+          title: Total Required
+        dag_id:
+          anyOf:
+          - type: string
+          - type: 'null'
+          title: Dag Id
+        state:
+          anyOf:
+          - type: string
+          - type: 'null'
+          title: State
+        created_dag_run_id:
+          anyOf:
+          - type: string
+          - type: 'null'
+          title: Created Dag Run Id
+      type: object
+      required:
+      - id
+      - partition_key
+      - total_received
+      - total_required
+      title: PartitionedDagRunResponse
+      description: Single partitioned Dag run item.
     ReprocessBehavior:
       type: string
       enum:
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/__init__.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/__init__.py
index ce8744785cd..6bce499e38c 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/__init__.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/__init__.py
@@ -29,6 +29,7 @@ from airflow.api_fastapi.core_api.routes.ui.deadlines import 
deadlines_router
 from airflow.api_fastapi.core_api.routes.ui.dependencies import 
dependencies_router
 from airflow.api_fastapi.core_api.routes.ui.gantt import gantt_router
 from airflow.api_fastapi.core_api.routes.ui.grid import grid_router
+from airflow.api_fastapi.core_api.routes.ui.partitioned_dag_runs import 
partitioned_dag_runs_router
 from airflow.api_fastapi.core_api.routes.ui.structure import structure_router
 from airflow.api_fastapi.core_api.routes.ui.teams import teams_router
 
@@ -36,6 +37,7 @@ ui_router = AirflowRouter(prefix="/ui", 
include_in_schema=False)
 
 ui_router.include_router(auth_router)
 ui_router.include_router(assets_router)
+ui_router.include_router(partitioned_dag_runs_router)
 ui_router.include_router(config_router)
 ui_router.include_router(connections_router)
 ui_router.include_router(dags_router)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/assets.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/assets.py
index ce3639e1d8b..155b49726f6 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/assets.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/assets.py
@@ -14,18 +14,23 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
 from __future__ import annotations
 
 from fastapi import Depends, HTTPException, status
-from sqlalchemy import and_, case, func, select, true
+from sqlalchemy import ColumnElement, and_, case, exists, func, select, true
 
-from airflow.api_fastapi.common.dagbag import DagBagDep
 from airflow.api_fastapi.common.db.common import SessionDep
 from airflow.api_fastapi.common.router import AirflowRouter
 from airflow.api_fastapi.core_api.security import requires_access_asset, 
requires_access_dag
 from airflow.models import DagModel
-from airflow.models.asset import AssetDagRunQueue, AssetEvent, AssetModel, 
DagScheduleAssetReference
+from airflow.models.asset import (
+    AssetDagRunQueue,
+    AssetEvent,
+    AssetModel,
+    AssetPartitionDagRun,
+    DagScheduleAssetReference,
+    PartitionedAssetKeyLog,
+)
 
 assets_router = AirflowRouter(tags=["Asset"])
 
@@ -36,35 +41,67 @@ assets_router = AirflowRouter(tags=["Asset"])
 )
 def next_run_assets(
     dag_id: str,
-    dag_bag: DagBagDep,
     session: SessionDep,
 ) -> dict:
     dag_model = DagModel.get_dagmodel(dag_id, session=session)
     if dag_model is None:
-        raise HTTPException(status.HTTP_404_NOT_FOUND, f"can't find associated 
dag_model {dag_id}")
+        raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} 
was not found")
 
     latest_run = dag_model.get_last_dagrun(session=session)
+    event_filter = (
+        AssetEvent.timestamp >= latest_run.logical_date if latest_run and 
latest_run.logical_date else true()
+    )
 
-    if latest_run and latest_run.logical_date:
-        on_clause = AssetEvent.timestamp >= latest_run.logical_date
+    pending_partition_count: int | None = None
+
+    queued_expr: ColumnElement[int]
+    if is_partitioned := dag_model.timetable_summary == "Partitioned Asset":
+        pending_partition_count = session.scalar(
+            select(func.count())
+            .select_from(AssetPartitionDagRun)
+            .where(
+                AssetPartitionDagRun.target_dag_id == dag_id,
+                AssetPartitionDagRun.created_dag_run_id.is_(None),
+            )
+        )
+        queued_expr = case(
+            (
+                exists(
+                    select(PartitionedAssetKeyLog.id)
+                    .join(
+                        AssetPartitionDagRun,
+                        PartitionedAssetKeyLog.asset_partition_dag_run_id == 
AssetPartitionDagRun.id,
+                    )
+                    .where(
+                        PartitionedAssetKeyLog.asset_id == AssetModel.id,
+                        PartitionedAssetKeyLog.target_dag_id == dag_id,
+                        AssetPartitionDagRun.created_dag_run_id.is_(None),
+                    )
+                ),
+                1,
+            ),
+            else_=0,
+        )
     else:
-        on_clause = true()
+        queued_expr = func.max(case((AssetDagRunQueue.asset_id.is_not(None), 
1), else_=0))
 
-    query_result = session.execute(
+    query = (
         select(
             AssetModel.id,
             AssetModel.uri,
             AssetModel.name,
             func.max(AssetEvent.timestamp).label("lastUpdate"),
-            func.max(
-                case(
-                    (AssetDagRunQueue.asset_id.is_not(None), 1),
-                    else_=0,
-                )
-            ).label("queued"),
+            queued_expr.label("queued"),
         )
         .join(DagScheduleAssetReference, DagScheduleAssetReference.asset_id == 
AssetModel.id)
-        .join(
+        .join(AssetEvent, and_(AssetEvent.asset_id == AssetModel.id, 
event_filter), isouter=True)
+        .where(DagScheduleAssetReference.dag_id == dag_id, 
AssetModel.active.has())
+        .group_by(AssetModel.id, AssetModel.uri, AssetModel.name)
+        .order_by(AssetModel.uri)
+    )
+
+    if not is_partitioned:
+        query = query.join(
             AssetDagRunQueue,
             and_(
                 AssetDagRunQueue.asset_id == AssetModel.id,
@@ -72,23 +109,13 @@ def next_run_assets(
             ),
             isouter=True,
         )
-        .join(
-            AssetEvent,
-            and_(
-                AssetEvent.asset_id == AssetModel.id,
-                on_clause,
-            ),
-            isouter=True,
-        )
-        .where(DagScheduleAssetReference.dag_id == dag_id, 
AssetModel.active.has())
-        .group_by(AssetModel.id, AssetModel.uri, AssetModel.name)
-        .order_by(AssetModel.uri)
-    )
-    events = [dict(info._mapping) for info in query_result]
 
+    events = [dict(info._mapping) for info in session.execute(query)]
     for event in events:
         if not event.pop("queued", None):
             event["lastUpdate"] = None
 
-    data = {"asset_expression": dag_model.asset_expression, "events": events}
+    data: dict = {"asset_expression": dag_model.asset_expression, "events": 
events}
+    if pending_partition_count is not None:
+        data["pending_partition_count"] = pending_partition_count
     return data
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/partitioned_dag_runs.py
 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/partitioned_dag_runs.py
new file mode 100644
index 00000000000..fd67b0e48c6
--- /dev/null
+++ 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/partitioned_dag_runs.py
@@ -0,0 +1,239 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from fastapi import Depends, HTTPException, status
+from sqlalchemy import exists, func, select
+
+from airflow.api_fastapi.common.db.common import SessionDep, 
apply_filters_to_select
+from airflow.api_fastapi.common.parameters import (
+    QueryPartitionedDagRunDagIdFilter,
+    QueryPartitionedDagRunHasCreatedDagRunIdFilter,
+)
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.ui.partitioned_dag_runs import (
+    PartitionedDagRunAssetResponse,
+    PartitionedDagRunCollectionResponse,
+    PartitionedDagRunDetailResponse,
+    PartitionedDagRunResponse,
+)
+from airflow.api_fastapi.core_api.security import requires_access_asset
+from airflow.models import DagModel
+from airflow.models.asset import (
+    AssetModel,
+    AssetPartitionDagRun,
+    DagScheduleAssetReference,
+    PartitionedAssetKeyLog,
+)
+from airflow.models.dagrun import DagRun
+
+partitioned_dag_runs_router = AirflowRouter(tags=["PartitionedDagRun"])
+
+
+def _build_response(row, required_count: int) -> PartitionedDagRunResponse:
+    return PartitionedDagRunResponse(
+        id=row.id,
+        dag_id=row.target_dag_id,
+        partition_key=row.partition_key,
+        created_at=row.created_at.isoformat() if row.created_at else None,
+        total_received=row.total_received or 0,
+        total_required=required_count,
+        state=row.dag_run_state if row.created_dag_run_id else "pending",
+        created_dag_run_id=row.dag_run_id,
+    )
+
+
+@partitioned_dag_runs_router.get(
+    "/partitioned_dag_runs",
+    dependencies=[Depends(requires_access_asset(method="GET"))],
+)
+def get_partitioned_dag_runs(
+    session: SessionDep,
+    dag_id: QueryPartitionedDagRunDagIdFilter,
+    has_created_dag_run_id: QueryPartitionedDagRunHasCreatedDagRunIdFilter,
+) -> PartitionedDagRunCollectionResponse:
+    """Return PartitionedDagRuns. Filter by dag_id and/or 
has_created_dag_run_id."""
+    if dag_id.value is not None:
+        # Single query: validate Dag + get required count
+        dag_info = session.execute(
+            select(
+                DagModel.timetable_summary,
+                
func.count(DagScheduleAssetReference.asset_id).label("required_count"),
+            )
+            .outerjoin(
+                DagScheduleAssetReference,
+                (DagScheduleAssetReference.dag_id == DagModel.dag_id)
+                & DagScheduleAssetReference.asset_id.in_(
+                    select(AssetModel.id).where(AssetModel.active.has())
+                ),
+            )
+            .where(DagModel.dag_id == dag_id.value)
+            .group_by(DagModel.dag_id)
+        ).one_or_none()
+
+        if dag_info is None:
+            raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id 
{dag_id.value} was not found")
+        if dag_info.timetable_summary != "Partitioned Asset":
+            return 
PartitionedDagRunCollectionResponse(partitioned_dag_runs=[], total=0)
+
+        required_count = dag_info.required_count
+
+    # Subquery for received count per partition (only count required assets)
+    required_assets_subq = (
+        select(DagScheduleAssetReference.asset_id)
+        .join(AssetModel, AssetModel.id == DagScheduleAssetReference.asset_id)
+        .where(
+            DagScheduleAssetReference.dag_id == 
AssetPartitionDagRun.target_dag_id,
+            AssetModel.active.has(),
+        )
+    )
+    received_subq = (
+        select(func.count(func.distinct(PartitionedAssetKeyLog.asset_id)))
+        .where(
+            PartitionedAssetKeyLog.asset_partition_dag_run_id == 
AssetPartitionDagRun.id,
+            PartitionedAssetKeyLog.asset_id.in_(required_assets_subq),
+        )
+        .correlate(AssetPartitionDagRun)
+        .scalar_subquery()
+    )
+
+    query = select(
+        AssetPartitionDagRun.id,
+        AssetPartitionDagRun.target_dag_id,
+        AssetPartitionDagRun.partition_key,
+        AssetPartitionDagRun.created_at,
+        AssetPartitionDagRun.created_dag_run_id,
+        DagRun.run_id.label("dag_run_id"),
+        DagRun.state.label("dag_run_state"),
+        received_subq.label("total_received"),
+    ).outerjoin(DagRun, AssetPartitionDagRun.created_dag_run_id == DagRun.id)
+    query = apply_filters_to_select(statement=query, filters=[dag_id, 
has_created_dag_run_id])
+    query = query.order_by(AssetPartitionDagRun.created_at.desc())
+
+    if not (rows := session.execute(query).all()):
+        return PartitionedDagRunCollectionResponse(partitioned_dag_runs=[], 
total=0)
+
+    if dag_id.value is not None:
+        results = [_build_response(row, required_count) for row in rows]
+        return 
PartitionedDagRunCollectionResponse(partitioned_dag_runs=results, 
total=len(results))
+
+    # No dag_id: need to get required counts and expressions per dag
+    dag_ids = list({row.target_dag_id for row in rows})
+    dag_rows = session.execute(
+        select(
+            DagModel.dag_id,
+            DagModel.asset_expression,
+            
func.count(DagScheduleAssetReference.asset_id).label("required_count"),
+        )
+        .outerjoin(
+            DagScheduleAssetReference,
+            (DagScheduleAssetReference.dag_id == DagModel.dag_id)
+            & 
DagScheduleAssetReference.asset_id.in_(select(AssetModel.id).where(AssetModel.active.has())),
+        )
+        .where(DagModel.dag_id.in_(dag_ids))
+        .group_by(DagModel.dag_id)
+    ).all()
+
+    required_counts = {r.dag_id: r.required_count for r in dag_rows}
+    asset_expressions = {r.dag_id: r.asset_expression for r in dag_rows}
+    results = [_build_response(row, required_counts.get(row.target_dag_id, 0)) 
for row in rows]
+
+    return PartitionedDagRunCollectionResponse(
+        partitioned_dag_runs=results,
+        total=len(results),
+        asset_expressions=asset_expressions,
+    )
+
+
+@partitioned_dag_runs_router.get(
+    "/pending_partitioned_dag_run/{dag_id}/{partition_key}",
+    dependencies=[Depends(requires_access_asset(method="GET"))],
+)
+def get_pending_partitioned_dag_run(
+    dag_id: str,
+    partition_key: str,
+    session: SessionDep,
+) -> PartitionedDagRunDetailResponse:
+    """Return full details for pending PartitionedDagRun."""
+    partitioned_dag_run = session.execute(
+        select(
+            AssetPartitionDagRun.id,
+            AssetPartitionDagRun.target_dag_id,
+            AssetPartitionDagRun.partition_key,
+            AssetPartitionDagRun.created_at,
+            AssetPartitionDagRun.updated_at,
+            DagRun.run_id.label("created_dag_run_id"),
+        )
+        .outerjoin(DagRun, AssetPartitionDagRun.created_dag_run_id == 
DagRun.id)
+        .where(
+            AssetPartitionDagRun.target_dag_id == dag_id,
+            AssetPartitionDagRun.partition_key == partition_key,
+            AssetPartitionDagRun.created_dag_run_id.is_(None),
+        )
+    ).one_or_none()
+
+    if partitioned_dag_run is None:
+        raise HTTPException(
+            status.HTTP_404_NOT_FOUND,
+            f"No PartitionedDagRun for dag={dag_id} partition={partition_key}",
+        )
+
+    received_subq = (
+        select(PartitionedAssetKeyLog.asset_id).where(
+            PartitionedAssetKeyLog.asset_partition_dag_run_id == 
partitioned_dag_run.id
+        )
+    ).correlate(AssetModel)
+
+    received_expr = exists(received_subq.where(PartitionedAssetKeyLog.asset_id 
== AssetModel.id))
+
+    asset_expression_subq = (
+        select(DagModel.asset_expression).where(DagModel.dag_id == 
dag_id).scalar_subquery()
+    )
+    asset_rows = session.execute(
+        select(
+            AssetModel.id,
+            AssetModel.uri,
+            AssetModel.name,
+            received_expr.label("received"),
+            asset_expression_subq.label("asset_expression"),
+        )
+        .join(DagScheduleAssetReference, DagScheduleAssetReference.asset_id == 
AssetModel.id)
+        .where(DagScheduleAssetReference.dag_id == dag_id, 
AssetModel.active.has())
+        .order_by(received_expr.asc(), AssetModel.uri)
+    ).all()
+
+    assets = [
+        PartitionedDagRunAssetResponse(
+            asset_id=row.id, asset_name=row.name, asset_uri=row.uri, 
received=row.received
+        )
+        for row in asset_rows
+    ]
+    total_received = sum(1 for a in assets if a.received)
+    asset_expression = asset_rows[0].asset_expression if asset_rows else None
+
+    return PartitionedDagRunDetailResponse(
+        id=partitioned_dag_run.id,
+        dag_id=dag_id,
+        partition_key=partition_key,
+        created_at=partitioned_dag_run.created_at.isoformat() if 
partitioned_dag_run.created_at else None,
+        updated_at=partitioned_dag_run.updated_at.isoformat() if 
partitioned_dag_run.updated_at else None,
+        created_dag_run_id=partitioned_dag_run.created_dag_run_id,
+        assets=assets,
+        total_required=len(assets),
+        total_received=total_received,
+        asset_expression=asset_expression,
+    )
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts 
b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
index f660990ec61..705a35f4cca 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
@@ -1,7 +1,7 @@
 // generated with @7nohe/[email protected] 
 
 import { UseQueryResult } from "@tanstack/react-query";
-import { AssetService, AuthLinksService, BackfillService, CalendarService, 
ConfigService, ConnectionService, DagParsingService, DagRunService, DagService, 
DagSourceService, DagStatsService, DagVersionService, DagWarningService, 
DashboardService, DeadlinesService, DependenciesService, EventLogService, 
ExperimentalService, ExtraLinksService, GanttService, GridService, 
ImportErrorService, JobService, LoginService, MonitorService, PluginService, 
PoolService, ProviderService, StructureService [...]
+import { AssetService, AuthLinksService, BackfillService, CalendarService, 
ConfigService, ConnectionService, DagParsingService, DagRunService, DagService, 
DagSourceService, DagStatsService, DagVersionService, DagWarningService, 
DashboardService, DeadlinesService, DependenciesService, EventLogService, 
ExperimentalService, ExtraLinksService, GanttService, GridService, 
ImportErrorService, JobService, LoginService, MonitorService, 
PartitionedDagRunService, PluginService, PoolService, Provide [...]
 import { DagRunState, DagWarningType } from "../requests/types.gen";
 export type AssetServiceGetAssetsDefaultResponse = Awaited<ReturnType<typeof 
AssetService.getAssets>>;
 export type AssetServiceGetAssetsQueryResult<TData = 
AssetServiceGetAssetsDefaultResponse, TError = unknown> = UseQueryResult<TData, 
TError>;
@@ -789,6 +789,20 @@ export type 
AuthLinksServiceGetCurrentUserInfoDefaultResponse = Awaited<ReturnTy
 export type AuthLinksServiceGetCurrentUserInfoQueryResult<TData = 
AuthLinksServiceGetCurrentUserInfoDefaultResponse, TError = unknown> = 
UseQueryResult<TData, TError>;
 export const useAuthLinksServiceGetCurrentUserInfoKey = 
"AuthLinksServiceGetCurrentUserInfo";
 export const UseAuthLinksServiceGetCurrentUserInfoKeyFn = (queryKey?: 
Array<unknown>) => [useAuthLinksServiceGetCurrentUserInfoKey, ...(queryKey ?? 
[])];
+export type PartitionedDagRunServiceGetPartitionedDagRunsDefaultResponse = 
Awaited<ReturnType<typeof PartitionedDagRunService.getPartitionedDagRuns>>;
+export type PartitionedDagRunServiceGetPartitionedDagRunsQueryResult<TData = 
PartitionedDagRunServiceGetPartitionedDagRunsDefaultResponse, TError = unknown> 
= UseQueryResult<TData, TError>;
+export const usePartitionedDagRunServiceGetPartitionedDagRunsKey = 
"PartitionedDagRunServiceGetPartitionedDagRuns";
+export const UsePartitionedDagRunServiceGetPartitionedDagRunsKeyFn = ({ dagId, 
hasCreatedDagRunId }: {
+  dagId?: string;
+  hasCreatedDagRunId?: boolean;
+} = {}, queryKey?: Array<unknown>) => 
[usePartitionedDagRunServiceGetPartitionedDagRunsKey, ...(queryKey ?? [{ dagId, 
hasCreatedDagRunId }])];
+export type PartitionedDagRunServiceGetPendingPartitionedDagRunDefaultResponse 
= Awaited<ReturnType<typeof 
PartitionedDagRunService.getPendingPartitionedDagRun>>;
+export type 
PartitionedDagRunServiceGetPendingPartitionedDagRunQueryResult<TData = 
PartitionedDagRunServiceGetPendingPartitionedDagRunDefaultResponse, TError = 
unknown> = UseQueryResult<TData, TError>;
+export const usePartitionedDagRunServiceGetPendingPartitionedDagRunKey = 
"PartitionedDagRunServiceGetPendingPartitionedDagRun";
+export const UsePartitionedDagRunServiceGetPendingPartitionedDagRunKeyFn = ({ 
dagId, partitionKey }: {
+  dagId: string;
+  partitionKey: string;
+}, queryKey?: Array<unknown>) => 
[usePartitionedDagRunServiceGetPendingPartitionedDagRunKey, ...(queryKey ?? [{ 
dagId, partitionKey }])];
 export type DependenciesServiceGetDependenciesDefaultResponse = 
Awaited<ReturnType<typeof DependenciesService.getDependencies>>;
 export type DependenciesServiceGetDependenciesQueryResult<TData = 
DependenciesServiceGetDependenciesDefaultResponse, TError = unknown> = 
UseQueryResult<TData, TError>;
 export const useDependenciesServiceGetDependenciesKey = 
"DependenciesServiceGetDependencies";
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts 
b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
index 20d67fcd1aa..a16531feb37 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
@@ -1,7 +1,7 @@
 // generated with @7nohe/[email protected] 
 
 import { type QueryClient } from "@tanstack/react-query";
-import { AssetService, AuthLinksService, BackfillService, CalendarService, 
ConfigService, ConnectionService, DagRunService, DagService, DagSourceService, 
DagStatsService, DagVersionService, DagWarningService, DashboardService, 
DeadlinesService, DependenciesService, EventLogService, ExperimentalService, 
ExtraLinksService, GanttService, GridService, ImportErrorService, JobService, 
LoginService, MonitorService, PluginService, PoolService, ProviderService, 
StructureService, TaskInstanceServi [...]
+import { AssetService, AuthLinksService, BackfillService, CalendarService, 
ConfigService, ConnectionService, DagRunService, DagService, DagSourceService, 
DagStatsService, DagVersionService, DagWarningService, DashboardService, 
DeadlinesService, DependenciesService, EventLogService, ExperimentalService, 
ExtraLinksService, GanttService, GridService, ImportErrorService, JobService, 
LoginService, MonitorService, PartitionedDagRunService, PluginService, 
PoolService, ProviderService, Structure [...]
 import { DagRunState, DagWarningType } from "../requests/types.gen";
 import * as Common from "./common";
 /**
@@ -1499,6 +1499,32 @@ export const ensureUseAuthLinksServiceGetAuthMenusData = 
(queryClient: QueryClie
 */
 export const ensureUseAuthLinksServiceGetCurrentUserInfoData = (queryClient: 
QueryClient) => queryClient.ensureQueryData({ queryKey: 
Common.UseAuthLinksServiceGetCurrentUserInfoKeyFn(), queryFn: () => 
AuthLinksService.getCurrentUserInfo() });
 /**
+* Get Partitioned Dag Runs
+* Return PartitionedDagRuns. Filter by dag_id and/or has_created_dag_run_id.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.hasCreatedDagRunId
+* @returns PartitionedDagRunCollectionResponse Successful Response
+* @throws ApiError
+*/
+export const ensureUsePartitionedDagRunServiceGetPartitionedDagRunsData = 
(queryClient: QueryClient, { dagId, hasCreatedDagRunId }: {
+  dagId?: string;
+  hasCreatedDagRunId?: boolean;
+} = {}) => queryClient.ensureQueryData({ queryKey: 
Common.UsePartitionedDagRunServiceGetPartitionedDagRunsKeyFn({ dagId, 
hasCreatedDagRunId }), queryFn: () => 
PartitionedDagRunService.getPartitionedDagRuns({ dagId, hasCreatedDagRunId }) 
});
+/**
+* Get Pending Partitioned Dag Run
+* Return full details for pending PartitionedDagRun.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.partitionKey
+* @returns PartitionedDagRunDetailResponse Successful Response
+* @throws ApiError
+*/
+export const ensureUsePartitionedDagRunServiceGetPendingPartitionedDagRunData 
= (queryClient: QueryClient, { dagId, partitionKey }: {
+  dagId: string;
+  partitionKey: string;
+}) => queryClient.ensureQueryData({ queryKey: 
Common.UsePartitionedDagRunServiceGetPendingPartitionedDagRunKeyFn({ dagId, 
partitionKey }), queryFn: () => 
PartitionedDagRunService.getPendingPartitionedDagRun({ dagId, partitionKey }) 
});
+/**
 * Get Dependencies
 * Dependencies graph.
 * @param data The data for the request.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts 
b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
index bed863f93cd..a4b5afb54df 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -1,7 +1,7 @@
 // generated with @7nohe/[email protected] 
 
 import { type QueryClient } from "@tanstack/react-query";
-import { AssetService, AuthLinksService, BackfillService, CalendarService, 
ConfigService, ConnectionService, DagRunService, DagService, DagSourceService, 
DagStatsService, DagVersionService, DagWarningService, DashboardService, 
DeadlinesService, DependenciesService, EventLogService, ExperimentalService, 
ExtraLinksService, GanttService, GridService, ImportErrorService, JobService, 
LoginService, MonitorService, PluginService, PoolService, ProviderService, 
StructureService, TaskInstanceServi [...]
+import { AssetService, AuthLinksService, BackfillService, CalendarService, 
ConfigService, ConnectionService, DagRunService, DagService, DagSourceService, 
DagStatsService, DagVersionService, DagWarningService, DashboardService, 
DeadlinesService, DependenciesService, EventLogService, ExperimentalService, 
ExtraLinksService, GanttService, GridService, ImportErrorService, JobService, 
LoginService, MonitorService, PartitionedDagRunService, PluginService, 
PoolService, ProviderService, Structure [...]
 import { DagRunState, DagWarningType } from "../requests/types.gen";
 import * as Common from "./common";
 /**
@@ -1499,6 +1499,32 @@ export const prefetchUseAuthLinksServiceGetAuthMenus = 
(queryClient: QueryClient
 */
 export const prefetchUseAuthLinksServiceGetCurrentUserInfo = (queryClient: 
QueryClient) => queryClient.prefetchQuery({ queryKey: 
Common.UseAuthLinksServiceGetCurrentUserInfoKeyFn(), queryFn: () => 
AuthLinksService.getCurrentUserInfo() });
 /**
+* Get Partitioned Dag Runs
+* Return PartitionedDagRuns. Filter by dag_id and/or has_created_dag_run_id.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.hasCreatedDagRunId
+* @returns PartitionedDagRunCollectionResponse Successful Response
+* @throws ApiError
+*/
+export const prefetchUsePartitionedDagRunServiceGetPartitionedDagRuns = 
(queryClient: QueryClient, { dagId, hasCreatedDagRunId }: {
+  dagId?: string;
+  hasCreatedDagRunId?: boolean;
+} = {}) => queryClient.prefetchQuery({ queryKey: 
Common.UsePartitionedDagRunServiceGetPartitionedDagRunsKeyFn({ dagId, 
hasCreatedDagRunId }), queryFn: () => 
PartitionedDagRunService.getPartitionedDagRuns({ dagId, hasCreatedDagRunId }) 
});
+/**
+* Get Pending Partitioned Dag Run
+* Return full details for pending PartitionedDagRun.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.partitionKey
+* @returns PartitionedDagRunDetailResponse Successful Response
+* @throws ApiError
+*/
+export const prefetchUsePartitionedDagRunServiceGetPendingPartitionedDagRun = 
(queryClient: QueryClient, { dagId, partitionKey }: {
+  dagId: string;
+  partitionKey: string;
+}) => queryClient.prefetchQuery({ queryKey: 
Common.UsePartitionedDagRunServiceGetPendingPartitionedDagRunKeyFn({ dagId, 
partitionKey }), queryFn: () => 
PartitionedDagRunService.getPendingPartitionedDagRun({ dagId, partitionKey }) 
});
+/**
 * Get Dependencies
 * Dependencies graph.
 * @param data The data for the request.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts 
b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
index 434f0af6dcd..7bafcd6b706 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
@@ -1,7 +1,7 @@
 // generated with @7nohe/[email protected] 
 
 import { UseMutationOptions, UseQueryOptions, useMutation, useQuery } from 
"@tanstack/react-query";
-import { AssetService, AuthLinksService, BackfillService, CalendarService, 
ConfigService, ConnectionService, DagParsingService, DagRunService, DagService, 
DagSourceService, DagStatsService, DagVersionService, DagWarningService, 
DashboardService, DeadlinesService, DependenciesService, EventLogService, 
ExperimentalService, ExtraLinksService, GanttService, GridService, 
ImportErrorService, JobService, LoginService, MonitorService, PluginService, 
PoolService, ProviderService, StructureService [...]
+import { AssetService, AuthLinksService, BackfillService, CalendarService, 
ConfigService, ConnectionService, DagParsingService, DagRunService, DagService, 
DagSourceService, DagStatsService, DagVersionService, DagWarningService, 
DashboardService, DeadlinesService, DependenciesService, EventLogService, 
ExperimentalService, ExtraLinksService, GanttService, GridService, 
ImportErrorService, JobService, LoginService, MonitorService, 
PartitionedDagRunService, PluginService, PoolService, Provide [...]
 import { BackfillPostBody, BulkBody_BulkTaskInstanceBody_, 
BulkBody_ConnectionBody_, BulkBody_PoolBody_, BulkBody_VariableBody_, 
ClearTaskInstancesBody, ConnectionBody, CreateAssetEventsBody, DAGPatchBody, 
DAGRunClearBody, DAGRunPatchBody, DAGRunsBatchBody, DagRunState, 
DagWarningType, PatchTaskInstanceBody, PoolBody, PoolPatchBody, 
TaskInstancesBatchBody, TriggerDAGRunPostBody, UpdateHITLDetailPayload, 
VariableBody, XComCreateBody, XComUpdateBody } from "../requests/types.gen";
 import * as Common from "./common";
 /**
@@ -1499,6 +1499,32 @@ export const useAuthLinksServiceGetAuthMenus = <TData = 
Common.AuthLinksServiceG
 */
 export const useAuthLinksServiceGetCurrentUserInfo = <TData = 
Common.AuthLinksServiceGetCurrentUserInfoDefaultResponse, TError = unknown, 
TQueryKey extends Array<unknown> = unknown[]>(queryKey?: TQueryKey, options?: 
Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => 
useQuery<TData, TError>({ queryKey: 
Common.UseAuthLinksServiceGetCurrentUserInfoKeyFn(queryKey), queryFn: () => 
AuthLinksService.getCurrentUserInfo() as TData, ...options });
 /**
+* Get Partitioned Dag Runs
+* Return PartitionedDagRuns. Filter by dag_id and/or has_created_dag_run_id.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.hasCreatedDagRunId
+* @returns PartitionedDagRunCollectionResponse Successful Response
+* @throws ApiError
+*/
+export const usePartitionedDagRunServiceGetPartitionedDagRuns = <TData = 
Common.PartitionedDagRunServiceGetPartitionedDagRunsDefaultResponse, TError = 
unknown, TQueryKey extends Array<unknown> = unknown[]>({ dagId, 
hasCreatedDagRunId }: {
+  dagId?: string;
+  hasCreatedDagRunId?: boolean;
+} = {}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, 
"queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey: 
Common.UsePartitionedDagRunServiceGetPartitionedDagRunsKeyFn({ dagId, 
hasCreatedDagRunId }, queryKey), queryFn: () => 
PartitionedDagRunService.getPartitionedDagRuns({ dagId, hasCreatedDagRunId }) 
as TData, ...options });
+/**
+* Get Pending Partitioned Dag Run
+* Return full details for pending PartitionedDagRun.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.partitionKey
+* @returns PartitionedDagRunDetailResponse Successful Response
+* @throws ApiError
+*/
+export const usePartitionedDagRunServiceGetPendingPartitionedDagRun = <TData = 
Common.PartitionedDagRunServiceGetPendingPartitionedDagRunDefaultResponse, 
TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ dagId, 
partitionKey }: {
+  dagId: string;
+  partitionKey: string;
+}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, 
"queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey: 
Common.UsePartitionedDagRunServiceGetPendingPartitionedDagRunKeyFn({ dagId, 
partitionKey }, queryKey), queryFn: () => 
PartitionedDagRunService.getPendingPartitionedDagRun({ dagId, partitionKey }) 
as TData, ...options });
+/**
 * Get Dependencies
 * Dependencies graph.
 * @param data The data for the request.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts 
b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
index 05a2d3941ab..867f08383cf 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
@@ -1,7 +1,7 @@
 // generated with @7nohe/[email protected] 
 
 import { UseQueryOptions, useSuspenseQuery } from "@tanstack/react-query";
-import { AssetService, AuthLinksService, BackfillService, CalendarService, 
ConfigService, ConnectionService, DagRunService, DagService, DagSourceService, 
DagStatsService, DagVersionService, DagWarningService, DashboardService, 
DeadlinesService, DependenciesService, EventLogService, ExperimentalService, 
ExtraLinksService, GanttService, GridService, ImportErrorService, JobService, 
LoginService, MonitorService, PluginService, PoolService, ProviderService, 
StructureService, TaskInstanceServi [...]
+import { AssetService, AuthLinksService, BackfillService, CalendarService, 
ConfigService, ConnectionService, DagRunService, DagService, DagSourceService, 
DagStatsService, DagVersionService, DagWarningService, DashboardService, 
DeadlinesService, DependenciesService, EventLogService, ExperimentalService, 
ExtraLinksService, GanttService, GridService, ImportErrorService, JobService, 
LoginService, MonitorService, PartitionedDagRunService, PluginService, 
PoolService, ProviderService, Structure [...]
 import { DagRunState, DagWarningType } from "../requests/types.gen";
 import * as Common from "./common";
 /**
@@ -1499,6 +1499,32 @@ export const useAuthLinksServiceGetAuthMenusSuspense = 
<TData = Common.AuthLinks
 */
 export const useAuthLinksServiceGetCurrentUserInfoSuspense = <TData = 
Common.AuthLinksServiceGetCurrentUserInfoDefaultResponse, TError = unknown, 
TQueryKey extends Array<unknown> = unknown[]>(queryKey?: TQueryKey, options?: 
Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => 
useSuspenseQuery<TData, TError>({ queryKey: 
Common.UseAuthLinksServiceGetCurrentUserInfoKeyFn(queryKey), queryFn: () => 
AuthLinksService.getCurrentUserInfo() as TData, ...options });
 /**
+* Get Partitioned Dag Runs
+* Return PartitionedDagRuns. Filter by dag_id and/or has_created_dag_run_id.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.hasCreatedDagRunId
+* @returns PartitionedDagRunCollectionResponse Successful Response
+* @throws ApiError
+*/
+export const usePartitionedDagRunServiceGetPartitionedDagRunsSuspense = <TData 
= Common.PartitionedDagRunServiceGetPartitionedDagRunsDefaultResponse, TError = 
unknown, TQueryKey extends Array<unknown> = unknown[]>({ dagId, 
hasCreatedDagRunId }: {
+  dagId?: string;
+  hasCreatedDagRunId?: boolean;
+} = {}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, 
"queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey: 
Common.UsePartitionedDagRunServiceGetPartitionedDagRunsKeyFn({ dagId, 
hasCreatedDagRunId }, queryKey), queryFn: () => 
PartitionedDagRunService.getPartitionedDagRuns({ dagId, hasCreatedDagRunId }) 
as TData, ...options });
+/**
+* Get Pending Partitioned Dag Run
+* Return full details for pending PartitionedDagRun.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.partitionKey
+* @returns PartitionedDagRunDetailResponse Successful Response
+* @throws ApiError
+*/
+export const usePartitionedDagRunServiceGetPendingPartitionedDagRunSuspense = 
<TData = 
Common.PartitionedDagRunServiceGetPendingPartitionedDagRunDefaultResponse, 
TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ dagId, 
partitionKey }: {
+  dagId: string;
+  partitionKey: string;
+}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, 
"queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey: 
Common.UsePartitionedDagRunServiceGetPendingPartitionedDagRunKeyFn({ dagId, 
partitionKey }, queryKey), queryFn: () => 
PartitionedDagRunService.getPendingPartitionedDagRun({ dagId, partitionKey }) 
as TData, ...options });
+/**
 * Get Dependencies
 * Dependencies graph.
 * @param data The data for the request.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index b9e305f07c9..88c516866e4 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -8393,6 +8393,223 @@ export const $OklchColor = {
     type: 'string'
 } as const;
 
+export const $PartitionedDagRunAssetResponse = {
+    properties: {
+        asset_id: {
+            type: 'integer',
+            title: 'Asset Id'
+        },
+        asset_name: {
+            type: 'string',
+            title: 'Asset Name'
+        },
+        asset_uri: {
+            type: 'string',
+            title: 'Asset Uri'
+        },
+        received: {
+            type: 'boolean',
+            title: 'Received'
+        }
+    },
+    type: 'object',
+    required: ['asset_id', 'asset_name', 'asset_uri', 'received'],
+    title: 'PartitionedDagRunAssetResponse',
+    description: 'Asset info within a partitioned Dag run detail.'
+} as const;
+
+export const $PartitionedDagRunCollectionResponse = {
+    properties: {
+        partitioned_dag_runs: {
+            items: {
+                '$ref': '#/components/schemas/PartitionedDagRunResponse'
+            },
+            type: 'array',
+            title: 'Partitioned Dag Runs'
+        },
+        total: {
+            type: 'integer',
+            title: 'Total'
+        },
+        asset_expressions: {
+            anyOf: [
+                {
+                    additionalProperties: {
+                        anyOf: [
+                            {
+                                additionalProperties: true,
+                                type: 'object'
+                            },
+                            {
+                                type: 'null'
+                            }
+                        ]
+                    },
+                    type: 'object'
+                },
+                {
+                    type: 'null'
+                }
+            ],
+            title: 'Asset Expressions'
+        }
+    },
+    type: 'object',
+    required: ['partitioned_dag_runs', 'total'],
+    title: 'PartitionedDagRunCollectionResponse',
+    description: 'Collection of partitioned Dag runs.'
+} as const;
+
+export const $PartitionedDagRunDetailResponse = {
+    properties: {
+        id: {
+            type: 'integer',
+            title: 'Id'
+        },
+        dag_id: {
+            type: 'string',
+            title: 'Dag Id'
+        },
+        partition_key: {
+            type: 'string',
+            title: 'Partition Key'
+        },
+        created_at: {
+            anyOf: [
+                {
+                    type: 'string'
+                },
+                {
+                    type: 'null'
+                }
+            ],
+            title: 'Created At'
+        },
+        updated_at: {
+            anyOf: [
+                {
+                    type: 'string'
+                },
+                {
+                    type: 'null'
+                }
+            ],
+            title: 'Updated At'
+        },
+        created_dag_run_id: {
+            anyOf: [
+                {
+                    type: 'string'
+                },
+                {
+                    type: 'null'
+                }
+            ],
+            title: 'Created Dag Run Id'
+        },
+        assets: {
+            items: {
+                '$ref': '#/components/schemas/PartitionedDagRunAssetResponse'
+            },
+            type: 'array',
+            title: 'Assets'
+        },
+        total_required: {
+            type: 'integer',
+            title: 'Total Required'
+        },
+        total_received: {
+            type: 'integer',
+            title: 'Total Received'
+        },
+        asset_expression: {
+            anyOf: [
+                {
+                    additionalProperties: true,
+                    type: 'object'
+                },
+                {
+                    type: 'null'
+                }
+            ],
+            title: 'Asset Expression'
+        }
+    },
+    type: 'object',
+    required: ['id', 'dag_id', 'partition_key', 'assets', 'total_required', 
'total_received'],
+    title: 'PartitionedDagRunDetailResponse',
+    description: 'Detail of a single partitioned Dag run.'
+} as const;
+
+export const $PartitionedDagRunResponse = {
+    properties: {
+        id: {
+            type: 'integer',
+            title: 'Id'
+        },
+        partition_key: {
+            type: 'string',
+            title: 'Partition Key'
+        },
+        created_at: {
+            anyOf: [
+                {
+                    type: 'string'
+                },
+                {
+                    type: 'null'
+                }
+            ],
+            title: 'Created At'
+        },
+        total_received: {
+            type: 'integer',
+            title: 'Total Received'
+        },
+        total_required: {
+            type: 'integer',
+            title: 'Total Required'
+        },
+        dag_id: {
+            anyOf: [
+                {
+                    type: 'string'
+                },
+                {
+                    type: 'null'
+                }
+            ],
+            title: 'Dag Id'
+        },
+        state: {
+            anyOf: [
+                {
+                    type: 'string'
+                },
+                {
+                    type: 'null'
+                }
+            ],
+            title: 'State'
+        },
+        created_dag_run_id: {
+            anyOf: [
+                {
+                    type: 'string'
+                },
+                {
+                    type: 'null'
+                }
+            ],
+            title: 'Created Dag Run Id'
+        }
+    },
+    type: 'object',
+    required: ['id', 'partition_key', 'total_received', 'total_required'],
+    title: 'PartitionedDagRunResponse',
+    description: 'Single partitioned Dag run item.'
+} as const;
+
 export const $StandardHookFields = {
     properties: {
         description: {
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
index 63107cb97b6..99a9f0d0e23 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -3,7 +3,7 @@
 import type { CancelablePromise } from './core/CancelablePromise';
 import { OpenAPI } from './core/OpenAPI';
 import { request as __request } from './core/request';
-import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData, 
GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse, 
GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, 
CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse, 
GetAssetQueuedEventsData, GetAssetQueuedEventsResponse, 
DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData, 
GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, 
Dele [...]
+import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData, 
GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse, 
GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, 
CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse, 
GetAssetQueuedEventsData, GetAssetQueuedEventsResponse, 
DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData, 
GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, 
Dele [...]
 
 export class AssetService {
     /**
@@ -3850,6 +3850,55 @@ export class AuthLinksService {
     
 }
 
+export class PartitionedDagRunService {
+    /**
+     * Get Partitioned Dag Runs
+     * Return PartitionedDagRuns. Filter by dag_id and/or 
has_created_dag_run_id.
+     * @param data The data for the request.
+     * @param data.dagId
+     * @param data.hasCreatedDagRunId
+     * @returns PartitionedDagRunCollectionResponse Successful Response
+     * @throws ApiError
+     */
+    public static getPartitionedDagRuns(data: GetPartitionedDagRunsData = {}): 
CancelablePromise<GetPartitionedDagRunsResponse> {
+        return __request(OpenAPI, {
+            method: 'GET',
+            url: '/ui/partitioned_dag_runs',
+            query: {
+                dag_id: data.dagId,
+                has_created_dag_run_id: data.hasCreatedDagRunId
+            },
+            errors: {
+                422: 'Validation Error'
+            }
+        });
+    }
+    
+    /**
+     * Get Pending Partitioned Dag Run
+     * Return full details for pending PartitionedDagRun.
+     * @param data The data for the request.
+     * @param data.dagId
+     * @param data.partitionKey
+     * @returns PartitionedDagRunDetailResponse Successful Response
+     * @throws ApiError
+     */
+    public static getPendingPartitionedDagRun(data: 
GetPendingPartitionedDagRunData): 
CancelablePromise<GetPendingPartitionedDagRunResponse> {
+        return __request(OpenAPI, {
+            method: 'GET',
+            url: '/ui/pending_partitioned_dag_run/{dag_id}/{partition_key}',
+            path: {
+                dag_id: data.dagId,
+                partition_key: data.partitionKey
+            },
+            errors: {
+                422: 'Validation Error'
+            }
+        });
+    }
+    
+}
+
 export class DependenciesService {
     /**
      * Get Dependencies
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index aa596d50253..7b334da247d 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -2065,6 +2065,61 @@ export type NodeResponse = {
 
 export type OklchColor = string;
 
+/**
+ * Asset info within a partitioned Dag run detail.
+ */
+export type PartitionedDagRunAssetResponse = {
+    asset_id: number;
+    asset_name: string;
+    asset_uri: string;
+    received: boolean;
+};
+
+/**
+ * Collection of partitioned Dag runs.
+ */
+export type PartitionedDagRunCollectionResponse = {
+    partitioned_dag_runs: Array<PartitionedDagRunResponse>;
+    total: number;
+    asset_expressions?: {
+    [key: string]: ({
+    [key: string]: unknown;
+} | null);
+} | null;
+};
+
+/**
+ * Detail of a single partitioned Dag run.
+ */
+export type PartitionedDagRunDetailResponse = {
+    id: number;
+    dag_id: string;
+    partition_key: string;
+    created_at?: string | null;
+    updated_at?: string | null;
+    created_dag_run_id?: string | null;
+    assets: Array<PartitionedDagRunAssetResponse>;
+    total_required: number;
+    total_received: number;
+    asset_expression?: {
+    [key: string]: unknown;
+} | null;
+};
+
+/**
+ * Single partitioned Dag run item.
+ */
+export type PartitionedDagRunResponse = {
+    id: number;
+    partition_key: string;
+    created_at?: string | null;
+    total_received: number;
+    total_required: number;
+    dag_id?: string | null;
+    state?: string | null;
+    created_dag_run_id?: string | null;
+};
+
 /**
  * Standard fields of a Hook that a form will render.
  */
@@ -3460,6 +3515,20 @@ export type GetAuthMenusResponse = 
MenuItemCollectionResponse;
 
 export type GetCurrentUserInfoResponse = AuthenticatedMeResponse;
 
+export type GetPartitionedDagRunsData = {
+    dagId?: string | null;
+    hasCreatedDagRunId?: boolean | null;
+};
+
+export type GetPartitionedDagRunsResponse = 
PartitionedDagRunCollectionResponse;
+
+export type GetPendingPartitionedDagRunData = {
+    dagId: string;
+    partitionKey: string;
+};
+
+export type GetPendingPartitionedDagRunResponse = 
PartitionedDagRunDetailResponse;
+
 export type GetDependenciesData = {
     dependencyType?: 'scheduling' | 'data';
     nodeId?: string | null;
@@ -6649,6 +6718,36 @@ export type $OpenApiTs = {
             };
         };
     };
+    '/ui/partitioned_dag_runs': {
+        get: {
+            req: GetPartitionedDagRunsData;
+            res: {
+                /**
+                 * Successful Response
+                 */
+                200: PartitionedDagRunCollectionResponse;
+                /**
+                 * Validation Error
+                 */
+                422: HTTPValidationError;
+            };
+        };
+    };
+    '/ui/pending_partitioned_dag_run/{dag_id}/{partition_key}': {
+        get: {
+            req: GetPendingPartitionedDagRunData;
+            res: {
+                /**
+                 * Successful Response
+                 */
+                200: PartitionedDagRunDetailResponse;
+                /**
+                 * Validation Error
+                 */
+                422: HTTPValidationError;
+            };
+        };
+    };
     '/ui/dependencies': {
         get: {
             req: GetDependenciesData;
diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json 
b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json
index 396359107e7..1846f214024 100644
--- a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json
+++ b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json
@@ -161,6 +161,13 @@
     "placeholder": "Add a note...",
     "taskInstance": "Task Instance Note"
   },
+  "partitionedDagRun": "Partitioned Dag Run",
+  "partitionedDagRun_other": "Partitioned Dag Runs",
+  "partitionedDagRunDetail": {
+    "receivedAssetEvents": "Received Asset Events"
+  },
+  "pendingDagRun": "{{count}} Pending Dag Run",
+  "pendingDagRun_other": "{{count}} Pending Dag Runs",
   "reset": "Reset",
   "runId": "Run ID",
   "runTypes": {
diff --git 
a/airflow-core/src/airflow/ui/src/components/AssetExpression/AssetExpression.tsx
 
b/airflow-core/src/airflow/ui/src/components/AssetExpression/AssetExpression.tsx
index 9b49fd8b947..9b2a8e3b4f2 100644
--- 
a/airflow-core/src/airflow/ui/src/components/AssetExpression/AssetExpression.tsx
+++ 
b/airflow-core/src/airflow/ui/src/components/AssetExpression/AssetExpression.tsx
@@ -31,11 +31,11 @@ export const AssetExpression = ({
   expression,
 }: {
   readonly events?: Array<NextRunEvent>;
-  readonly expression: ExpressionType | null;
+  readonly expression: ExpressionType | undefined;
 }) => {
   const { t: translate } = useTranslation("common");
 
-  if (expression === null) {
+  if (expression === undefined) {
     return undefined;
   }
 
diff --git a/airflow-core/src/airflow/ui/src/components/AssetProgressCell.tsx 
b/airflow-core/src/airflow/ui/src/components/AssetProgressCell.tsx
new file mode 100644
index 00000000000..7d305a03fb7
--- /dev/null
+++ b/airflow-core/src/airflow/ui/src/components/AssetProgressCell.tsx
@@ -0,0 +1,67 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { Button } from "@chakra-ui/react";
+import { FiDatabase } from "react-icons/fi";
+
+import { usePartitionedDagRunServiceGetPendingPartitionedDagRun } from 
"openapi/queries";
+import type { PartitionedDagRunAssetResponse } from 
"openapi/requests/types.gen";
+import { AssetExpression, type ExpressionType } from 
"src/components/AssetExpression";
+import type { NextRunEvent } from "src/components/AssetExpression/types";
+import { Popover } from "src/components/ui";
+
+type Props = {
+  readonly dagId: string;
+  readonly partitionKey: string;
+  readonly totalReceived: number;
+  readonly totalRequired: number;
+};
+
+export const AssetProgressCell = ({ dagId, partitionKey, totalReceived, 
totalRequired }: Props) => {
+  const { data, isLoading } = 
usePartitionedDagRunServiceGetPendingPartitionedDagRun({ dagId, partitionKey });
+
+  const assetExpression = data?.asset_expression as ExpressionType | undefined;
+  const assets: Array<PartitionedDagRunAssetResponse> = data?.assets ?? [];
+
+  const events: Array<NextRunEvent> = assets
+    .filter((ak: PartitionedDagRunAssetResponse) => ak.received)
+    .map((ak: PartitionedDagRunAssetResponse) => ({
+      id: ak.asset_id,
+      lastUpdate: "received",
+      name: ak.asset_name,
+      uri: ak.asset_uri,
+    }));
+
+  return (
+    // eslint-disable-next-line jsx-a11y/no-autofocus
+    <Popover.Root autoFocus={false} lazyMount positioning={{ placement: 
"bottom-end" }} unmountOnExit>
+      <Popover.Trigger asChild>
+        <Button loading={isLoading} paddingInline={0} size="sm" 
variant="ghost">
+          <FiDatabase style={{ display: "inline" }} />
+          {`${String(totalReceived)} / ${String(totalRequired)}`}
+        </Button>
+      </Popover.Trigger>
+      <Popover.Content css={{ "--popover-bg": "colors.bg.emphasized" }} 
width="fit-content">
+        <Popover.Arrow />
+        <Popover.Body>
+          <AssetExpression events={events} expression={assetExpression} />
+        </Popover.Body>
+      </Popover.Content>
+    </Popover.Root>
+  );
+};
diff --git a/airflow-core/src/airflow/ui/src/pages/DagsList/AssetSchedule.tsx 
b/airflow-core/src/airflow/ui/src/pages/DagsList/AssetSchedule.tsx
index 2a801f473f0..dd7a7a45d3b 100644
--- a/airflow-core/src/airflow/ui/src/pages/DagsList/AssetSchedule.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/DagsList/AssetSchedule.tsx
@@ -18,6 +18,7 @@
  */
 import { Button, HStack, Link, Text } from "@chakra-ui/react";
 import dayjs from "dayjs";
+import { useState } from "react";
 import { useTranslation } from "react-i18next";
 import { FiDatabase } from "react-icons/fi";
 import { Link as RouterLink } from "react-router-dom";
@@ -28,6 +29,8 @@ import type { NextRunEvent } from 
"src/components/AssetExpression/types";
 import { TruncatedText } from "src/components/TruncatedText";
 import { Popover } from "src/components/ui";
 
+import { PartitionScheduleModal } from "./PartitionScheduleModal";
+
 type Props = {
   readonly assetExpression?: ExpressionType | null;
   readonly dagId: string;
@@ -35,18 +38,44 @@ type Props = {
   readonly timetableSummary: string | null;
 };
 
+type PartitionScheduleProps = {
+  readonly dagId: string;
+  readonly isLoading: boolean;
+  readonly pendingCount: number;
+};
+
+const PartitionSchedule = ({ dagId, isLoading, pendingCount }: 
PartitionScheduleProps) => {
+  const { t: translate } = useTranslation("common");
+  const [open, setOpen] = useState(false);
+
+  return (
+    <>
+      <Button loading={isLoading} onClick={() => setOpen(true)} 
paddingInline={0} size="sm" variant="ghost">
+        <FiDatabase style={{ display: "inline" }} />
+        {translate("pendingDagRun", { count: pendingCount })}
+      </Button>
+      <PartitionScheduleModal dagId={dagId} onClose={() => setOpen(false)} 
open={open} />
+    </>
+  );
+};
+
 export const AssetSchedule = ({ assetExpression, dagId, latestRunAfter, 
timetableSummary }: Props) => {
-  const { t: translate } = useTranslation("dags");
+  const { t: translate } = useTranslation(["dags", "common"]);
   const { data: nextRun, isLoading } = useAssetServiceNextRunAssets({ dagId });
 
+  const isPartitioned = timetableSummary === "Partitioned Asset";
+
   const nextRunEvents = (nextRun?.events ?? []) as Array<NextRunEvent>;
 
   const pendingEvents = nextRunEvents.filter((ev) => {
-    if (ev.lastUpdate !== null && latestRunAfter !== undefined) {
-      return dayjs(ev.lastUpdate).isAfter(latestRunAfter);
+    if (ev.lastUpdate === null) {
+      return false;
+    }
+    if (isPartitioned) {
+      return true;
     }
 
-    return false;
+    return latestRunAfter !== undefined && 
dayjs(ev.lastUpdate).isAfter(latestRunAfter);
   });
 
   if (!nextRunEvents.length) {
@@ -58,6 +87,25 @@ export const AssetSchedule = ({ assetExpression, dagId, 
latestRunAfter, timetabl
     );
   }
 
+  if (isPartitioned) {
+    const pendingCount = (nextRun?.pending_partition_count as number | 
undefined) ?? 0;
+
+    if (pendingCount === 0) {
+      return (
+        <HStack>
+          <FiDatabase style={{ display: "inline", flexShrink: 0 }} />
+          <Text>{translate("common:runTypes.asset_triggered")}</Text>
+        </HStack>
+      );
+    }
+
+    if (pendingCount > 1) {
+      return <PartitionSchedule dagId={dagId} isLoading={isLoading} 
pendingCount={pendingCount} />;
+    }
+
+    // pendingCount === 1: fall through to the standard asset popover below
+  }
+
   const [asset] = nextRunEvents;
 
   if (nextRunEvents.length === 1 && asset !== undefined) {
diff --git 
a/airflow-core/src/airflow/ui/src/pages/DagsList/PartitionScheduleModal.tsx 
b/airflow-core/src/airflow/ui/src/pages/DagsList/PartitionScheduleModal.tsx
new file mode 100644
index 00000000000..9a5e2dcb0d8
--- /dev/null
+++ b/airflow-core/src/airflow/ui/src/pages/DagsList/PartitionScheduleModal.tsx
@@ -0,0 +1,110 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { Heading, HStack, Text } from "@chakra-ui/react";
+import type { ColumnDef } from "@tanstack/react-table";
+import { useTranslation } from "react-i18next";
+import { FiDatabase } from "react-icons/fi";
+
+import { usePartitionedDagRunServiceGetPartitionedDagRuns } from 
"openapi/queries";
+import type { PartitionedDagRunResponse } from "openapi/requests/types.gen";
+import { AssetProgressCell } from "src/components/AssetProgressCell";
+import { DataTable } from "src/components/DataTable";
+import { ErrorAlert } from "src/components/ErrorAlert";
+import Time from "src/components/Time";
+import { Dialog } from "src/components/ui";
+
+type PartitionScheduleModalProps = {
+  readonly dagId: string;
+  readonly onClose: () => void;
+  readonly open: boolean;
+};
+
+const getColumns = (
+  translate: (key: string) => string,
+  dagId: string,
+): Array<ColumnDef<PartitionedDagRunResponse>> => [
+  {
+    accessorKey: "partition_key",
+    enableSorting: false,
+    header: translate("dagRun.mappedPartitionKey"),
+  },
+  {
+    accessorKey: "created_at",
+    cell: ({ row }) => (
+      <Text>
+        <Time datetime={row.original.created_at} />
+      </Text>
+    ),
+    enableSorting: false,
+    header: translate("table.createdAt"),
+  },
+  {
+    accessorKey: "total_received",
+    cell: ({ row }) => (
+      <AssetProgressCell
+        dagId={dagId}
+        partitionKey={row.original.partition_key}
+        totalReceived={row.original.total_received}
+        totalRequired={row.original.total_required}
+      />
+    ),
+    enableSorting: false,
+    header: translate("partitionedDagRunDetail.receivedAssetEvents"),
+  },
+];
+
+export const PartitionScheduleModal = ({ dagId, onClose, open }: 
PartitionScheduleModalProps) => {
+  const { t: translate } = useTranslation("common");
+
+  const { data, error, isFetching, isLoading } = 
usePartitionedDagRunServiceGetPartitionedDagRuns(
+    { dagId, hasCreatedDagRunId: false },
+    undefined,
+    { enabled: open },
+  );
+
+  const partitionedDagRuns = data?.partitioned_dag_runs ?? [];
+  const total = data?.total ?? 0;
+  const columns = getColumns(translate, dagId);
+
+  return (
+    <Dialog.Root lazyMount onOpenChange={onClose} open={open} 
scrollBehavior="inside" size="xl" unmountOnExit>
+      <Dialog.Content backdrop>
+        <Dialog.Header>
+          <HStack>
+            <FiDatabase />
+            <Heading size="md">{translate("pendingDagRun", { count: total 
})}</Heading>
+          </HStack>
+        </Dialog.Header>
+        <Dialog.CloseTrigger />
+        <Dialog.Body>
+          <ErrorAlert error={error} />
+          <DataTable
+            columns={columns}
+            data={partitionedDagRuns}
+            isFetching={isFetching}
+            isLoading={isLoading}
+            modelName="partitionedDagRun"
+            showRowCountHeading={false}
+            total={total}
+          />
+        </Dialog.Body>
+      </Dialog.Content>
+    </Dialog.Root>
+  );
+};
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_assets.py 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_assets.py
index 3e9dd95b308..eaf3e5031fc 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_assets.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_assets.py
@@ -22,12 +22,19 @@ import pendulum
 import pytest
 from sqlalchemy import select
 
-from airflow.models.asset import AssetDagRunQueue, AssetEvent, AssetModel
+from airflow.models.asset import (
+    AssetDagRunQueue,
+    AssetEvent,
+    AssetModel,
+    AssetPartitionDagRun,
+    PartitionedAssetKeyLog,
+)
 from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.sdk.definitions.asset import Asset
+from airflow.sdk.definitions.timetables.assets import PartitionedAssetTimetable
 
 from tests_common.test_utils.asserts import assert_queries_count
-from tests_common.test_utils.db import clear_db_dags, clear_db_serialized_dags
+from tests_common.test_utils.db import clear_db_apdr, clear_db_dags, 
clear_db_pakl, clear_db_serialized_dags
 
 pytestmark = pytest.mark.db_test
 
@@ -36,6 +43,8 @@ pytestmark = pytest.mark.db_test
 def cleanup():
     clear_db_dags()
     clear_db_serialized_dags()
+    clear_db_apdr()
+    clear_db_pakl()
 
 
 class TestNextRunAssets:
@@ -162,3 +171,54 @@ class TestNextRunAssets:
         ev = resp.json()["events"][0]
         assert ev["lastUpdate"] is not None
         assert "queued" not in ev
+
+    @pytest.mark.parametrize(
+        ("fulfilled", "expect_last_update"),
+        [(False, True), (True, False)],
+        ids=["pending", "fulfilled"],
+    )
+    def test_partitioned_dag_last_update(
+        self, test_client, dag_maker, session, fulfilled, expect_last_update
+    ):
+        asset = Asset(uri="s3://bucket/part", name="part")
+        with dag_maker(
+            dag_id="part_dag",
+            schedule=PartitionedAssetTimetable(assets=asset),
+            serialized=True,
+        ):
+            EmptyOperator(task_id="t")
+
+        dr = dag_maker.create_dagrun()
+        dag_maker.sync_dagbag_to_db()
+
+        asset_model = session.scalars(select(AssetModel).where(AssetModel.uri 
== "s3://bucket/part")).one()
+        event = AssetEvent(
+            asset_id=asset_model.id, timestamp=(dr.logical_date or 
pendulum.now()).add(minutes=5)
+        )
+        session.add(event)
+        session.flush()
+
+        pdr = AssetPartitionDagRun(
+            target_dag_id="part_dag",
+            partition_key="2024-01-01",
+            created_dag_run_id=dr.id if fulfilled else None,
+        )
+        session.add(pdr)
+        session.flush()
+
+        session.add(
+            PartitionedAssetKeyLog(
+                asset_id=asset_model.id,
+                asset_event_id=event.id,
+                asset_partition_dag_run_id=pdr.id,
+                source_partition_key="2024-01-01",
+                target_dag_id="part_dag",
+                target_partition_key="2024-01-01",
+            )
+        )
+        session.commit()
+
+        resp = test_client.get("/next_run_assets/part_dag")
+        assert resp.status_code == 200
+        ev = resp.json()["events"][0]
+        assert (ev["lastUpdate"] is not None) == expect_last_update
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_partitioned_dag_runs.py
 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_partitioned_dag_runs.py
new file mode 100644
index 00000000000..52f0ec1e265
--- /dev/null
+++ 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_partitioned_dag_runs.py
@@ -0,0 +1,276 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pendulum
+import pytest
+from sqlalchemy import select
+
+from airflow.models.asset import AssetEvent, AssetModel, AssetPartitionDagRun, 
PartitionedAssetKeyLog
+from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.sdk.definitions.asset import Asset
+from airflow.sdk.definitions.timetables.assets import PartitionedAssetTimetable
+
+from tests_common.test_utils.asserts import assert_queries_count
+from tests_common.test_utils.db import clear_db_apdr, clear_db_dags, 
clear_db_pakl, clear_db_serialized_dags
+
+pytestmark = pytest.mark.db_test
+
+
[email protected](autouse=True)
+def cleanup():
+    clear_db_dags()
+    clear_db_serialized_dags()
+    clear_db_apdr()
+    clear_db_pakl()
+
+
+class TestGetPartitionedDagRuns:
+    def test_should_response_401(self, unauthenticated_test_client):
+        response = 
unauthenticated_test_client.get("/partitioned_dag_runs?dag_id=any")
+        assert response.status_code == 401
+
+    def test_should_response_403(self, unauthorized_test_client):
+        response = 
unauthorized_test_client.get("/partitioned_dag_runs?dag_id=any")
+        assert response.status_code == 403
+
+    def test_should_response_404(self, test_client):
+        assert 
test_client.get("/partitioned_dag_runs?dag_id=no_such_dag").status_code == 404
+
+    def test_should_response_200_non_partitioned_dag_returns_empty(self, 
test_client, dag_maker):
+        with dag_maker(dag_id="normal", schedule=[Asset(uri="s3://bucket/a", 
name="a")], serialized=True):
+            EmptyOperator(task_id="t")
+        dag_maker.create_dagrun()
+        dag_maker.sync_dagbag_to_db()
+
+        with assert_queries_count(2):
+            resp = 
test_client.get("/partitioned_dag_runs?dag_id=normal&has_created_dag_run_id=false")
+        assert resp.status_code == 200
+        assert resp.json() == {"partitioned_dag_runs": [], "total": 0, 
"asset_expressions": None}
+
+    @pytest.mark.parametrize(
+        (
+            "num_assets",
+            "received_count",
+            "fulfilled",
+            "has_created_dag_run_id",
+            "expected_total",
+            "expected_state",
+        ),
+        [
+            (1, 1, False, False, 1, "pending"),
+            (1, 1, True, False, 0, None),
+            (1, 1, True, True, 1, "running"),
+            (3, 0, False, False, 1, "pending"),
+            (3, 1, False, False, 1, "pending"),
+            (3, 2, False, False, 1, "pending"),
+            (3, 3, False, False, 1, "pending"),
+        ],
+        ids=[
+            "filter-pending-included",
+            "filter-fulfilled-excluded",
+            "filter-fulfilled-included",
+            "received-0/3",
+            "received-1/3",
+            "received-2/3",
+            "received-3/3",
+        ],
+    )
+    def test_should_response_200(
+        self,
+        test_client,
+        dag_maker,
+        session,
+        num_assets,
+        received_count,
+        fulfilled,
+        has_created_dag_run_id,
+        expected_total,
+        expected_state,
+    ):
+        uris = [f"s3://bucket/lr{i}" for i in range(num_assets)]
+        asset_defs = [Asset(uri=uri, name=f"lr{i}") for i, uri in 
enumerate(uris)]
+        schedule = asset_defs[0]
+        for a in asset_defs[1:]:
+            schedule = schedule & a
+
+        with dag_maker(
+            dag_id="list_dag",
+            schedule=PartitionedAssetTimetable(assets=schedule),
+            serialized=True,
+        ):
+            EmptyOperator(task_id="t")
+
+        dr = dag_maker.create_dagrun()
+        dag_maker.sync_dagbag_to_db()
+
+        assets = {a.uri: a for a in 
session.scalars(select(AssetModel).where(AssetModel.uri.in_(uris)))}
+
+        pdr = AssetPartitionDagRun(
+            target_dag_id="list_dag",
+            partition_key="2024-06-01",
+            created_dag_run_id=dr.id if fulfilled else None,
+        )
+        session.add(pdr)
+        session.flush()
+
+        for uri in uris[:received_count]:
+            event = AssetEvent(asset_id=assets[uri].id, 
timestamp=pendulum.now())
+            session.add(event)
+            session.flush()
+            session.add(
+                PartitionedAssetKeyLog(
+                    asset_id=assets[uri].id,
+                    asset_event_id=event.id,
+                    asset_partition_dag_run_id=pdr.id,
+                    source_partition_key="2024-06-01",
+                    target_dag_id="list_dag",
+                    target_partition_key="2024-06-01",
+                )
+            )
+        session.commit()
+
+        with assert_queries_count(2):
+            resp = test_client.get(
+                f"/partitioned_dag_runs?dag_id=list_dag"
+                
f"&has_created_dag_run_id={str(has_created_dag_run_id).lower()}"
+            )
+        assert resp.status_code == 200
+        body = resp.json()
+        assert body["total"] == expected_total
+        if expected_total > 0:
+            pdr_resp = body["partitioned_dag_runs"][0]
+            assert pdr_resp["state"] == expected_state
+            assert pdr_resp["total_received"] == received_count
+            assert pdr_resp["total_required"] == num_assets
+
+
+class TestGetPendingPartitionedDagRun:
+    def test_should_response_401(self, unauthenticated_test_client):
+        response = 
unauthenticated_test_client.get("/pending_partitioned_dag_run/any_dag/any_key")
+        assert response.status_code == 401
+
+    def test_should_response_403(self, unauthorized_test_client):
+        response = 
unauthorized_test_client.get("/pending_partitioned_dag_run/any_dag/any_key")
+        assert response.status_code == 403
+
+    @pytest.mark.parametrize(
+        ("dag_id", "partition_key", "fulfilled"),
+        [
+            ("no_dag", "no_key", False),
+            ("fulfilled_dag", "2024-07-01", True),
+        ],
+        ids=[
+            "not-found",
+            "fulfilled-excluded",
+        ],
+    )
+    def test_should_response_404(self, test_client, dag_maker, session, 
dag_id, partition_key, fulfilled):
+        if fulfilled:
+            with dag_maker(
+                dag_id="fulfilled_dag",
+                
schedule=PartitionedAssetTimetable(assets=Asset(uri="s3://bucket/ful0", 
name="ful0")),
+                serialized=True,
+            ):
+                EmptyOperator(task_id="t")
+
+            dr = dag_maker.create_dagrun()
+            dag_maker.sync_dagbag_to_db()
+
+            session.add(
+                AssetPartitionDagRun(
+                    target_dag_id="fulfilled_dag",
+                    partition_key="2024-07-01",
+                    created_dag_run_id=dr.id,
+                )
+            )
+            session.commit()
+
+        resp = 
test_client.get(f"/pending_partitioned_dag_run/{dag_id}/{partition_key}")
+        assert resp.status_code == 404
+
+    @pytest.mark.parametrize(
+        ("num_assets", "received_count"),
+        [
+            (1, 1),
+            (1, 0),
+            (2, 1),
+            (2, 2),
+            (2, 0),
+        ],
+        ids=[
+            "1-asset-received-pending",
+            "1-asset-none-received-pending",
+            "2-assets-partial-pending",
+            "2-assets-all-received-pending",
+            "2-assets-none-received-pending",
+        ],
+    )
+    def test_should_response_200(self, test_client, dag_maker, session, 
num_assets, received_count):
+        uris = [f"s3://bucket/dt{i}" for i in range(num_assets)]
+        asset_defs = [Asset(uri=uri, name=f"dt{i}") for i, uri in 
enumerate(uris)]
+        schedule = asset_defs[0] if num_assets == 1 else asset_defs[0] & 
asset_defs[1]
+
+        with dag_maker(
+            dag_id="detail_dag",
+            schedule=PartitionedAssetTimetable(assets=schedule),
+            serialized=True,
+        ):
+            EmptyOperator(task_id="t")
+
+        dag_maker.create_dagrun()
+        dag_maker.sync_dagbag_to_db()
+
+        assets = {a.uri: a for a in 
session.scalars(select(AssetModel).where(AssetModel.uri.in_(uris)))}
+
+        pdr = AssetPartitionDagRun(
+            target_dag_id="detail_dag",
+            partition_key="2024-07-01",
+            created_dag_run_id=None,
+        )
+        session.add(pdr)
+        session.flush()
+
+        for uri in uris[:received_count]:
+            event = AssetEvent(asset_id=assets[uri].id, 
timestamp=pendulum.now())
+            session.add(event)
+            session.flush()
+            session.add(
+                PartitionedAssetKeyLog(
+                    asset_id=assets[uri].id,
+                    asset_event_id=event.id,
+                    asset_partition_dag_run_id=pdr.id,
+                    source_partition_key="2024-07-01",
+                    target_dag_id="detail_dag",
+                    target_partition_key="2024-07-01",
+                )
+            )
+        session.commit()
+
+        resp = 
test_client.get("/pending_partitioned_dag_run/detail_dag/2024-07-01")
+        assert resp.status_code == 200
+        body = resp.json()
+        assert body["dag_id"] == "detail_dag"
+        assert body["partition_key"] == "2024-07-01"
+        assert body["total_required"] == num_assets
+        assert body["total_received"] == received_count
+        assert len(body["assets"]) == num_assets
+        assert body["asset_expression"] is not None
+        assert body["created_dag_run_id"] is None
+
+        received_uris = {a["asset_uri"] for a in body["assets"] if 
a["received"]}
+        assert received_uris == set(uris[:received_count])


Reply via email to