This is an automated email from the ASF dual-hosted git repository.

amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new af958bbf923 AIP-84: Migrating GET one queued asset events for DAG to fastAPI (#44128)
af958bbf923 is described below

commit af958bbf923580e55f2bbf46b1cce3f75ce77000
Author: Amogh Desai <[email protected]>
AuthorDate: Tue Nov 19 11:17:57 2024 +0530

    AIP-84: Migrating GET one queued asset events for DAG to fastAPI (#44128)
    
    * AIP-84: Migrating GET queued asset events for DAG to fastAPI
    
    * fixing tests and server code
    
    * fixing parameters
    
    * fixing parameters
    
    * AIP-84: Migrating GET one queued asset events for DAG to fastAPI
    
    * adding annotation
    
    * Update airflow/api_fastapi/core_api/routes/public/assets.py
    
    Co-authored-by: Wei Lee <[email protected]>
    
    * Update tests/api_fastapi/core_api/routes/public/test_assets.py
    
    Co-authored-by: Wei Lee <[email protected]>
    
    * Update airflow/api_fastapi/core_api/routes/public/assets.py
    
    Co-authored-by: Pierre Jeambrun <[email protected]>
    
    ---------
    
    Co-authored-by: Wei Lee <[email protected]>
    Co-authored-by: Pierre Jeambrun <[email protected]>
---
 airflow/api_connexion/endpoints/asset_endpoint.py  |  1 +
 .../api_fastapi/core_api/openapi/v1-generated.yaml | 59 ++++++++++++++
 .../api_fastapi/core_api/routes/public/assets.py   | 31 ++++++++
 airflow/ui/openapi-gen/queries/common.ts           | 24 ++++++
 airflow/ui/openapi-gen/queries/prefetch.ts         | 30 +++++++
 airflow/ui/openapi-gen/queries/queries.ts          | 36 +++++++++
 airflow/ui/openapi-gen/queries/suspense.ts         | 36 +++++++++
 airflow/ui/openapi-gen/requests/services.gen.ts    | 34 ++++++++
 airflow/ui/openapi-gen/requests/types.gen.ts       | 35 ++++++++
 .../core_api/routes/public/test_assets.py          | 93 +++++-----------------
 10 files changed, 306 insertions(+), 73 deletions(-)

diff --git a/airflow/api_connexion/endpoints/asset_endpoint.py b/airflow/api_connexion/endpoints/asset_endpoint.py
index 986ad6bd617..e6ff30bcac0 100644
--- a/airflow/api_connexion/endpoints/asset_endpoint.py
+++ b/airflow/api_connexion/endpoints/asset_endpoint.py
@@ -181,6 +181,7 @@ def _generate_queued_event_where_clause(
     return where_clause
 
 
+@mark_fastapi_migration_done
 @security.requires_access_asset("GET")
 @security.requires_access_dag("GET")
 @provide_session
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index 9b42d5c3a81..74703488fa3 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -546,6 +546,65 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/HTTPValidationError'
+  /public/dags/{dag_id}/assets/queuedEvent/{uri}:
+    get:
+      tags:
+      - Asset
+      summary: Get Dag Asset Queued Event
+      description: Get a queued asset event for a DAG.
+      operationId: get_dag_asset_queued_event
+      parameters:
+      - name: dag_id
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Dag Id
+      - name: uri
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Uri
+      - name: before
+        in: query
+        required: false
+        schema:
+          anyOf:
+          - type: string
+          - type: 'null'
+          title: Before
+      responses:
+        '200':
+          description: Successful Response
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/QueuedEventResponse'
+        '401':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Unauthorized
+        '403':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Forbidden
+        '404':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Not Found
+        '422':
+          description: Validation Error
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPValidationError'
   /public/assets/queuedEvent/{uri}:
     delete:
       tags:
diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow/api_fastapi/core_api/routes/public/assets.py
index a8a1afea32a..85f9efb690c 100644
--- a/airflow/api_fastapi/core_api/routes/public/assets.py
+++ b/airflow/api_fastapi/core_api/routes/public/assets.py
@@ -254,6 +254,37 @@ def get_dag_asset_queued_events(
     )
 
 
+@assets_router.get(
+    "/dags/{dag_id}/assets/queuedEvent/{uri:path}",
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_404_NOT_FOUND,
+        ]
+    ),
+)
+def get_dag_asset_queued_event(
+    dag_id: str,
+    uri: str,
+    session: Annotated[Session, Depends(get_session)],
+    before: OptionalDateTimeQuery = None,
+) -> QueuedEventResponse:
+    """Get a queued asset event for a DAG."""
+    where_clause = _generate_queued_event_where_clause(dag_id=dag_id, uri=uri, before=before)
+    query = (
+        select(AssetDagRunQueue)
+        .join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id)
+        .where(*where_clause)
+    )
+    adrq = session.scalar(query)
+    if not adrq:
+        raise HTTPException(
+            status.HTTP_404_NOT_FOUND,
+            f"Queued event with dag_id: `{dag_id}` and asset uri: `{uri}` was not found",
+        )
+
+    return QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, uri=uri)
+
+
 @assets_router.delete(
     "/assets/queuedEvent/{uri:path}",
     status_code=status.HTTP_204_NO_CONTENT,
diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts
index 14377a81a96..1d954df4cc0 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -151,6 +151,30 @@ export const UseAssetServiceGetDagAssetQueuedEventsKeyFn = (
   useAssetServiceGetDagAssetQueuedEventsKey,
   ...(queryKey ?? [{ before, dagId }]),
 ];
+export type AssetServiceGetDagAssetQueuedEventDefaultResponse = Awaited<
+  ReturnType<typeof AssetService.getDagAssetQueuedEvent>
+>;
+export type AssetServiceGetDagAssetQueuedEventQueryResult<
+  TData = AssetServiceGetDagAssetQueuedEventDefaultResponse,
+  TError = unknown,
+> = UseQueryResult<TData, TError>;
+export const useAssetServiceGetDagAssetQueuedEventKey =
+  "AssetServiceGetDagAssetQueuedEvent";
+export const UseAssetServiceGetDagAssetQueuedEventKeyFn = (
+  {
+    before,
+    dagId,
+    uri,
+  }: {
+    before?: string;
+    dagId: string;
+    uri: string;
+  },
+  queryKey?: Array<unknown>,
+) => [
+  useAssetServiceGetDagAssetQueuedEventKey,
+  ...(queryKey ?? [{ before, dagId, uri }]),
+];
 export type DashboardServiceHistoricalMetricsDefaultResponse = Awaited<
   ReturnType<typeof DashboardService.historicalMetrics>
 >;
diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts
index 0c522f36e43..3fef2a26d11 100644
--- a/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -191,6 +191,36 @@ export const prefetchUseAssetServiceGetDagAssetQueuedEvents = (
     }),
     queryFn: () => AssetService.getDagAssetQueuedEvents({ before, dagId }),
   });
