This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7b8bd2823a1 AIP-84: Migrating GET queued asset events for DAG to fastAPI (#44124)
7b8bd2823a1 is described below
commit 7b8bd2823a1b64f3d7fbc20a9f1c6b4b4e1be263
Author: Amogh Desai <[email protected]>
AuthorDate: Mon Nov 18 14:55:51 2024 +0530
AIP-84: Migrating GET queued asset events for DAG to fastAPI (#44124)
* AIP-84: Migrating GET queued asset events for DAG to fastAPI
* fixing tests and server code
* fixing parameters
* fixing parameters
---
airflow/api_connexion/endpoints/asset_endpoint.py | 1 +
airflow/api_fastapi/common/parameters.py | 24 +++++-
airflow/api_fastapi/core_api/datamodels/assets.py | 15 ++++
.../api_fastapi/core_api/openapi/v1-generated.yaml | 90 +++++++++++++++++++++-
.../api_fastapi/core_api/routes/public/assets.py | 81 +++++++++++++++++--
airflow/ui/openapi-gen/queries/common.ts | 22 ++++++
airflow/ui/openapi-gen/queries/prefetch.ts | 26 +++++++
airflow/ui/openapi-gen/queries/queries.ts | 33 ++++++++
airflow/ui/openapi-gen/queries/suspense.ts | 33 ++++++++
airflow/ui/openapi-gen/requests/schemas.gen.ts | 42 ++++++++++
airflow/ui/openapi-gen/requests/services.gen.ts | 34 +++++++-
airflow/ui/openapi-gen/requests/types.gen.ts | 53 ++++++++++++-
.../core_api/routes/public/test_assets.py | 66 ++++++++++++++--
13 files changed, 502 insertions(+), 18 deletions(-)
diff --git a/airflow/api_connexion/endpoints/asset_endpoint.py b/airflow/api_connexion/endpoints/asset_endpoint.py
index 7915bf8b034..ff47db88387 100644
--- a/airflow/api_connexion/endpoints/asset_endpoint.py
+++ b/airflow/api_connexion/endpoints/asset_endpoint.py
@@ -222,6 +222,7 @@ def delete_dag_asset_queued_event(
)
+@mark_fastapi_migration_done
@security.requires_access_asset("GET")
@security.requires_access_dag("GET")
@provide_session
diff --git a/airflow/api_fastapi/common/parameters.py b/airflow/api_fastapi/common/parameters.py
index 337d85547c3..5b2d7cc68b0 100644
--- a/airflow/api_fastapi/common/parameters.py
+++ b/airflow/api_fastapi/common/parameters.py
@@ -19,7 +19,7 @@ from __future__ import annotations
from abc import ABC, abstractmethod
from datetime import datetime
-from typing import TYPE_CHECKING, Annotated, Any, Callable, Generic, List, Optional, TypeVar
+from typing import TYPE_CHECKING, Annotated, Any, Callable, Generic, List, Optional, TypeVar, Union, overload
from fastapi import Depends, HTTPException, Query
from pendulum.parsing.exceptions import ParserError
@@ -409,6 +409,27 @@ def _safe_parse_datetime(date_to_check: str) -> datetime:
"""
if not date_to_check:
raise ValueError(f"{date_to_check} cannot be None.")
+ return _safe_parse_datetime_optional(date_to_check)
+
+
+@overload
+def _safe_parse_datetime_optional(date_to_check: str) -> datetime: ...
+
+
+@overload
+def _safe_parse_datetime_optional(date_to_check: None) -> None: ...
+
+
+def _safe_parse_datetime_optional(date_to_check: str | None) -> datetime | None:
+ """
+ Parse datetime and raise error for invalid dates.
+
+ Allow None values.
+
+ :param date_to_check: the string value to be parsed
+ """
+ if date_to_check is None:
+ return None
try:
return timezone.parse(date_to_check, strict=True)
except (TypeError, ParserError):
@@ -615,6 +636,7 @@ def float_range_filter_factory(
# Common Safe DateTime
DateTimeQuery = Annotated[str, AfterValidator(_safe_parse_datetime)]
+OptionalDateTimeQuery = Annotated[Union[str, None], AfterValidator(_safe_parse_datetime_optional)]
# DAG
QueryLimit = Annotated[LimitFilter, Depends(LimitFilter().depends)]
diff --git a/airflow/api_fastapi/core_api/datamodels/assets.py b/airflow/api_fastapi/core_api/datamodels/assets.py
index e5ac10715ed..bfdbb2d7fc8 100644
--- a/airflow/api_fastapi/core_api/datamodels/assets.py
+++ b/airflow/api_fastapi/core_api/datamodels/assets.py
@@ -101,6 +101,21 @@ class AssetEventCollectionResponse(BaseModel):
total_entries: int
+class QueuedEventResponse(BaseModel):
+ """Queued Event serializer for responses.."""
+
+ uri: str
+ dag_id: str
+ created_at: datetime
+
+
+class QueuedEventCollectionResponse(BaseModel):
+ """Queued Event Collection serializer for responses."""
+
+ queued_events: list[QueuedEventResponse]
+ total_entries: int
+
+
class CreateAssetEventsBody(BaseModel):
"""Create asset events request."""
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index e7762392a0c..8678b21c9b0 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -170,7 +170,7 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
- /public/assets/:
+ /public/assets:
get:
tags:
- Asset
@@ -434,6 +434,59 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
+ /public/dags/{dag_id}/assets/queuedEvent:
+ get:
+ tags:
+ - Asset
+ summary: Get Dag Asset Queued Events
+ description: Get queued asset events for a DAG.
+ operationId: get_dag_asset_queued_events
+ parameters:
+ - name: dag_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Id
+ - name: before
+ in: query
+ required: false
+ schema:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Before
+ responses:
+ '200':
+ description: Successful Response
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/QueuedEventCollectionResponse'
+ '401':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Unauthorized
+ '403':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Forbidden
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
/public/backfills/:
get:
tags:
@@ -5730,6 +5783,41 @@ components:
- version
title: ProviderResponse
description: Provider serializer for responses.
+ QueuedEventCollectionResponse:
+ properties:
+ queued_events:
+ items:
+ $ref: '#/components/schemas/QueuedEventResponse'
+ type: array
+ title: Queued Events
+ total_entries:
+ type: integer
+ title: Total Entries
+ type: object
+ required:
+ - queued_events
+ - total_entries
+ title: QueuedEventCollectionResponse
+ description: Queued Event Collection serializer for responses.
+ QueuedEventResponse:
+ properties:
+ uri:
+ type: string
+ title: Uri
+ dag_id:
+ type: string
+ title: Dag Id
+ created_at:
+ type: string
+ format: date-time
+ title: Created At
+ type: object
+ required:
+ - uri
+ - dag_id
+ - created_at
+ title: QueuedEventResponse
+ description: Queued Event serializer for responses..
ReprocessBehavior:
type: string
enum:
diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow/api_fastapi/core_api/routes/public/assets.py
index 326a387f008..37fbb6e5878 100644
--- a/airflow/api_fastapi/core_api/routes/public/assets.py
+++ b/airflow/api_fastapi/core_api/routes/public/assets.py
@@ -17,6 +17,7 @@
from __future__ import annotations
+from datetime import datetime
from typing import Annotated
from fastapi import Depends, HTTPException, status
@@ -25,6 +26,7 @@ from sqlalchemy.orm import Session, joinedload, subqueryload
from airflow.api_fastapi.common.db.common import get_session, paginated_select
from airflow.api_fastapi.common.parameters import (
+ OptionalDateTimeQuery,
QueryAssetDagIdPatternSearch,
QueryAssetIdFilter,
QueryLimit,
@@ -43,18 +45,41 @@ from airflow.api_fastapi.core_api.datamodels.assets import (
AssetEventResponse,
AssetResponse,
CreateAssetEventsBody,
+ QueuedEventCollectionResponse,
+ QueuedEventResponse,
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.assets import Asset
from airflow.assets.manager import asset_manager
-from airflow.models.asset import AssetEvent, AssetModel
+from airflow.models.asset import AssetDagRunQueue, AssetEvent, AssetModel
from airflow.utils import timezone
-assets_router = AirflowRouter(tags=["Asset"], prefix="/assets")
+assets_router = AirflowRouter(tags=["Asset"])
+
+
+def _generate_queued_event_where_clause(
+ *,
+ dag_id: str | None = None,
+ uri: str | None = None,
+ before: datetime | str | None = None,
+) -> list:
+ """Get AssetDagRunQueue where clause."""
+ where_clause = []
+ if dag_id is not None:
+ where_clause.append(AssetDagRunQueue.target_dag_id == dag_id)
+ if uri is not None:
+ where_clause.append(
+ AssetDagRunQueue.asset_id.in_(
+ select(AssetModel.id).where(AssetModel.uri == uri),
+ ),
+ )
+ if before is not None:
+ where_clause.append(AssetDagRunQueue.created_at < before)
+ return where_clause
@assets_router.get(
- "/",
+ "/assets",
responses=create_openapi_http_exception_doc([401, 403, 404]),
)
def get_assets(
@@ -89,7 +114,7 @@ def get_assets(
@assets_router.get(
- "/events",
+ "/assets/events",
responses=create_openapi_http_exception_doc([404]),
)
def get_asset_events(
@@ -139,7 +164,7 @@ def get_asset_events(
@assets_router.post(
- "/events",
+ "/assets/events",
responses=create_openapi_http_exception_doc([404]),
)
def create_asset_event(
@@ -165,7 +190,7 @@ def create_asset_event(
@assets_router.get(
- "/{uri:path}",
+ "/assets/{uri:path}",
responses=create_openapi_http_exception_doc([401, 403, 404]),
)
def get_asset(
@@ -183,3 +208,47 @@ def get_asset(
raise HTTPException(status.HTTP_404_NOT_FOUND, f"The Asset with uri: `{uri}` was not found")
return AssetResponse.model_validate(asset, from_attributes=True)
+
+
+@assets_router.get(
+ "/dags/{dag_id}/assets/queuedEvent",
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_404_NOT_FOUND,
+ ]
+ ),
+)
+def get_dag_asset_queued_events(
+ dag_id: str,
+ session: Annotated[Session, Depends(get_session)],
+ before: OptionalDateTimeQuery = None,
+) -> QueuedEventCollectionResponse:
+ """Get queued asset events for a DAG."""
+    where_clause = _generate_queued_event_where_clause(dag_id=dag_id, before=before)
+ query = (
+ select(AssetDagRunQueue, AssetModel.uri)
+ .join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id)
+ .where(*where_clause)
+ )
+
+ dag_asset_queued_events_select, total_entries = paginated_select(
+ query,
+ [],
+ )
+ adrqs = session.execute(dag_asset_queued_events_select).all()
+
+ if not adrqs:
+        raise HTTPException(status.HTTP_404_NOT_FOUND, f"Queue event with dag_id: `{dag_id}` was not found")
+
+ queued_events = [
+        QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, uri=uri)
+ for adrq, uri in adrqs
+ ]
+
+ return QueuedEventCollectionResponse(
+ queued_events=[
+            QueuedEventResponse.model_validate(queued_event, from_attributes=True)
+ for queued_event in queued_events
+ ],
+ total_entries=total_entries,
+ )
diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts
index 46940bfd318..7b23e33f0ab 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -129,6 +129,28 @@ export const UseAssetServiceGetAssetKeyFn = (
},
queryKey?: Array<unknown>,
) => [useAssetServiceGetAssetKey, ...(queryKey ?? [{ uri }])];
+export type AssetServiceGetDagAssetQueuedEventsDefaultResponse = Awaited<
+ ReturnType<typeof AssetService.getDagAssetQueuedEvents>
+>;
+export type AssetServiceGetDagAssetQueuedEventsQueryResult<
+ TData = AssetServiceGetDagAssetQueuedEventsDefaultResponse,
+ TError = unknown,
+> = UseQueryResult<TData, TError>;
+export const useAssetServiceGetDagAssetQueuedEventsKey =
+ "AssetServiceGetDagAssetQueuedEvents";
+export const UseAssetServiceGetDagAssetQueuedEventsKeyFn = (
+ {
+ before,
+ dagId,
+ }: {
+ before?: string;
+ dagId: string;
+ },
+ queryKey?: Array<unknown>,
+) => [
+ useAssetServiceGetDagAssetQueuedEventsKey,
+ ...(queryKey ?? [{ before, dagId }]),
+];
export type DashboardServiceHistoricalMetricsDefaultResponse = Awaited<
ReturnType<typeof DashboardService.historicalMetrics>
>;
diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts
index 4c541670258..0c522f36e43 100644
--- a/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -165,6 +165,32 @@ export const prefetchUseAssetServiceGetAsset = (
queryKey: Common.UseAssetServiceGetAssetKeyFn({ uri }),
queryFn: () => AssetService.getAsset({ uri }),
});
+/**
+ * Get Dag Asset Queued Events
+ * Get queued asset events for a DAG.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.before
+ * @returns QueuedEventCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const prefetchUseAssetServiceGetDagAssetQueuedEvents = (
+ queryClient: QueryClient,
+ {
+ before,
+ dagId,
+ }: {
+ before?: string;
+ dagId: string;
+ },
+) =>
+ queryClient.prefetchQuery({
+ queryKey: Common.UseAssetServiceGetDagAssetQueuedEventsKeyFn({
+ before,
+ dagId,
+ }),
+ queryFn: () => AssetService.getDagAssetQueuedEvents({ before, dagId }),
+ });
/**
* Historical Metrics
* Return cluster activity historical metrics.
diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts
index a96b09e12a7..8ec0ea9234a 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -213,6 +213,39 @@ export const useAssetServiceGetAsset = <
queryFn: () => AssetService.getAsset({ uri }) as TData,
...options,
});
+/**
+ * Get Dag Asset Queued Events
+ * Get queued asset events for a DAG.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.before
+ * @returns QueuedEventCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const useAssetServiceGetDagAssetQueuedEvents = <
+ TData = Common.AssetServiceGetDagAssetQueuedEventsDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ before,
+ dagId,
+ }: {
+ before?: string;
+ dagId: string;
+ },
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useQuery<TData, TError>({
+ queryKey: Common.UseAssetServiceGetDagAssetQueuedEventsKeyFn(
+ { before, dagId },
+ queryKey,
+ ),
+ queryFn: () =>
+ AssetService.getDagAssetQueuedEvents({ before, dagId }) as TData,
+ ...options,
+ });
/**
* Historical Metrics
* Return cluster activity historical metrics.
diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts
index 43331b187fe..1b814222815 100644
--- a/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow/ui/openapi-gen/queries/suspense.ts
@@ -195,6 +195,39 @@ export const useAssetServiceGetAssetSuspense = <
queryFn: () => AssetService.getAsset({ uri }) as TData,
...options,
});
+/**
+ * Get Dag Asset Queued Events
+ * Get queued asset events for a DAG.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.before
+ * @returns QueuedEventCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const useAssetServiceGetDagAssetQueuedEventsSuspense = <
+ TData = Common.AssetServiceGetDagAssetQueuedEventsDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ before,
+ dagId,
+ }: {
+ before?: string;
+ dagId: string;
+ },
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useSuspenseQuery<TData, TError>({
+ queryKey: Common.UseAssetServiceGetDagAssetQueuedEventsKeyFn(
+ { before, dagId },
+ queryKey,
+ ),
+ queryFn: () =>
+ AssetService.getDagAssetQueuedEvents({ before, dagId }) as TData,
+ ...options,
+ });
/**
* Historical Metrics
* Return cluster activity historical metrics.
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index e5ac0441a2a..1f83e434286 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -2880,6 +2880,48 @@ export const $ProviderResponse = {
description: "Provider serializer for responses.",
} as const;
+export const $QueuedEventCollectionResponse = {
+ properties: {
+ queued_events: {
+ items: {
+ $ref: "#/components/schemas/QueuedEventResponse",
+ },
+ type: "array",
+ title: "Queued Events",
+ },
+ total_entries: {
+ type: "integer",
+ title: "Total Entries",
+ },
+ },
+ type: "object",
+ required: ["queued_events", "total_entries"],
+ title: "QueuedEventCollectionResponse",
+ description: "Queued Event Collection serializer for responses.",
+} as const;
+
+export const $QueuedEventResponse = {
+ properties: {
+ uri: {
+ type: "string",
+ title: "Uri",
+ },
+ dag_id: {
+ type: "string",
+ title: "Dag Id",
+ },
+ created_at: {
+ type: "string",
+ format: "date-time",
+ title: "Created At",
+ },
+ },
+ type: "object",
+ required: ["uri", "dag_id", "created_at"],
+ title: "QueuedEventResponse",
+ description: "Queued Event serializer for responses..",
+} as const;
+
export const $ReprocessBehavior = {
type: "string",
enum: ["failed", "completed", "none"],
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts
index 53bb3527d14..361cde79fc5 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -13,6 +13,8 @@ import type {
CreateAssetEventResponse,
GetAssetData,
GetAssetResponse,
+ GetDagAssetQueuedEventsData,
+ GetDagAssetQueuedEventsResponse,
HistoricalMetricsData,
HistoricalMetricsResponse,
RecentDagRunsData,
@@ -167,7 +169,7 @@ export class AssetService {
): CancelablePromise<GetAssetsResponse> {
return __request(OpenAPI, {
method: "GET",
- url: "/public/assets/",
+ url: "/public/assets",
query: {
limit: data.limit,
offset: data.offset,
@@ -274,6 +276,36 @@ export class AssetService {
},
});
}
+
+ /**
+ * Get Dag Asset Queued Events
+ * Get queued asset events for a DAG.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.before
+ * @returns QueuedEventCollectionResponse Successful Response
+ * @throws ApiError
+ */
+ public static getDagAssetQueuedEvents(
+ data: GetDagAssetQueuedEventsData,
+ ): CancelablePromise<GetDagAssetQueuedEventsResponse> {
+ return __request(OpenAPI, {
+ method: "GET",
+ url: "/public/dags/{dag_id}/assets/queuedEvent",
+ path: {
+ dag_id: data.dagId,
+ },
+ query: {
+ before: data.before,
+ },
+ errors: {
+ 401: "Unauthorized",
+ 403: "Forbidden",
+ 404: "Not Found",
+ 422: "Validation Error",
+ },
+ });
+ }
}
export class DashboardService {
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts
index 078699cc0f2..948b3149c4f 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -712,6 +712,23 @@ export type ProviderResponse = {
version: string;
};
+/**
+ * Queued Event Collection serializer for responses.
+ */
+export type QueuedEventCollectionResponse = {
+ queued_events: Array<QueuedEventResponse>;
+ total_entries: number;
+};
+
+/**
+ * Queued Event serializer for responses..
+ */
+export type QueuedEventResponse = {
+ uri: string;
+ dag_id: string;
+ created_at: string;
+};
+
/**
* Internal enum for setting reprocess behavior in a backfill.
*
@@ -1045,6 +1062,13 @@ export type GetAssetData = {
export type GetAssetResponse = AssetResponse;
+export type GetDagAssetQueuedEventsData = {
+ before?: string | null;
+ dagId: string;
+};
+
+export type GetDagAssetQueuedEventsResponse = QueuedEventCollectionResponse;
+
export type HistoricalMetricsData = {
endDate: string;
startDate: string;
@@ -1535,7 +1559,7 @@ export type $OpenApiTs = {
};
};
};
- "/public/assets/": {
+ "/public/assets": {
get: {
req: GetAssetsData;
res: {
@@ -1641,6 +1665,33 @@ export type $OpenApiTs = {
};
};
};
+ "/public/dags/{dag_id}/assets/queuedEvent": {
+ get: {
+ req: GetDagAssetQueuedEventsData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 200: QueuedEventCollectionResponse;
+ /**
+ * Unauthorized
+ */
+ 401: HTTPExceptionResponse;
+ /**
+ * Forbidden
+ */
+ 403: HTTPExceptionResponse;
+ /**
+ * Not Found
+ */
+ 404: HTTPExceptionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
+ };
"/ui/dashboard/historical_metrics_data": {
get: {
req: HistoricalMetricsData;
diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py b/tests/api_fastapi/core_api/routes/public/test_assets.py
index 42b7acd908f..4e127ffd0c7 100644
--- a/tests/api_fastapi/core_api/routes/public/test_assets.py
+++ b/tests/api_fastapi/core_api/routes/public/test_assets.py
@@ -24,7 +24,13 @@ import pytest
import time_machine
from airflow.models import DagModel
-from airflow.models.asset import AssetEvent, AssetModel, DagScheduleAssetReference, TaskOutletAssetReference
+from airflow.models.asset import (
+ AssetDagRunQueue,
+ AssetEvent,
+ AssetModel,
+ DagScheduleAssetReference,
+ TaskOutletAssetReference,
+)
from airflow.models.dagrun import DagRun
from airflow.utils import timezone
from airflow.utils.session import provide_session
@@ -114,6 +120,15 @@ def _create_asset_dag_run(session, num: int = 2):
class TestAssets:
default_time = "2020-06-11T18:00:00+00:00"
+ @pytest.fixture
+ def time_freezer(self) -> Generator:
+ freezer = time_machine.travel(self.default_time, tick=False)
+ freezer.start()
+
+ yield
+
+ freezer.stop()
+
@pytest.fixture(autouse=True)
def setup(self) -> None:
clear_db_assets()
@@ -464,16 +479,51 @@ class TestGetAssetEndpoint(TestAssets):
assert response.json()["detail"] == "The Asset with uri: `s3://bucket/key` was not found"
-class TestPostAssetEvents(TestAssets):
- @pytest.fixture
- def time_freezer(self) -> Generator:
- freezer = time_machine.travel(self.default_time, tick=False)
- freezer.start()
+class TestQueuedEventEndpoint(TestAssets):
+ def _create_asset_dag_run_queues(self, dag_id, asset_id, session):
+ adrq = AssetDagRunQueue(target_dag_id=dag_id, asset_id=asset_id)
+ session.add(adrq)
+ session.commit()
+ return adrq
- yield
- freezer.stop()
+class TestGetDagAssetQueuedEvents(TestQueuedEventEndpoint):
+ @pytest.mark.usefixtures("time_freezer")
+ def test_should_respond_200(self, test_client, session, create_dummy_dag):
+ dag, _ = create_dummy_dag()
+ dag_id = dag.dag_id
+ self.create_assets(session=session, num=1)
+ asset_id = 1
+ self._create_asset_dag_run_queues(dag_id, asset_id, session)
+ response = test_client.get(
+ f"/public/dags/{dag_id}/assets/queuedEvent",
+ )
+
+ assert response.status_code == 200
+ assert response.json() == {
+ "queued_events": [
+ {
+ "created_at": self.default_time.replace("+00:00", "Z"),
+ "uri": "s3://bucket/key/1",
+ "dag_id": "dag",
+ }
+ ],
+ "total_entries": 1,
+ }
+
+ def test_should_respond_404(self, test_client):
+ dag_id = "not_exists"
+
+ response = test_client.get(
+ f"/public/dags/{dag_id}/assets/queuedEvent",
+ )
+
+ assert response.status_code == 404
+        assert response.json()["detail"] == "Queue event with dag_id: `not_exists` was not found"
+
+
+class TestPostAssetEvents(TestAssets):
@pytest.mark.usefixtures("time_freezer")
def test_should_respond_200(self, test_client, session):
self.create_assets()