This is an automated email from the ASF dual-hosted git repository.

amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8f9631ece1c AIP-84: Migrating DELETE a queued asset events for DAG to 
fastAPI (#44130)
8f9631ece1c is described below

commit 8f9631ece1c21a56fc3d66d9f2f7f68d6688592e
Author: Amogh Desai <[email protected]>
AuthorDate: Tue Nov 19 12:42:05 2024 +0530

    AIP-84: Migrating DELETE a queued asset events for DAG to fastAPI (#44130)
    
    * AIP-84: Migrating GET queued asset events for DAG to fastAPI
    
    * fixing tests and server code
    
    * fixing parameters
    
    * fixing parameters
    
    * AIP-84: Migrating DELETE a queued asset events for DAG to fastAPI
    
    * Update airflow/api_fastapi/core_api/routes/public/assets.py
    
    Co-authored-by: Wei Lee <[email protected]>
    
    * Update tests/api_fastapi/core_api/routes/public/test_assets.py
    
    Co-authored-by: Wei Lee <[email protected]>
    
    ---------
    
    Co-authored-by: Wei Lee <[email protected]>
---
 airflow/api_connexion/endpoints/asset_endpoint.py  |  1 +
 .../api_fastapi/core_api/openapi/v1-generated.yaml | 60 ++++++++++++++++++++++
 .../api_fastapi/core_api/routes/public/assets.py   | 29 +++++++++++
 airflow/ui/openapi-gen/queries/common.ts           |  3 ++
 airflow/ui/openapi-gen/queries/queries.ts          | 47 +++++++++++++++++
 airflow/ui/openapi-gen/requests/services.gen.ts    | 35 +++++++++++++
 airflow/ui/openapi-gen/requests/types.gen.ts       | 37 +++++++++++++
 .../core_api/routes/public/test_assets.py          | 35 +++++++++++++
 8 files changed, 247 insertions(+)

diff --git a/airflow/api_connexion/endpoints/asset_endpoint.py 
b/airflow/api_connexion/endpoints/asset_endpoint.py
index e6ff30bcac0..423e02d1b9c 100644
--- a/airflow/api_connexion/endpoints/asset_endpoint.py
+++ b/airflow/api_connexion/endpoints/asset_endpoint.py
@@ -204,6 +204,7 @@ def get_dag_asset_queued_event(
     return queued_event_schema.dump(queued_event)
 
 
+@mark_fastapi_migration_done
 @security.requires_access_asset("DELETE")
 @security.requires_access_dag("GET")
 @provide_session
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml 
b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index 74703488fa3..e23f83634c4 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -605,6 +605,66 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/HTTPValidationError'
+    delete:
+      tags:
+      - Asset
+      summary: Delete Dag Asset Queued Event
+      description: Delete a queued asset event for a DAG.
+      operationId: delete_dag_asset_queued_event
+      parameters:
+      - name: dag_id
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Dag Id
+      - name: uri
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Uri
+      - name: before
+        in: query
+        required: false
+        schema:
+          anyOf:
+          - type: string
+          - type: 'null'
+          title: Before
+      responses:
+        '204':
+          description: Successful Response
+        '401':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Unauthorized
+        '403':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Forbidden
+        '400':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Bad Request
+        '404':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Not Found
+        '422':
+          description: Validation Error
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPValidationError'
   /public/assets/queuedEvent/{uri}:
     delete:
       tags:
diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py 
b/airflow/api_fastapi/core_api/routes/public/assets.py
index 85f9efb690c..b94b825f762 100644
--- a/airflow/api_fastapi/core_api/routes/public/assets.py
+++ b/airflow/api_fastapi/core_api/routes/public/assets.py
@@ -329,3 +329,32 @@ def delete_dag_asset_queued_events(
 
     if result.rowcount == 0:
         raise HTTPException(status.HTTP_404_NOT_FOUND, f"Queue event with 
dag_id: `{dag_id}` was not found")
+
+
+@assets_router.delete(
+    "/dags/{dag_id}/assets/queuedEvent/{uri:path}",
+    status_code=status.HTTP_204_NO_CONTENT,
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_400_BAD_REQUEST,
+            status.HTTP_404_NOT_FOUND,
+        ]
+    ),
+)
+def delete_dag_asset_queued_event(
+    dag_id: str,
+    uri: str,
+    session: Annotated[Session, Depends(get_session)],
+    before: OptionalDateTimeQuery = None,
+):
+    """Delete a queued asset event for a DAG."""
+    where_clause = _generate_queued_event_where_clause(dag_id=dag_id, 
before=before, uri=uri)
+    delete_statement = (
+        
delete(AssetDagRunQueue).where(*where_clause).execution_options(synchronize_session="fetch")
+    )
+    result = session.execute(delete_statement)
+    if result.rowcount == 0:
+        raise HTTPException(
+            status.HTTP_404_NOT_FOUND,
+            detail=f"Queued event with dag_id: `{dag_id}` and asset uri: 
`{uri}` was not found",
+        )
diff --git a/airflow/ui/openapi-gen/queries/common.ts 
b/airflow/ui/openapi-gen/queries/common.ts
index 1d954df4cc0..8b9af7821bf 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -1185,6 +1185,9 @@ export type VariableServicePatchVariableMutationResult = 
Awaited<
 export type AssetServiceDeleteDagAssetQueuedEventsMutationResult = Awaited<
   ReturnType<typeof AssetService.deleteDagAssetQueuedEvents>
 >;
+export type AssetServiceDeleteDagAssetQueuedEventMutationResult = Awaited<
+  ReturnType<typeof AssetService.deleteDagAssetQueuedEvent>
+>;
 export type AssetServiceDeleteAssetQueuedEventsMutationResult = Awaited<
   ReturnType<typeof AssetService.deleteAssetQueuedEvents>
 >;
diff --git a/airflow/ui/openapi-gen/queries/queries.ts 
b/airflow/ui/openapi-gen/queries/queries.ts
index 6b3e350c27c..b5beed39cb5 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -2629,6 +2629,53 @@ export const useAssetServiceDeleteDagAssetQueuedEvents = 
<
       }) as unknown as Promise<TData>,
     ...options,
   });