+/**
+ * Get Dag Asset Queued Event
+ * Get a queued asset event for a DAG.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.uri
+ * @param data.before
+ * @returns QueuedEventResponse Successful Response
+ * @throws ApiError
+ */
+export const prefetchUseAssetServiceGetDagAssetQueuedEvent = (
+  queryClient: QueryClient,
+  {
+    before,
+    dagId,
+    uri,
+  }: {
+    before?: string;
+    dagId: string;
+    uri: string;
+  },
+) =>
+  queryClient.prefetchQuery({
+    queryKey: Common.UseAssetServiceGetDagAssetQueuedEventKeyFn({
+      before,
+      dagId,
+      uri,
+    }),
+    queryFn: () => AssetService.getDagAssetQueuedEvent({ before, dagId, uri }),
+  });
 /**
  * Historical Metrics
  * Return cluster activity historical metrics.
diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts
index c461db254de..6b3e350c27c 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -246,6 +246,42 @@ export const useAssetServiceGetDagAssetQueuedEvents = <
       AssetService.getDagAssetQueuedEvents({ before, dagId }) as TData,
     ...options,
   });
+/**
+ * Get Dag Asset Queued Event
+ * Get a queued asset event for a DAG.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.uri
+ * @param data.before
+ * @returns QueuedEventResponse Successful Response
+ * @throws ApiError
+ */
+export const useAssetServiceGetDagAssetQueuedEvent = <
+  TData = Common.AssetServiceGetDagAssetQueuedEventDefaultResponse,
+  TError = unknown,
+  TQueryKey extends Array<unknown> = unknown[],
+>(
+  {
+    before,
+    dagId,
+    uri,
+  }: {
+    before?: string;
+    dagId: string;
+    uri: string;
+  },
+  queryKey?: TQueryKey,
+  options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+  useQuery<TData, TError>({
+    queryKey: Common.UseAssetServiceGetDagAssetQueuedEventKeyFn(
+      { before, dagId, uri },
+      queryKey,
+    ),
+    queryFn: () =>
+      AssetService.getDagAssetQueuedEvent({ before, dagId, uri }) as TData,
+    ...options,
+  });
 /**
  * Historical Metrics
  * Return cluster activity historical metrics.
diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts
index 1b814222815..decdbdd3759 100644
--- a/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow/ui/openapi-gen/queries/suspense.ts
@@ -228,6 +228,42 @@ export const useAssetServiceGetDagAssetQueuedEventsSuspense = <
       AssetService.getDagAssetQueuedEvents({ before, dagId }) as TData,
     ...options,
   });
+/**
+ * Get Dag Asset Queued Event
+ * Get a queued asset event for a DAG.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.uri
+ * @param data.before
+ * @returns QueuedEventResponse Successful Response
+ * @throws ApiError
+ */
+export const useAssetServiceGetDagAssetQueuedEventSuspense = <
+  TData = Common.AssetServiceGetDagAssetQueuedEventDefaultResponse,
+  TError = unknown,
+  TQueryKey extends Array<unknown> = unknown[],
+>(
+  {
+    before,
+    dagId,
+    uri,
+  }: {
+    before?: string;
+    dagId: string;
+    uri: string;
+  },
+  queryKey?: TQueryKey,
+  options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+  useSuspenseQuery<TData, TError>({
+    queryKey: Common.UseAssetServiceGetDagAssetQueuedEventKeyFn(
+      { before, dagId, uri },
+      queryKey,
+    ),
+    queryFn: () =>
+      AssetService.getDagAssetQueuedEvent({ before, dagId, uri }) as TData,
+    ...options,
+  });
 /**
  * Historical Metrics
  * Return cluster activity historical metrics.
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts
index 2d75d9811b1..ed606e960cc 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -17,6 +17,8 @@ import type {
   GetDagAssetQueuedEventsResponse,
   DeleteDagAssetQueuedEventsData,
   DeleteDagAssetQueuedEventsResponse,
+  GetDagAssetQueuedEventData,
+  GetDagAssetQueuedEventResponse,
   DeleteAssetQueuedEventsData,
   DeleteAssetQueuedEventsResponse,
   HistoricalMetricsData,
@@ -341,6 +343,38 @@ export class AssetService {
     });
   }
 
+  /**
+   * Get Dag Asset Queued Event
+   * Get a queued asset event for a DAG.
+   * @param data The data for the request.
+   * @param data.dagId
+   * @param data.uri
+   * @param data.before
+   * @returns QueuedEventResponse Successful Response
+   * @throws ApiError
+   */
+  public static getDagAssetQueuedEvent(
+    data: GetDagAssetQueuedEventData,
+  ): CancelablePromise<GetDagAssetQueuedEventResponse> {
+    return __request(OpenAPI, {
+      method: "GET",
+      url: "/public/dags/{dag_id}/assets/queuedEvent/{uri}",
+      path: {
+        dag_id: data.dagId,
+        uri: data.uri,
+      },
+      query: {
+        before: data.before,
+      },
+      errors: {
+        401: "Unauthorized",
+        403: "Forbidden",
+        404: "Not Found",
+        422: "Validation Error",
+      },
+    });
+  }
+
   /**
    * Delete Asset Queued Events
    * Delete queued asset events for an asset.
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts
index 85a8a714087..e5960fc82f9 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -1076,6 +1076,14 @@ export type DeleteDagAssetQueuedEventsData = {
 
 export type DeleteDagAssetQueuedEventsResponse = void;
 
+export type GetDagAssetQueuedEventData = {
+  before?: string | null;
+  dagId: string;
+  uri: string;
+};
+
+export type GetDagAssetQueuedEventResponse = QueuedEventResponse;
+
 export type DeleteAssetQueuedEventsData = {
   before?: string | null;
   uri: string;
@@ -1735,6 +1743,33 @@ export type $OpenApiTs = {
       };
     };
   };
+  "/public/dags/{dag_id}/assets/queuedEvent/{uri}": {
+    get: {
+      req: GetDagAssetQueuedEventData;
+      res: {
+        /**
+         * Successful Response
+         */
+        200: QueuedEventResponse;
+        /**
+         * Unauthorized
+         */
+        401: HTTPExceptionResponse;
+        /**
+         * Forbidden
+         */
+        403: HTTPExceptionResponse;
+        /**
+         * Not Found
+         */
+        404: HTTPExceptionResponse;
+        /**
+         * Validation Error
+         */
+        422: HTTPValidationError;
+      };
+    };
+  };
   "/public/assets/queuedEvent/{uri}": {
     delete: {
       req: DeleteAssetQueuedEventsData;
diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py b/tests/api_fastapi/core_api/routes/public/test_assets.py
index 64a1c8a06d6..5d753134ac7 100644
--- a/tests/api_fastapi/core_api/routes/public/test_assets.py
+++ b/tests/api_fastapi/core_api/routes/public/test_assets.py
@@ -367,59 +367,6 @@ class TestGetAssetsEndpointPagination(TestAssets):
         assert response.status_code == 200
         assert len(response.json()["assets"]) == 100
 
-    @pytest.mark.parametrize(
-        "query_params, expected_detail",
-        [
-            (
-                {"limit": 1, "offset": -1},
-                [
-                    {
-                        "type": "greater_than_equal",
-                        "loc": ["query", "offset"],
-                        "msg": "Input should be greater than or equal to 0",
-                        "input": "-1",
-                        "ctx": {"ge": 0},
-                    }
-                ],
-            ),
-            (
-                {"limit": -1, "offset": 1},
-                [
-                    {
-                        "type": "greater_than_equal",
-                        "loc": ["query", "limit"],
-                        "msg": "Input should be greater than or equal to 0",
-                        "input": "-1",
-                        "ctx": {"ge": 0},
-                    }
-                ],
-            ),
-            (
-                {"limit": -1, "offset": -1},
-                [
-                    {
-                        "type": "greater_than_equal",
-                        "loc": ["query", "limit"],
-                        "msg": "Input should be greater than or equal to 0",
-                        "input": "-1",
-                        "ctx": {"ge": 0},
-                    },
-                    {
-                        "type": "greater_than_equal",
-                        "loc": ["query", "offset"],
-                        "msg": "Input should be greater than or equal to 0",
-                        "input": "-1",
-                        "ctx": {"ge": 0},
-                    },
-                ],
-            ),
-        ],
-    )
-    def test_bad_limit_and_offset(self, test_client, query_params, expected_detail):
-        response = test_client.get("/public/assets", params=query_params)
-        assert response.status_code == 422
-        assert response.json()["detail"] == expected_detail
-
 
 class TestGetAssetEvents(TestAssets):
     def test_should_respond_200(self, test_client, session):
@@ -772,6 +719,26 @@ class TestPostAssetEvents(TestAssets):
 
         assert response.status_code == 422
 
+    @pytest.mark.usefixtures("time_freezer")
+    @pytest.mark.enable_redact
+    def test_should_mask_sensitive_extra(self, test_client, session):
+        self.create_assets()
+        event_payload = {"uri": "s3://bucket/key/1", "extra": {"password": "bar"}}
+        response = test_client.post("/public/assets/events", json=event_payload)
+        assert response.status_code == 200
+        assert response.json() == {
+            "id": mock.ANY,
+            "asset_id": 1,
+            "uri": "s3://bucket/key/1",
+            "extra": {"password": "***", "from_rest_api": True},
+            "source_task_id": None,
+            "source_dag_id": None,
+            "source_run_id": None,
+            "source_map_index": -1,
+            "created_dagruns": [],
+            "timestamp": self.default_time.replace("+00:00", "Z"),
+        }
+
 
 class TestDeleteAssetQueuedEvents(TestQueuedEventEndpoint):
     @pytest.mark.usefixtures("time_freezer")
@@ -798,23 +765,3 @@ class TestDeleteAssetQueuedEvents(TestQueuedEventEndpoint):
 
         assert response.status_code == 404
         assert response.json()["detail"] == "Queue event with uri: `not_exists` was not found"
-
-    @pytest.mark.usefixtures("time_freezer")
-    @pytest.mark.enable_redact
-    def test_should_mask_sensitive_extra(self, test_client, session):
-        self.create_assets()
-        event_payload = {"uri": "s3://bucket/key/1", "extra": {"password": "bar"}}
-        response = test_client.post("/public/assets/events", json=event_payload)
-        assert response.status_code == 200
-        assert response.json() == {
-            "id": mock.ANY,
-            "asset_id": 1,
-            "uri": "s3://bucket/key/1",
-            "extra": {"password": "***", "from_rest_api": True},
-            "source_task_id": None,
-            "source_dag_id": None,
-            "source_run_id": None,
-            "source_map_index": -1,
-            "created_dagruns": [],
-            "timestamp": self.default_time.replace("+00:00", "Z"),
-        }

Reply via email to