This is an automated email from the ASF dual-hosted git repository.
amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8f9631ece1c AIP-84: Migrating DELETE a queued asset events for DAG to fastAPI (#44130)
8f9631ece1c is described below
commit 8f9631ece1c21a56fc3d66d9f2f7f68d6688592e
Author: Amogh Desai <[email protected]>
AuthorDate: Tue Nov 19 12:42:05 2024 +0530
AIP-84: Migrating DELETE a queued asset events for DAG to fastAPI (#44130)
* AIP-84: Migrating GET queued asset events for DAG to fastAPI
* fixing tests and server code
* fixing parameters
* fixing parameters
* AIP-84: Migrating DELETE a queued asset events for DAG to fastAPI
* Update airflow/api_fastapi/core_api/routes/public/assets.py
Co-authored-by: Wei Lee <[email protected]>
* Update tests/api_fastapi/core_api/routes/public/test_assets.py
Co-authored-by: Wei Lee <[email protected]>
---------
Co-authored-by: Wei Lee <[email protected]>
---
airflow/api_connexion/endpoints/asset_endpoint.py | 1 +
.../api_fastapi/core_api/openapi/v1-generated.yaml | 60 ++++++++++++++++++++++
.../api_fastapi/core_api/routes/public/assets.py | 29 +++++++++++
airflow/ui/openapi-gen/queries/common.ts | 3 ++
airflow/ui/openapi-gen/queries/queries.ts | 47 +++++++++++++++++
airflow/ui/openapi-gen/requests/services.gen.ts | 35 +++++++++++++
airflow/ui/openapi-gen/requests/types.gen.ts | 37 +++++++++++++
.../core_api/routes/public/test_assets.py | 35 +++++++++++++
8 files changed, 247 insertions(+)
diff --git a/airflow/api_connexion/endpoints/asset_endpoint.py b/airflow/api_connexion/endpoints/asset_endpoint.py
index e6ff30bcac0..423e02d1b9c 100644
--- a/airflow/api_connexion/endpoints/asset_endpoint.py
+++ b/airflow/api_connexion/endpoints/asset_endpoint.py
@@ -204,6 +204,7 @@ def get_dag_asset_queued_event(
return queued_event_schema.dump(queued_event)
+@mark_fastapi_migration_done
@security.requires_access_asset("DELETE")
@security.requires_access_dag("GET")
@provide_session
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index 74703488fa3..e23f83634c4 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -605,6 +605,66 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
+ delete:
+ tags:
+ - Asset
+ summary: Delete Dag Asset Queued Event
+ description: Delete a queued asset event for a DAG.
+ operationId: delete_dag_asset_queued_event
+ parameters:
+ - name: dag_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Id
+ - name: uri
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Uri
+ - name: before
+ in: query
+ required: false
+ schema:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Before
+ responses:
+ '204':
+ description: Successful Response
+ '401':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Unauthorized
+ '403':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Forbidden
+ '400':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Bad Request
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
/public/assets/queuedEvent/{uri}:
delete:
tags:
diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow/api_fastapi/core_api/routes/public/assets.py
index 85f9efb690c..b94b825f762 100644
--- a/airflow/api_fastapi/core_api/routes/public/assets.py
+++ b/airflow/api_fastapi/core_api/routes/public/assets.py
@@ -329,3 +329,32 @@ def delete_dag_asset_queued_events(
if result.rowcount == 0:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Queue event with dag_id: `{dag_id}` was not found")
+
+
+@assets_router.delete(
+ "/dags/{dag_id}/assets/queuedEvent/{uri:path}",
+ status_code=status.HTTP_204_NO_CONTENT,
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_400_BAD_REQUEST,
+ status.HTTP_404_NOT_FOUND,
+ ]
+ ),
+)
+def delete_dag_asset_queued_event(
+ dag_id: str,
+ uri: str,
+ session: Annotated[Session, Depends(get_session)],
+ before: OptionalDateTimeQuery = None,
+):
+ """Delete a queued asset event for a DAG."""
+ where_clause = _generate_queued_event_where_clause(dag_id=dag_id, before=before, uri=uri)
+ delete_statement = (
+ delete(AssetDagRunQueue).where(*where_clause).execution_options(synchronize_session="fetch")
+ )
+ result = session.execute(delete_statement)
+ if result.rowcount == 0:
+ raise HTTPException(
+ status.HTTP_404_NOT_FOUND,
+ detail=f"Queued event with dag_id: `{dag_id}` and asset uri: `{uri}` was not found",
+ )
diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts
index 1d954df4cc0..8b9af7821bf 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -1185,6 +1185,9 @@ export type VariableServicePatchVariableMutationResult = Awaited<
export type AssetServiceDeleteDagAssetQueuedEventsMutationResult = Awaited<
ReturnType<typeof AssetService.deleteDagAssetQueuedEvents>
>;
+export type AssetServiceDeleteDagAssetQueuedEventMutationResult = Awaited<
+ ReturnType<typeof AssetService.deleteDagAssetQueuedEvent>
+>;
export type AssetServiceDeleteAssetQueuedEventsMutationResult = Awaited<
ReturnType<typeof AssetService.deleteAssetQueuedEvents>
>;
diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts
index 6b3e350c27c..b5beed39cb5 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -2629,6 +2629,53 @@ export const useAssetServiceDeleteDagAssetQueuedEvents = <
}) as unknown as Promise<TData>,
...options,
});
+/**
+ * Delete Dag Asset Queued Event
+ * Delete a queued asset event for a DAG.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.uri
+ * @param data.before
+ * @returns void Successful Response
+ * @throws ApiError
+ */
+export const useAssetServiceDeleteDagAssetQueuedEvent = <
+ TData = Common.AssetServiceDeleteDagAssetQueuedEventMutationResult,
+ TError = unknown,
+ TContext = unknown,
+>(
+ options?: Omit<
+ UseMutationOptions<
+ TData,
+ TError,
+ {
+ before?: string;
+ dagId: string;
+ uri: string;
+ },
+ TContext
+ >,
+ "mutationFn"
+ >,
+) =>
+ useMutation<
+ TData,
+ TError,
+ {
+ before?: string;
+ dagId: string;
+ uri: string;
+ },
+ TContext
+ >({
+ mutationFn: ({ before, dagId, uri }) =>
+ AssetService.deleteDagAssetQueuedEvent({
+ before,
+ dagId,
+ uri,
+ }) as unknown as Promise<TData>,
+ ...options,
+ });
/**
* Delete Asset Queued Events
* Delete queued asset events for an asset.
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts
index ed606e960cc..c36bdb8c92b 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -19,6 +19,8 @@ import type {
DeleteDagAssetQueuedEventsResponse,
GetDagAssetQueuedEventData,
GetDagAssetQueuedEventResponse,
+ DeleteDagAssetQueuedEventData,
+ DeleteDagAssetQueuedEventResponse,
DeleteAssetQueuedEventsData,
DeleteAssetQueuedEventsResponse,
HistoricalMetricsData,
@@ -375,6 +377,39 @@ export class AssetService {
});
}
+ /**
+ * Delete Dag Asset Queued Event
+ * Delete a queued asset event for a DAG.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.uri
+ * @param data.before
+ * @returns void Successful Response
+ * @throws ApiError
+ */
+ public static deleteDagAssetQueuedEvent(
+ data: DeleteDagAssetQueuedEventData,
+ ): CancelablePromise<DeleteDagAssetQueuedEventResponse> {
+ return __request(OpenAPI, {
+ method: "DELETE",
+ url: "/public/dags/{dag_id}/assets/queuedEvent/{uri}",
+ path: {
+ dag_id: data.dagId,
+ uri: data.uri,
+ },
+ query: {
+ before: data.before,
+ },
+ errors: {
+ 400: "Bad Request",
+ 401: "Unauthorized",
+ 403: "Forbidden",
+ 404: "Not Found",
+ 422: "Validation Error",
+ },
+ });
+ }
+
/**
* Delete Asset Queued Events
* Delete queued asset events for an asset.
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts
index e5960fc82f9..3611df15458 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -1084,6 +1084,14 @@ export type GetDagAssetQueuedEventData = {
export type GetDagAssetQueuedEventResponse = QueuedEventResponse;
+export type DeleteDagAssetQueuedEventData = {
+ before?: string | null;
+ dagId: string;
+ uri: string;
+};
+
+export type DeleteDagAssetQueuedEventResponse = void;
+
export type DeleteAssetQueuedEventsData = {
before?: string | null;
uri: string;
@@ -1769,6 +1777,35 @@ export type $OpenApiTs = {
422: HTTPValidationError;
};
};
+ delete: {
+ req: DeleteDagAssetQueuedEventData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 204: void;
+ /**
+ * Bad Request
+ */
+ 400: HTTPExceptionResponse;
+ /**
+ * Unauthorized
+ */
+ 401: HTTPExceptionResponse;
+ /**
+ * Forbidden
+ */
+ 403: HTTPExceptionResponse;
+ /**
+ * Not Found
+ */
+ 404: HTTPExceptionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
};
"/public/assets/queuedEvent/{uri}": {
delete: {
diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py b/tests/api_fastapi/core_api/routes/public/test_assets.py
index 5d753134ac7..3970d7e4670 100644
--- a/tests/api_fastapi/core_api/routes/public/test_assets.py
+++ b/tests/api_fastapi/core_api/routes/public/test_assets.py
@@ -765,3 +765,38 @@ class TestDeleteAssetQueuedEvents(TestQueuedEventEndpoint):
assert response.status_code == 404
assert response.json()["detail"] == "Queue event with uri: `not_exists` was not found"
+
+
+class TestDeleteDagAssetQueuedEvent(TestQueuedEventEndpoint):
+ def test_delete_should_respond_204(self, test_client, session, create_dummy_dag):
+ dag, _ = create_dummy_dag()
+ dag_id = dag.dag_id
+ asset_uri = "s3://bucket/key/1"
+ self.create_assets(session=session, num=1)
+ asset_id = 1
+
+ self._create_asset_dag_run_queues(dag_id, asset_id, session)
+ adrq = session.query(AssetDagRunQueue).all()
+ assert len(adrq) == 1
+
+ response = test_client.delete(
+ f"/public/dags/{dag_id}/assets/queuedEvent/{asset_uri}",
+ )
+
+ assert response.status_code == 204
+ adrq = session.query(AssetDagRunQueue).all()
+ assert len(adrq) == 0
+
+ def test_should_respond_404(self, test_client):
+ dag_id = "not_exists"
+ asset_uri = "not_exists"
+
+ response = test_client.delete(
+ f"/public/dags/{dag_id}/assets/queuedEvent/{asset_uri}",
+ )
+
+ assert response.status_code == 404
+ assert (
+ response.json()["detail"]
+ == "Queued event with dag_id: `not_exists` and asset uri: `not_exists` was not found"
+ )