+/**
+ * Delete Dag Asset Queued Event
+ * Delete a queued asset event for a DAG.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.uri
+ * @param data.before
+ * @returns void Successful Response
+ * @throws ApiError
+ */
+export const useAssetServiceDeleteDagAssetQueuedEvent = <
+  TData = Common.AssetServiceDeleteDagAssetQueuedEventMutationResult,
+  TError = unknown,
+  TContext = unknown,
+>(
+  options?: Omit<
+    UseMutationOptions<
+      TData,
+      TError,
+      {
+        before?: string;
+        dagId: string;
+        uri: string;
+      },
+      TContext
+    >,
+    "mutationFn"
+  >,
+) =>
+  useMutation<
+    TData,
+    TError,
+    {
+      before?: string;
+      dagId: string;
+      uri: string;
+    },
+    TContext
+  >({
+    mutationFn: ({ before, dagId, uri }) =>
+      AssetService.deleteDagAssetQueuedEvent({
+        before,
+        dagId,
+        uri,
+      }) as unknown as Promise<TData>,
+    ...options,
+  });
 /**
  * Delete Asset Queued Events
  * Delete queued asset events for an asset.
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts 
b/airflow/ui/openapi-gen/requests/services.gen.ts
index ed606e960cc..c36bdb8c92b 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -19,6 +19,8 @@ import type {
   DeleteDagAssetQueuedEventsResponse,
   GetDagAssetQueuedEventData,
   GetDagAssetQueuedEventResponse,
+  DeleteDagAssetQueuedEventData,
+  DeleteDagAssetQueuedEventResponse,
   DeleteAssetQueuedEventsData,
   DeleteAssetQueuedEventsResponse,
   HistoricalMetricsData,
@@ -375,6 +377,39 @@ export class AssetService {
     });
   }
 
+  /**
+   * Delete Dag Asset Queued Event
+   * Delete a queued asset event for a DAG.
+   * @param data The data for the request.
+   * @param data.dagId
+   * @param data.uri
+   * @param data.before
+   * @returns void Successful Response
+   * @throws ApiError
+   */
+  public static deleteDagAssetQueuedEvent(
+    data: DeleteDagAssetQueuedEventData,
+  ): CancelablePromise<DeleteDagAssetQueuedEventResponse> {
+    return __request(OpenAPI, {
+      method: "DELETE",
+      url: "/public/dags/{dag_id}/assets/queuedEvent/{uri}",
+      path: {
+        dag_id: data.dagId,
+        uri: data.uri,
+      },
+      query: {
+        before: data.before,
+      },
+      errors: {
+        400: "Bad Request",
+        401: "Unauthorized",
+        403: "Forbidden",
+        404: "Not Found",
+        422: "Validation Error",
+      },
+    });
+  }
+
   /**
    * Delete Asset Queued Events
    * Delete queued asset events for an asset.
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts 
b/airflow/ui/openapi-gen/requests/types.gen.ts
index e5960fc82f9..3611df15458 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -1084,6 +1084,14 @@ export type GetDagAssetQueuedEventData = {
 
 export type GetDagAssetQueuedEventResponse = QueuedEventResponse;
 
+export type DeleteDagAssetQueuedEventData = {
+  before?: string | null;
+  dagId: string;
+  uri: string;
+};
+
+export type DeleteDagAssetQueuedEventResponse = void;
+
 export type DeleteAssetQueuedEventsData = {
   before?: string | null;
   uri: string;
@@ -1769,6 +1777,35 @@ export type $OpenApiTs = {
         422: HTTPValidationError;
       };
     };
+    delete: {
+      req: DeleteDagAssetQueuedEventData;
+      res: {
+        /**
+         * Successful Response
+         */
+        204: void;
+        /**
+         * Bad Request
+         */
+        400: HTTPExceptionResponse;
+        /**
+         * Unauthorized
+         */
+        401: HTTPExceptionResponse;
+        /**
+         * Forbidden
+         */
+        403: HTTPExceptionResponse;
+        /**
+         * Not Found
+         */
+        404: HTTPExceptionResponse;
+        /**
+         * Validation Error
+         */
+        422: HTTPValidationError;
+      };
+    };
   };
   "/public/assets/queuedEvent/{uri}": {
     delete: {
diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py 
b/tests/api_fastapi/core_api/routes/public/test_assets.py
index 5d753134ac7..3970d7e4670 100644
--- a/tests/api_fastapi/core_api/routes/public/test_assets.py
+++ b/tests/api_fastapi/core_api/routes/public/test_assets.py
@@ -765,3 +765,38 @@ class TestDeleteAssetQueuedEvents(TestQueuedEventEndpoint):
 
         assert response.status_code == 404
         assert response.json()["detail"] == "Queue event with uri: 
`not_exists` was not found"
+
+
+class TestDeleteDagAssetQueuedEvent(TestQueuedEventEndpoint):
+    def test_delete_should_respond_204(self, test_client, session, 
create_dummy_dag):
+        dag, _ = create_dummy_dag()
+        dag_id = dag.dag_id
+        asset_uri = "s3://bucket/key/1"
+        self.create_assets(session=session, num=1)
+        asset_id = 1
+
+        self._create_asset_dag_run_queues(dag_id, asset_id, session)
+        adrq = session.query(AssetDagRunQueue).all()
+        assert len(adrq) == 1
+
+        response = test_client.delete(
+            f"/public/dags/{dag_id}/assets/queuedEvent/{asset_uri}",
+        )
+
+        assert response.status_code == 204
+        adrq = session.query(AssetDagRunQueue).all()
+        assert len(adrq) == 0
+
+    def test_should_respond_404(self, test_client):
+        dag_id = "not_exists"
+        asset_uri = "not_exists"
+
+        response = test_client.delete(
+            f"/public/dags/{dag_id}/assets/queuedEvent/{asset_uri}",
+        )
+
+        assert response.status_code == 404
+        assert (
+            response.json()["detail"]
+            == "Queued event with dag_id: `not_exists` and asset uri: 
`not_exists` was not found"
+        )

Reply via email to