This is an automated email from the ASF dual-hosted git repository.
amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new af958bbf923 AIP-84: Migrating GET one queued asset events for DAG to fastAPI (#44128)
af958bbf923 is described below
commit af958bbf923580e55f2bbf46b1cce3f75ce77000
Author: Amogh Desai <[email protected]>
AuthorDate: Tue Nov 19 11:17:57 2024 +0530
AIP-84: Migrating GET one queued asset events for DAG to fastAPI (#44128)
* AIP-84: Migrating GET queued asset events for DAG to fastAPI
* fixing tests and server code
* fixing parameters
* fixing parameters
* AIP-84: Migrating GET one queued asset events for DAG to fastAPI
* adding annotation
* Update airflow/api_fastapi/core_api/routes/public/assets.py
Co-authored-by: Wei Lee <[email protected]>
* Update tests/api_fastapi/core_api/routes/public/test_assets.py
Co-authored-by: Wei Lee <[email protected]>
* Update airflow/api_fastapi/core_api/routes/public/assets.py
Co-authored-by: Pierre Jeambrun <[email protected]>
---------
Co-authored-by: Wei Lee <[email protected]>
Co-authored-by: Pierre Jeambrun <[email protected]>
---
airflow/api_connexion/endpoints/asset_endpoint.py | 1 +
.../api_fastapi/core_api/openapi/v1-generated.yaml | 59 ++++++++++++++
.../api_fastapi/core_api/routes/public/assets.py | 31 ++++++++
airflow/ui/openapi-gen/queries/common.ts | 24 ++++++
airflow/ui/openapi-gen/queries/prefetch.ts | 30 +++++++
airflow/ui/openapi-gen/queries/queries.ts | 36 +++++++++
airflow/ui/openapi-gen/queries/suspense.ts | 36 +++++++++
airflow/ui/openapi-gen/requests/services.gen.ts | 34 ++++++++
airflow/ui/openapi-gen/requests/types.gen.ts | 35 ++++++++
.../core_api/routes/public/test_assets.py | 93 +++++-----------------
10 files changed, 306 insertions(+), 73 deletions(-)
diff --git a/airflow/api_connexion/endpoints/asset_endpoint.py b/airflow/api_connexion/endpoints/asset_endpoint.py
index 986ad6bd617..e6ff30bcac0 100644
--- a/airflow/api_connexion/endpoints/asset_endpoint.py
+++ b/airflow/api_connexion/endpoints/asset_endpoint.py
@@ -181,6 +181,7 @@ def _generate_queued_event_where_clause(
return where_clause
+@mark_fastapi_migration_done
@security.requires_access_asset("GET")
@security.requires_access_dag("GET")
@provide_session
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index 9b42d5c3a81..74703488fa3 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -546,6 +546,65 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
+ /public/dags/{dag_id}/assets/queuedEvent/{uri}:
+ get:
+ tags:
+ - Asset
+ summary: Get Dag Asset Queued Event
+ description: Get a queued asset event for a DAG.
+ operationId: get_dag_asset_queued_event
+ parameters:
+ - name: dag_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Id
+ - name: uri
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Uri
+ - name: before
+ in: query
+ required: false
+ schema:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Before
+ responses:
+ '200':
+ description: Successful Response
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/QueuedEventResponse'
+ '401':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Unauthorized
+ '403':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Forbidden
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
/public/assets/queuedEvent/{uri}:
delete:
tags:
diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow/api_fastapi/core_api/routes/public/assets.py
index a8a1afea32a..85f9efb690c 100644
--- a/airflow/api_fastapi/core_api/routes/public/assets.py
+++ b/airflow/api_fastapi/core_api/routes/public/assets.py
@@ -254,6 +254,37 @@ def get_dag_asset_queued_events(
)
+@assets_router.get(
+ "/dags/{dag_id}/assets/queuedEvent/{uri:path}",
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_404_NOT_FOUND,
+ ]
+ ),
+)
+def get_dag_asset_queued_event(
+ dag_id: str,
+ uri: str,
+ session: Annotated[Session, Depends(get_session)],
+ before: OptionalDateTimeQuery = None,
+) -> QueuedEventResponse:
+ """Get a queued asset event for a DAG."""
+ where_clause = _generate_queued_event_where_clause(dag_id=dag_id, uri=uri, before=before)
+ query = (
+ select(AssetDagRunQueue)
+ .join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id)
+ .where(*where_clause)
+ )
+ adrq = session.scalar(query)
+ if not adrq:
+ raise HTTPException(
+ status.HTTP_404_NOT_FOUND,
+ f"Queued event with dag_id: `{dag_id}` and asset uri: `{uri}` was not found",
+ )
+
+ return QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, uri=uri)
+
+
@assets_router.delete(
"/assets/queuedEvent/{uri:path}",
status_code=status.HTTP_204_NO_CONTENT,
diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts
index 14377a81a96..1d954df4cc0 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -151,6 +151,30 @@ export const UseAssetServiceGetDagAssetQueuedEventsKeyFn = (
useAssetServiceGetDagAssetQueuedEventsKey,
...(queryKey ?? [{ before, dagId }]),
];
+export type AssetServiceGetDagAssetQueuedEventDefaultResponse = Awaited<
+ ReturnType<typeof AssetService.getDagAssetQueuedEvent>
+>;
+export type AssetServiceGetDagAssetQueuedEventQueryResult<
+ TData = AssetServiceGetDagAssetQueuedEventDefaultResponse,
+ TError = unknown,
+> = UseQueryResult<TData, TError>;
+export const useAssetServiceGetDagAssetQueuedEventKey =
+ "AssetServiceGetDagAssetQueuedEvent";
+export const UseAssetServiceGetDagAssetQueuedEventKeyFn = (
+ {
+ before,
+ dagId,
+ uri,
+ }: {
+ before?: string;
+ dagId: string;
+ uri: string;
+ },
+ queryKey?: Array<unknown>,
+) => [
+ useAssetServiceGetDagAssetQueuedEventKey,
+ ...(queryKey ?? [{ before, dagId, uri }]),
+];
export type DashboardServiceHistoricalMetricsDefaultResponse = Awaited<
ReturnType<typeof DashboardService.historicalMetrics>
>;
diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts
index 0c522f36e43..3fef2a26d11 100644
--- a/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -191,6 +191,36 @@ export const prefetchUseAssetServiceGetDagAssetQueuedEvents = (
}),
queryFn: () => AssetService.getDagAssetQueuedEvents({ before, dagId }),
});
+/**
+ * Get Dag Asset Queued Event
+ * Get a queued asset event for a DAG.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.uri
+ * @param data.before
+ * @returns QueuedEventResponse Successful Response
+ * @throws ApiError
+ */
+export const prefetchUseAssetServiceGetDagAssetQueuedEvent = (
+ queryClient: QueryClient,
+ {
+ before,
+ dagId,
+ uri,
+ }: {
+ before?: string;
+ dagId: string;
+ uri: string;
+ },
+) =>
+ queryClient.prefetchQuery({
+ queryKey: Common.UseAssetServiceGetDagAssetQueuedEventKeyFn({
+ before,
+ dagId,
+ uri,
+ }),
+ queryFn: () => AssetService.getDagAssetQueuedEvent({ before, dagId, uri }),
+ });
/**
* Historical Metrics
* Return cluster activity historical metrics.
diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts
index c461db254de..6b3e350c27c 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -246,6 +246,42 @@ export const useAssetServiceGetDagAssetQueuedEvents = <
AssetService.getDagAssetQueuedEvents({ before, dagId }) as TData,
...options,
});
+/**
+ * Get Dag Asset Queued Event
+ * Get a queued asset event for a DAG.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.uri
+ * @param data.before
+ * @returns QueuedEventResponse Successful Response
+ * @throws ApiError
+ */
+export const useAssetServiceGetDagAssetQueuedEvent = <
+ TData = Common.AssetServiceGetDagAssetQueuedEventDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ before,
+ dagId,
+ uri,
+ }: {
+ before?: string;
+ dagId: string;
+ uri: string;
+ },
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useQuery<TData, TError>({
+ queryKey: Common.UseAssetServiceGetDagAssetQueuedEventKeyFn(
+ { before, dagId, uri },
+ queryKey,
+ ),
+ queryFn: () =>
+ AssetService.getDagAssetQueuedEvent({ before, dagId, uri }) as TData,
+ ...options,
+ });
/**
* Historical Metrics
* Return cluster activity historical metrics.
diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts
index 1b814222815..decdbdd3759 100644
--- a/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow/ui/openapi-gen/queries/suspense.ts
@@ -228,6 +228,42 @@ export const useAssetServiceGetDagAssetQueuedEventsSuspense = <
AssetService.getDagAssetQueuedEvents({ before, dagId }) as TData,
...options,
});
+/**
+ * Get Dag Asset Queued Event
+ * Get a queued asset event for a DAG.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.uri
+ * @param data.before
+ * @returns QueuedEventResponse Successful Response
+ * @throws ApiError
+ */
+export const useAssetServiceGetDagAssetQueuedEventSuspense = <
+ TData = Common.AssetServiceGetDagAssetQueuedEventDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ before,
+ dagId,
+ uri,
+ }: {
+ before?: string;
+ dagId: string;
+ uri: string;
+ },
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useSuspenseQuery<TData, TError>({
+ queryKey: Common.UseAssetServiceGetDagAssetQueuedEventKeyFn(
+ { before, dagId, uri },
+ queryKey,
+ ),
+ queryFn: () =>
+ AssetService.getDagAssetQueuedEvent({ before, dagId, uri }) as TData,
+ ...options,
+ });
/**
* Historical Metrics
* Return cluster activity historical metrics.
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts
index 2d75d9811b1..ed606e960cc 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -17,6 +17,8 @@ import type {
GetDagAssetQueuedEventsResponse,
DeleteDagAssetQueuedEventsData,
DeleteDagAssetQueuedEventsResponse,
+ GetDagAssetQueuedEventData,
+ GetDagAssetQueuedEventResponse,
DeleteAssetQueuedEventsData,
DeleteAssetQueuedEventsResponse,
HistoricalMetricsData,
@@ -341,6 +343,38 @@ export class AssetService {
});
}
+ /**
+ * Get Dag Asset Queued Event
+ * Get a queued asset event for a DAG.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.uri
+ * @param data.before
+ * @returns QueuedEventResponse Successful Response
+ * @throws ApiError
+ */
+ public static getDagAssetQueuedEvent(
+ data: GetDagAssetQueuedEventData,
+ ): CancelablePromise<GetDagAssetQueuedEventResponse> {
+ return __request(OpenAPI, {
+ method: "GET",
+ url: "/public/dags/{dag_id}/assets/queuedEvent/{uri}",
+ path: {
+ dag_id: data.dagId,
+ uri: data.uri,
+ },
+ query: {
+ before: data.before,
+ },
+ errors: {
+ 401: "Unauthorized",
+ 403: "Forbidden",
+ 404: "Not Found",
+ 422: "Validation Error",
+ },
+ });
+ }
+
/**
* Delete Asset Queued Events
* Delete queued asset events for an asset.
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts
index 85a8a714087..e5960fc82f9 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -1076,6 +1076,14 @@ export type DeleteDagAssetQueuedEventsData = {
export type DeleteDagAssetQueuedEventsResponse = void;
+export type GetDagAssetQueuedEventData = {
+ before?: string | null;
+ dagId: string;
+ uri: string;
+};
+
+export type GetDagAssetQueuedEventResponse = QueuedEventResponse;
+
export type DeleteAssetQueuedEventsData = {
before?: string | null;
uri: string;
@@ -1735,6 +1743,33 @@ export type $OpenApiTs = {
};
};
};
+ "/public/dags/{dag_id}/assets/queuedEvent/{uri}": {
+ get: {
+ req: GetDagAssetQueuedEventData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 200: QueuedEventResponse;
+ /**
+ * Unauthorized
+ */
+ 401: HTTPExceptionResponse;
+ /**
+ * Forbidden
+ */
+ 403: HTTPExceptionResponse;
+ /**
+ * Not Found
+ */
+ 404: HTTPExceptionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
+ };
"/public/assets/queuedEvent/{uri}": {
delete: {
req: DeleteAssetQueuedEventsData;
diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py b/tests/api_fastapi/core_api/routes/public/test_assets.py
index 64a1c8a06d6..5d753134ac7 100644
--- a/tests/api_fastapi/core_api/routes/public/test_assets.py
+++ b/tests/api_fastapi/core_api/routes/public/test_assets.py
@@ -367,59 +367,6 @@ class TestGetAssetsEndpointPagination(TestAssets):
assert response.status_code == 200
assert len(response.json()["assets"]) == 100
- @pytest.mark.parametrize(
- "query_params, expected_detail",
- [
- (
- {"limit": 1, "offset": -1},
- [
- {
- "type": "greater_than_equal",
- "loc": ["query", "offset"],
- "msg": "Input should be greater than or equal to 0",
- "input": "-1",
- "ctx": {"ge": 0},
- }
- ],
- ),
- (
- {"limit": -1, "offset": 1},
- [
- {
- "type": "greater_than_equal",
- "loc": ["query", "limit"],
- "msg": "Input should be greater than or equal to 0",
- "input": "-1",
- "ctx": {"ge": 0},
- }
- ],
- ),
- (
- {"limit": -1, "offset": -1},
- [
- {
- "type": "greater_than_equal",
- "loc": ["query", "limit"],
- "msg": "Input should be greater than or equal to 0",
- "input": "-1",
- "ctx": {"ge": 0},
- },
- {
- "type": "greater_than_equal",
- "loc": ["query", "offset"],
- "msg": "Input should be greater than or equal to 0",
- "input": "-1",
- "ctx": {"ge": 0},
- },
- ],
- ),
- ],
- )
- def test_bad_limit_and_offset(self, test_client, query_params, expected_detail):
- response = test_client.get("/public/assets", params=query_params)
- assert response.status_code == 422
- assert response.json()["detail"] == expected_detail
-
class TestGetAssetEvents(TestAssets):
def test_should_respond_200(self, test_client, session):
@@ -772,6 +719,26 @@ class TestPostAssetEvents(TestAssets):
assert response.status_code == 422
+ @pytest.mark.usefixtures("time_freezer")
+ @pytest.mark.enable_redact
+ def test_should_mask_sensitive_extra(self, test_client, session):
+ self.create_assets()
+ event_payload = {"uri": "s3://bucket/key/1", "extra": {"password": "bar"}}
+ response = test_client.post("/public/assets/events", json=event_payload)
+ assert response.status_code == 200
+ assert response.json() == {
+ "id": mock.ANY,
+ "asset_id": 1,
+ "uri": "s3://bucket/key/1",
+ "extra": {"password": "***", "from_rest_api": True},
+ "source_task_id": None,
+ "source_dag_id": None,
+ "source_run_id": None,
+ "source_map_index": -1,
+ "created_dagruns": [],
+ "timestamp": self.default_time.replace("+00:00", "Z"),
+ }
+
class TestDeleteAssetQueuedEvents(TestQueuedEventEndpoint):
@pytest.mark.usefixtures("time_freezer")
@@ -798,23 +765,3 @@ class TestDeleteAssetQueuedEvents(TestQueuedEventEndpoint):
assert response.status_code == 404
assert response.json()["detail"] == "Queue event with uri: `not_exists` was not found"
-
- @pytest.mark.usefixtures("time_freezer")
- @pytest.mark.enable_redact
- def test_should_mask_sensitive_extra(self, test_client, session):
- self.create_assets()
- event_payload = {"uri": "s3://bucket/key/1", "extra": {"password": "bar"}}
- response = test_client.post("/public/assets/events", json=event_payload)
- assert response.status_code == 200
- assert response.json() == {
- "id": mock.ANY,
- "asset_id": 1,
- "uri": "s3://bucket/key/1",
- "extra": {"password": "***", "from_rest_api": True},
- "source_task_id": None,
- "source_dag_id": None,
- "source_run_id": None,
- "source_map_index": -1,
- "created_dagruns": [],
- "timestamp": self.default_time.replace("+00:00", "Z"),
- }