This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7b8bd2823a1 AIP-84: Migrating GET queued asset events for DAG to 
fastAPI (#44124)
7b8bd2823a1 is described below

commit 7b8bd2823a1b64f3d7fbc20a9f1c6b4b4e1be263
Author: Amogh Desai <[email protected]>
AuthorDate: Mon Nov 18 14:55:51 2024 +0530

    AIP-84: Migrating GET queued asset events for DAG to fastAPI (#44124)
    
    * AIP-84: Migrating GET queued asset events for DAG to fastAPI
    
    * fixing tests and server code
    
    * fixing parameters
    
    * fixing parameters
---
 airflow/api_connexion/endpoints/asset_endpoint.py  |  1 +
 airflow/api_fastapi/common/parameters.py           | 24 +++++-
 airflow/api_fastapi/core_api/datamodels/assets.py  | 15 ++++
 .../api_fastapi/core_api/openapi/v1-generated.yaml | 90 +++++++++++++++++++++-
 .../api_fastapi/core_api/routes/public/assets.py   | 81 +++++++++++++++++--
 airflow/ui/openapi-gen/queries/common.ts           | 22 ++++++
 airflow/ui/openapi-gen/queries/prefetch.ts         | 26 +++++++
 airflow/ui/openapi-gen/queries/queries.ts          | 33 ++++++++
 airflow/ui/openapi-gen/queries/suspense.ts         | 33 ++++++++
 airflow/ui/openapi-gen/requests/schemas.gen.ts     | 42 ++++++++++
 airflow/ui/openapi-gen/requests/services.gen.ts    | 34 +++++++-
 airflow/ui/openapi-gen/requests/types.gen.ts       | 53 ++++++++++++-
 .../core_api/routes/public/test_assets.py          | 66 ++++++++++++++--
 13 files changed, 502 insertions(+), 18 deletions(-)

diff --git a/airflow/api_connexion/endpoints/asset_endpoint.py 
b/airflow/api_connexion/endpoints/asset_endpoint.py
index 7915bf8b034..ff47db88387 100644
--- a/airflow/api_connexion/endpoints/asset_endpoint.py
+++ b/airflow/api_connexion/endpoints/asset_endpoint.py
@@ -222,6 +222,7 @@ def delete_dag_asset_queued_event(
     )
 
 
+@mark_fastapi_migration_done
 @security.requires_access_asset("GET")
 @security.requires_access_dag("GET")
 @provide_session
diff --git a/airflow/api_fastapi/common/parameters.py 
b/airflow/api_fastapi/common/parameters.py
index 337d85547c3..5b2d7cc68b0 100644
--- a/airflow/api_fastapi/common/parameters.py
+++ b/airflow/api_fastapi/common/parameters.py
@@ -19,7 +19,7 @@ from __future__ import annotations
 
 from abc import ABC, abstractmethod
 from datetime import datetime
-from typing import TYPE_CHECKING, Annotated, Any, Callable, Generic, List, 
Optional, TypeVar
+from typing import TYPE_CHECKING, Annotated, Any, Callable, Generic, List, 
Optional, TypeVar, Union, overload
 
 from fastapi import Depends, HTTPException, Query
 from pendulum.parsing.exceptions import ParserError
@@ -409,6 +409,27 @@ def _safe_parse_datetime(date_to_check: str) -> datetime:
     """
     if not date_to_check:
         raise ValueError(f"{date_to_check} cannot be None.")
+    return _safe_parse_datetime_optional(date_to_check)
+
+
+@overload
+def _safe_parse_datetime_optional(date_to_check: str) -> datetime: ...
+
+
+@overload
+def _safe_parse_datetime_optional(date_to_check: None) -> None: ...
+
+
+def _safe_parse_datetime_optional(date_to_check: str | None) -> datetime | 
None:
+    """
+    Parse datetime and raise error for invalid dates.
+
+    Allow None values.
+
+    :param date_to_check: the string value to be parsed
+    """
+    if date_to_check is None:
+        return None
     try:
         return timezone.parse(date_to_check, strict=True)
     except (TypeError, ParserError):
@@ -615,6 +636,7 @@ def float_range_filter_factory(
 
 # Common Safe DateTime
 DateTimeQuery = Annotated[str, AfterValidator(_safe_parse_datetime)]
+OptionalDateTimeQuery = Annotated[Union[str, None], 
AfterValidator(_safe_parse_datetime_optional)]
 
 # DAG
 QueryLimit = Annotated[LimitFilter, Depends(LimitFilter().depends)]
diff --git a/airflow/api_fastapi/core_api/datamodels/assets.py 
b/airflow/api_fastapi/core_api/datamodels/assets.py
index e5ac10715ed..bfdbb2d7fc8 100644
--- a/airflow/api_fastapi/core_api/datamodels/assets.py
+++ b/airflow/api_fastapi/core_api/datamodels/assets.py
@@ -101,6 +101,21 @@ class AssetEventCollectionResponse(BaseModel):
     total_entries: int
 
 
+class QueuedEventResponse(BaseModel):
+    """Queued Event serializer for responses.."""
+
+    uri: str
+    dag_id: str
+    created_at: datetime
+
+
+class QueuedEventCollectionResponse(BaseModel):
+    """Queued Event Collection serializer for responses."""
+
+    queued_events: list[QueuedEventResponse]
+    total_entries: int
+
+
 class CreateAssetEventsBody(BaseModel):
     """Create asset events request."""
 
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml 
b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index e7762392a0c..8678b21c9b0 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -170,7 +170,7 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/HTTPValidationError'
-  /public/assets/:
+  /public/assets:
     get:
       tags:
       - Asset
@@ -434,6 +434,59 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/HTTPValidationError'
+  /public/dags/{dag_id}/assets/queuedEvent:
+    get:
+      tags:
+      - Asset
+      summary: Get Dag Asset Queued Events
+      description: Get queued asset events for a DAG.
+      operationId: get_dag_asset_queued_events
+      parameters:
+      - name: dag_id
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Dag Id
+      - name: before
+        in: query
+        required: false
+        schema:
+          anyOf:
+          - type: string
+          - type: 'null'
+          title: Before
+      responses:
+        '200':
+          description: Successful Response
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/QueuedEventCollectionResponse'
+        '401':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Unauthorized
+        '403':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Forbidden
+        '404':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Not Found
+        '422':
+          description: Validation Error
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPValidationError'
   /public/backfills/:
     get:
       tags:
@@ -5730,6 +5783,41 @@ components:
       - version
       title: ProviderResponse
       description: Provider serializer for responses.
+    QueuedEventCollectionResponse:
+      properties:
+        queued_events:
+          items:
+            $ref: '#/components/schemas/QueuedEventResponse'
+          type: array
+          title: Queued Events
+        total_entries:
+          type: integer
+          title: Total Entries
+      type: object
+      required:
+      - queued_events
+      - total_entries
+      title: QueuedEventCollectionResponse
+      description: Queued Event Collection serializer for responses.
+    QueuedEventResponse:
+      properties:
+        uri:
+          type: string
+          title: Uri
+        dag_id:
+          type: string
+          title: Dag Id
+        created_at:
+          type: string
+          format: date-time
+          title: Created At
+      type: object
+      required:
+      - uri
+      - dag_id
+      - created_at
+      title: QueuedEventResponse
+      description: Queued Event serializer for responses..
     ReprocessBehavior:
       type: string
       enum:
diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py 
b/airflow/api_fastapi/core_api/routes/public/assets.py
index 326a387f008..37fbb6e5878 100644
--- a/airflow/api_fastapi/core_api/routes/public/assets.py
+++ b/airflow/api_fastapi/core_api/routes/public/assets.py
@@ -17,6 +17,7 @@
 
 from __future__ import annotations
 
+from datetime import datetime
 from typing import Annotated
 
 from fastapi import Depends, HTTPException, status
@@ -25,6 +26,7 @@ from sqlalchemy.orm import Session, joinedload, subqueryload
 
 from airflow.api_fastapi.common.db.common import get_session, paginated_select
 from airflow.api_fastapi.common.parameters import (
+    OptionalDateTimeQuery,
     QueryAssetDagIdPatternSearch,
     QueryAssetIdFilter,
     QueryLimit,
@@ -43,18 +45,41 @@ from airflow.api_fastapi.core_api.datamodels.assets import (
     AssetEventResponse,
     AssetResponse,
     CreateAssetEventsBody,
+    QueuedEventCollectionResponse,
+    QueuedEventResponse,
 )
 from airflow.api_fastapi.core_api.openapi.exceptions import 
create_openapi_http_exception_doc
 from airflow.assets import Asset
 from airflow.assets.manager import asset_manager
-from airflow.models.asset import AssetEvent, AssetModel
+from airflow.models.asset import AssetDagRunQueue, AssetEvent, AssetModel
 from airflow.utils import timezone
 
-assets_router = AirflowRouter(tags=["Asset"], prefix="/assets")
+assets_router = AirflowRouter(tags=["Asset"])
+
+
+def _generate_queued_event_where_clause(
+    *,
+    dag_id: str | None = None,
+    uri: str | None = None,
+    before: datetime | str | None = None,
+) -> list:
+    """Get AssetDagRunQueue where clause."""
+    where_clause = []
+    if dag_id is not None:
+        where_clause.append(AssetDagRunQueue.target_dag_id == dag_id)
+    if uri is not None:
+        where_clause.append(
+            AssetDagRunQueue.asset_id.in_(
+                select(AssetModel.id).where(AssetModel.uri == uri),
+            ),
+        )
+    if before is not None:
+        where_clause.append(AssetDagRunQueue.created_at < before)
+    return where_clause
 
 
 @assets_router.get(
-    "/",
+    "/assets",
     responses=create_openapi_http_exception_doc([401, 403, 404]),
 )
 def get_assets(
@@ -89,7 +114,7 @@ def get_assets(
 
 
 @assets_router.get(
-    "/events",
+    "/assets/events",
     responses=create_openapi_http_exception_doc([404]),
 )
 def get_asset_events(
@@ -139,7 +164,7 @@ def get_asset_events(
 
 
 @assets_router.post(
-    "/events",
+    "/assets/events",
     responses=create_openapi_http_exception_doc([404]),
 )
 def create_asset_event(
@@ -165,7 +190,7 @@ def create_asset_event(
 
 
 @assets_router.get(
-    "/{uri:path}",
+    "/assets/{uri:path}",
     responses=create_openapi_http_exception_doc([401, 403, 404]),
 )
 def get_asset(
@@ -183,3 +208,47 @@ def get_asset(
         raise HTTPException(status.HTTP_404_NOT_FOUND, f"The Asset with uri: 
`{uri}` was not found")
 
     return AssetResponse.model_validate(asset, from_attributes=True)
+
+
+@assets_router.get(
+    "/dags/{dag_id}/assets/queuedEvent",
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_404_NOT_FOUND,
+        ]
+    ),
+)
+def get_dag_asset_queued_events(
+    dag_id: str,
+    session: Annotated[Session, Depends(get_session)],
+    before: OptionalDateTimeQuery = None,
+) -> QueuedEventCollectionResponse:
+    """Get queued asset events for a DAG."""
+    where_clause = _generate_queued_event_where_clause(dag_id=dag_id, 
before=before)
+    query = (
+        select(AssetDagRunQueue, AssetModel.uri)
+        .join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id)
+        .where(*where_clause)
+    )
+
+    dag_asset_queued_events_select, total_entries = paginated_select(
+        query,
+        [],
+    )
+    adrqs = session.execute(dag_asset_queued_events_select).all()
+
+    if not adrqs:
+        raise HTTPException(status.HTTP_404_NOT_FOUND, f"Queue event with 
dag_id: `{dag_id}` was not found")
+
+    queued_events = [
+        QueuedEventResponse(created_at=adrq.created_at, 
dag_id=adrq.target_dag_id, uri=uri)
+        for adrq, uri in adrqs
+    ]
+
+    return QueuedEventCollectionResponse(
+        queued_events=[
+            QueuedEventResponse.model_validate(queued_event, 
from_attributes=True)
+            for queued_event in queued_events
+        ],
+        total_entries=total_entries,
+    )
diff --git a/airflow/ui/openapi-gen/queries/common.ts 
b/airflow/ui/openapi-gen/queries/common.ts
index 46940bfd318..7b23e33f0ab 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -129,6 +129,28 @@ export const UseAssetServiceGetAssetKeyFn = (
   },
   queryKey?: Array<unknown>,
 ) => [useAssetServiceGetAssetKey, ...(queryKey ?? [{ uri }])];
+export type AssetServiceGetDagAssetQueuedEventsDefaultResponse = Awaited<
+  ReturnType<typeof AssetService.getDagAssetQueuedEvents>
+>;
+export type AssetServiceGetDagAssetQueuedEventsQueryResult<
+  TData = AssetServiceGetDagAssetQueuedEventsDefaultResponse,
+  TError = unknown,
+> = UseQueryResult<TData, TError>;
+export const useAssetServiceGetDagAssetQueuedEventsKey =
+  "AssetServiceGetDagAssetQueuedEvents";
+export const UseAssetServiceGetDagAssetQueuedEventsKeyFn = (
+  {
+    before,
+    dagId,
+  }: {
+    before?: string;
+    dagId: string;
+  },
+  queryKey?: Array<unknown>,
+) => [
+  useAssetServiceGetDagAssetQueuedEventsKey,
+  ...(queryKey ?? [{ before, dagId }]),
+];
 export type DashboardServiceHistoricalMetricsDefaultResponse = Awaited<
   ReturnType<typeof DashboardService.historicalMetrics>
 >;
diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts 
b/airflow/ui/openapi-gen/queries/prefetch.ts
index 4c541670258..0c522f36e43 100644
--- a/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -165,6 +165,32 @@ export const prefetchUseAssetServiceGetAsset = (
     queryKey: Common.UseAssetServiceGetAssetKeyFn({ uri }),
     queryFn: () => AssetService.getAsset({ uri }),
   });
+/**
+ * Get Dag Asset Queued Events
+ * Get queued asset events for a DAG.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.before
+ * @returns QueuedEventCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const prefetchUseAssetServiceGetDagAssetQueuedEvents = (
+  queryClient: QueryClient,
+  {
+    before,
+    dagId,
+  }: {
+    before?: string;
+    dagId: string;
+  },
+) =>
+  queryClient.prefetchQuery({
+    queryKey: Common.UseAssetServiceGetDagAssetQueuedEventsKeyFn({
+      before,
+      dagId,
+    }),
+    queryFn: () => AssetService.getDagAssetQueuedEvents({ before, dagId }),
+  });
 /**
  * Historical Metrics
  * Return cluster activity historical metrics.
diff --git a/airflow/ui/openapi-gen/queries/queries.ts 
b/airflow/ui/openapi-gen/queries/queries.ts
index a96b09e12a7..8ec0ea9234a 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -213,6 +213,39 @@ export const useAssetServiceGetAsset = <
     queryFn: () => AssetService.getAsset({ uri }) as TData,
     ...options,
   });
+/**
+ * Get Dag Asset Queued Events
+ * Get queued asset events for a DAG.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.before
+ * @returns QueuedEventCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const useAssetServiceGetDagAssetQueuedEvents = <
+  TData = Common.AssetServiceGetDagAssetQueuedEventsDefaultResponse,
+  TError = unknown,
+  TQueryKey extends Array<unknown> = unknown[],
+>(
+  {
+    before,
+    dagId,
+  }: {
+    before?: string;
+    dagId: string;
+  },
+  queryKey?: TQueryKey,
+  options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+  useQuery<TData, TError>({
+    queryKey: Common.UseAssetServiceGetDagAssetQueuedEventsKeyFn(
+      { before, dagId },
+      queryKey,
+    ),
+    queryFn: () =>
+      AssetService.getDagAssetQueuedEvents({ before, dagId }) as TData,
+    ...options,
+  });
 /**
  * Historical Metrics
  * Return cluster activity historical metrics.
diff --git a/airflow/ui/openapi-gen/queries/suspense.ts 
b/airflow/ui/openapi-gen/queries/suspense.ts
index 43331b187fe..1b814222815 100644
--- a/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow/ui/openapi-gen/queries/suspense.ts
@@ -195,6 +195,39 @@ export const useAssetServiceGetAssetSuspense = <
     queryFn: () => AssetService.getAsset({ uri }) as TData,
     ...options,
   });
+/**
+ * Get Dag Asset Queued Events
+ * Get queued asset events for a DAG.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.before
+ * @returns QueuedEventCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const useAssetServiceGetDagAssetQueuedEventsSuspense = <
+  TData = Common.AssetServiceGetDagAssetQueuedEventsDefaultResponse,
+  TError = unknown,
+  TQueryKey extends Array<unknown> = unknown[],
+>(
+  {
+    before,
+    dagId,
+  }: {
+    before?: string;
+    dagId: string;
+  },
+  queryKey?: TQueryKey,
+  options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+  useSuspenseQuery<TData, TError>({
+    queryKey: Common.UseAssetServiceGetDagAssetQueuedEventsKeyFn(
+      { before, dagId },
+      queryKey,
+    ),
+    queryFn: () =>
+      AssetService.getDagAssetQueuedEvents({ before, dagId }) as TData,
+    ...options,
+  });
 /**
  * Historical Metrics
  * Return cluster activity historical metrics.
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts 
b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index e5ac0441a2a..1f83e434286 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -2880,6 +2880,48 @@ export const $ProviderResponse = {
   description: "Provider serializer for responses.",
 } as const;
 
+export const $QueuedEventCollectionResponse = {
+  properties: {
+    queued_events: {
+      items: {
+        $ref: "#/components/schemas/QueuedEventResponse",
+      },
+      type: "array",
+      title: "Queued Events",
+    },
+    total_entries: {
+      type: "integer",
+      title: "Total Entries",
+    },
+  },
+  type: "object",
+  required: ["queued_events", "total_entries"],
+  title: "QueuedEventCollectionResponse",
+  description: "Queued Event Collection serializer for responses.",
+} as const;
+
+export const $QueuedEventResponse = {
+  properties: {
+    uri: {
+      type: "string",
+      title: "Uri",
+    },
+    dag_id: {
+      type: "string",
+      title: "Dag Id",
+    },
+    created_at: {
+      type: "string",
+      format: "date-time",
+      title: "Created At",
+    },
+  },
+  type: "object",
+  required: ["uri", "dag_id", "created_at"],
+  title: "QueuedEventResponse",
+  description: "Queued Event serializer for responses..",
+} as const;
+
 export const $ReprocessBehavior = {
   type: "string",
   enum: ["failed", "completed", "none"],
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts 
b/airflow/ui/openapi-gen/requests/services.gen.ts
index 53bb3527d14..361cde79fc5 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -13,6 +13,8 @@ import type {
   CreateAssetEventResponse,
   GetAssetData,
   GetAssetResponse,
+  GetDagAssetQueuedEventsData,
+  GetDagAssetQueuedEventsResponse,
   HistoricalMetricsData,
   HistoricalMetricsResponse,
   RecentDagRunsData,
@@ -167,7 +169,7 @@ export class AssetService {
   ): CancelablePromise<GetAssetsResponse> {
     return __request(OpenAPI, {
       method: "GET",
-      url: "/public/assets/",
+      url: "/public/assets",
       query: {
         limit: data.limit,
         offset: data.offset,
@@ -274,6 +276,36 @@ export class AssetService {
       },
     });
   }
+
+  /**
+   * Get Dag Asset Queued Events
+   * Get queued asset events for a DAG.
+   * @param data The data for the request.
+   * @param data.dagId
+   * @param data.before
+   * @returns QueuedEventCollectionResponse Successful Response
+   * @throws ApiError
+   */
+  public static getDagAssetQueuedEvents(
+    data: GetDagAssetQueuedEventsData,
+  ): CancelablePromise<GetDagAssetQueuedEventsResponse> {
+    return __request(OpenAPI, {
+      method: "GET",
+      url: "/public/dags/{dag_id}/assets/queuedEvent",
+      path: {
+        dag_id: data.dagId,
+      },
+      query: {
+        before: data.before,
+      },
+      errors: {
+        401: "Unauthorized",
+        403: "Forbidden",
+        404: "Not Found",
+        422: "Validation Error",
+      },
+    });
+  }
 }
 
 export class DashboardService {
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts 
b/airflow/ui/openapi-gen/requests/types.gen.ts
index 078699cc0f2..948b3149c4f 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -712,6 +712,23 @@ export type ProviderResponse = {
   version: string;
 };
 
+/**
+ * Queued Event Collection serializer for responses.
+ */
+export type QueuedEventCollectionResponse = {
+  queued_events: Array<QueuedEventResponse>;
+  total_entries: number;
+};
+
+/**
+ * Queued Event serializer for responses..
+ */
+export type QueuedEventResponse = {
+  uri: string;
+  dag_id: string;
+  created_at: string;
+};
+
 /**
  * Internal enum for setting reprocess behavior in a backfill.
  *
@@ -1045,6 +1062,13 @@ export type GetAssetData = {
 
 export type GetAssetResponse = AssetResponse;
 
+export type GetDagAssetQueuedEventsData = {
+  before?: string | null;
+  dagId: string;
+};
+
+export type GetDagAssetQueuedEventsResponse = QueuedEventCollectionResponse;
+
 export type HistoricalMetricsData = {
   endDate: string;
   startDate: string;
@@ -1535,7 +1559,7 @@ export type $OpenApiTs = {
       };
     };
   };
-  "/public/assets/": {
+  "/public/assets": {
     get: {
       req: GetAssetsData;
       res: {
@@ -1641,6 +1665,33 @@ export type $OpenApiTs = {
       };
     };
   };
+  "/public/dags/{dag_id}/assets/queuedEvent": {
+    get: {
+      req: GetDagAssetQueuedEventsData;
+      res: {
+        /**
+         * Successful Response
+         */
+        200: QueuedEventCollectionResponse;
+        /**
+         * Unauthorized
+         */
+        401: HTTPExceptionResponse;
+        /**
+         * Forbidden
+         */
+        403: HTTPExceptionResponse;
+        /**
+         * Not Found
+         */
+        404: HTTPExceptionResponse;
+        /**
+         * Validation Error
+         */
+        422: HTTPValidationError;
+      };
+    };
+  };
   "/ui/dashboard/historical_metrics_data": {
     get: {
       req: HistoricalMetricsData;
diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py 
b/tests/api_fastapi/core_api/routes/public/test_assets.py
index 42b7acd908f..4e127ffd0c7 100644
--- a/tests/api_fastapi/core_api/routes/public/test_assets.py
+++ b/tests/api_fastapi/core_api/routes/public/test_assets.py
@@ -24,7 +24,13 @@ import pytest
 import time_machine
 
 from airflow.models import DagModel
-from airflow.models.asset import AssetEvent, AssetModel, 
DagScheduleAssetReference, TaskOutletAssetReference
+from airflow.models.asset import (
+    AssetDagRunQueue,
+    AssetEvent,
+    AssetModel,
+    DagScheduleAssetReference,
+    TaskOutletAssetReference,
+)
 from airflow.models.dagrun import DagRun
 from airflow.utils import timezone
 from airflow.utils.session import provide_session
@@ -114,6 +120,15 @@ def _create_asset_dag_run(session, num: int = 2):
 class TestAssets:
     default_time = "2020-06-11T18:00:00+00:00"
 
+    @pytest.fixture
+    def time_freezer(self) -> Generator:
+        freezer = time_machine.travel(self.default_time, tick=False)
+        freezer.start()
+
+        yield
+
+        freezer.stop()
+
     @pytest.fixture(autouse=True)
     def setup(self) -> None:
         clear_db_assets()
@@ -464,16 +479,51 @@ class TestGetAssetEndpoint(TestAssets):
         assert response.json()["detail"] == "The Asset with uri: 
`s3://bucket/key` was not found"
 
 
-class TestPostAssetEvents(TestAssets):
-    @pytest.fixture
-    def time_freezer(self) -> Generator:
-        freezer = time_machine.travel(self.default_time, tick=False)
-        freezer.start()
+class TestQueuedEventEndpoint(TestAssets):
+    def _create_asset_dag_run_queues(self, dag_id, asset_id, session):
+        adrq = AssetDagRunQueue(target_dag_id=dag_id, asset_id=asset_id)
+        session.add(adrq)
+        session.commit()
+        return adrq
 
-        yield
 
-        freezer.stop()
+class TestGetDagAssetQueuedEvents(TestQueuedEventEndpoint):
+    @pytest.mark.usefixtures("time_freezer")
+    def test_should_respond_200(self, test_client, session, create_dummy_dag):
+        dag, _ = create_dummy_dag()
+        dag_id = dag.dag_id
+        self.create_assets(session=session, num=1)
+        asset_id = 1
+        self._create_asset_dag_run_queues(dag_id, asset_id, session)
 
+        response = test_client.get(
+            f"/public/dags/{dag_id}/assets/queuedEvent",
+        )
+
+        assert response.status_code == 200
+        assert response.json() == {
+            "queued_events": [
+                {
+                    "created_at": self.default_time.replace("+00:00", "Z"),
+                    "uri": "s3://bucket/key/1",
+                    "dag_id": "dag",
+                }
+            ],
+            "total_entries": 1,
+        }
+
+    def test_should_respond_404(self, test_client):
+        dag_id = "not_exists"
+
+        response = test_client.get(
+            f"/public/dags/{dag_id}/assets/queuedEvent",
+        )
+
+        assert response.status_code == 404
+        assert response.json()["detail"] == "Queue event with dag_id: 
`not_exists` was not found"
+
+
+class TestPostAssetEvents(TestAssets):
     @pytest.mark.usefixtures("time_freezer")
     def test_should_respond_200(self, test_client, session):
         self.create_assets()

Reply via email to