This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new dcd41f60f1c Switch asset endpoints to use id instead of uri (#44801)
dcd41f60f1c is described below

commit dcd41f60f1c9b5583b49bfb49b6d85c640a2892c
Author: Tzu-ping Chung <uranu...@gmail.com>
AuthorDate: Tue Dec 10 23:13:08 2024 +0800

    Switch asset endpoints to use id instead of uri (#44801)
    
    * Switch asset endpoints to use id instead of uri
    
    With the addition of Asset.name, we can't guarantee the uri to be unique
    anymore. Using uri also presents additional issues on endpoints since it
    conflicts with subroutes.
    
    * Does not need annotation in route format
---
 airflow/api_fastapi/core_api/datamodels/assets.py  |   5 +-
 .../api_fastapi/core_api/openapi/v1-generated.yaml |  56 +++++-----
 .../api_fastapi/core_api/routes/public/assets.py   | 121 ++++++++-------------
 airflow/ui/openapi-gen/queries/common.ts           |  18 +--
 airflow/ui/openapi-gen/queries/prefetch.ts         |  34 +++---
 airflow/ui/openapi-gen/queries/queries.ts          |  51 ++++-----
 airflow/ui/openapi-gen/queries/suspense.ts         |  31 +++---
 airflow/ui/openapi-gen/requests/schemas.gen.ts     |  23 ++--
 airflow/ui/openapi-gen/requests/services.gen.ts    |  30 ++---
 airflow/ui/openapi-gen/requests/types.gen.ts       |  21 ++--
 .../core_api/routes/public/test_assets.py          | 101 ++++++-----------
 .../core_api/routes/public/test_dag_run.py         |   3 +-
 12 files changed, 211 insertions(+), 283 deletions(-)

diff --git a/airflow/api_fastapi/core_api/datamodels/assets.py 
b/airflow/api_fastapi/core_api/datamodels/assets.py
index 047f3f557bc..97211579985 100644
--- a/airflow/api_fastapi/core_api/datamodels/assets.py
+++ b/airflow/api_fastapi/core_api/datamodels/assets.py
@@ -102,7 +102,6 @@ class AssetEventResponse(BaseModel):
 
     id: int
     asset_id: int
-    uri: str
     extra: dict | None = None
     source_task_id: str | None = None
     source_dag_id: str | None = None
@@ -127,8 +126,8 @@ class AssetEventCollectionResponse(BaseModel):
 class QueuedEventResponse(BaseModel):
     """Queued Event serializer for responses.."""
 
-    uri: str
     dag_id: str
+    asset_id: int
     created_at: datetime
 
 
@@ -142,7 +141,7 @@ class QueuedEventCollectionResponse(BaseModel):
 class CreateAssetEventsBody(BaseModel):
     """Create asset events request."""
 
-    uri: str
+    asset_id: int
     extra: dict = Field(default_factory=dict)
 
     @field_validator("extra", mode="after")
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml 
b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index 3b6d55c9313..00f1c5aa510 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -594,7 +594,7 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/HTTPValidationError'
-  /public/assets/queuedEvents/{uri}:
+  /public/assets/{asset_id}/queuedEvents:
     get:
       tags:
       - Asset
@@ -602,12 +602,12 @@ paths:
       description: Get queued asset events for an asset.
       operationId: get_asset_queued_events
       parameters:
-      - name: uri
+      - name: asset_id
         in: path
         required: true
         schema:
-          type: string
-          title: Uri
+          type: integer
+          title: Asset Id
       - name: before
         in: query
         required: false
@@ -654,12 +654,12 @@ paths:
       description: Delete queued asset events for an asset.
       operationId: delete_asset_queued_events
       parameters:
-      - name: uri
+      - name: asset_id
         in: path
         required: true
         schema:
-          type: string
-          title: Uri
+          type: integer
+          title: Asset Id
       - name: before
         in: query
         required: false
@@ -695,7 +695,7 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/HTTPValidationError'
-  /public/assets/{uri}:
+  /public/assets/{asset_id}:
     get:
       tags:
       - Asset
@@ -703,12 +703,12 @@ paths:
       description: Get an asset.
       operationId: get_asset
       parameters:
-      - name: uri
+      - name: asset_id
         in: path
         required: true
         schema:
-          type: string
-          title: Uri
+          type: integer
+          title: Asset Id
       responses:
         '200':
           description: Successful Response
@@ -846,7 +846,7 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/HTTPValidationError'
-  /public/dags/{dag_id}/assets/queuedEvents/{uri}:
+  /public/dags/{dag_id}/assets/{asset_id}/queuedEvents:
     get:
       tags:
       - Asset
@@ -860,12 +860,12 @@ paths:
         schema:
           type: string
           title: Dag Id
-      - name: uri
+      - name: asset_id
         in: path
         required: true
         schema:
-          type: string
-          title: Uri
+          type: integer
+          title: Asset Id
       - name: before
         in: query
         required: false
@@ -918,12 +918,12 @@ paths:
         schema:
           type: string
           title: Dag Id
-      - name: uri
+      - name: asset_id
         in: path
         required: true
         schema:
-          type: string
-          title: Uri
+          type: integer
+          title: Asset Id
       - name: before
         in: query
         required: false
@@ -5982,9 +5982,6 @@ components:
         asset_id:
           type: integer
           title: Asset Id
-        uri:
-          type: string
-          title: Uri
         extra:
           anyOf:
           - type: object
@@ -6021,7 +6018,6 @@ components:
       required:
       - id
       - asset_id
-      - uri
       - source_map_index
       - created_dagruns
       - timestamp
@@ -6548,16 +6544,16 @@ components:
       description: Connection Test serializer for responses.
     CreateAssetEventsBody:
       properties:
-        uri:
-          type: string
-          title: Uri
+        asset_id:
+          type: integer
+          title: Asset Id
         extra:
           type: object
           title: Extra
       additionalProperties: false
       type: object
       required:
-      - uri
+      - asset_id
       title: CreateAssetEventsBody
       description: Create asset events request.
     DAGCollectionResponse:
@@ -8273,20 +8269,20 @@ components:
       description: Queued Event Collection serializer for responses.
     QueuedEventResponse:
       properties:
-        uri:
-          type: string
-          title: Uri
         dag_id:
           type: string
           title: Dag Id
+        asset_id:
+          type: integer
+          title: Asset Id
         created_at:
           type: string
           format: date-time
           title: Created At
       type: object
       required:
-      - uri
       - dag_id
+      - asset_id
       - created_at
       title: QueuedEventResponse
       description: Queued Event serializer for responses..
diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py 
b/airflow/api_fastapi/core_api/routes/public/assets.py
index 779c32f6a4c..c8cc9fb0f76 100644
--- a/airflow/api_fastapi/core_api/routes/public/assets.py
+++ b/airflow/api_fastapi/core_api/routes/public/assets.py
@@ -59,20 +59,16 @@ assets_router = AirflowRouter(tags=["Asset"])
 
 def _generate_queued_event_where_clause(
     *,
+    asset_id: int | None = None,
     dag_id: str | None = None,
-    uri: str | None = None,
     before: datetime | str | None = None,
 ) -> list:
     """Get AssetDagRunQueue where clause."""
     where_clause = []
     if dag_id is not None:
         where_clause.append(AssetDagRunQueue.target_dag_id == dag_id)
-    if uri is not None:
-        where_clause.append(
-            AssetDagRunQueue.asset_id.in_(
-                select(AssetModel.id).where(AssetModel.uri == uri),
-            ),
-        )
+    if asset_id is not None:
+        where_clause.append(AssetDagRunQueue.asset_id == asset_id)
     if before is not None:
         where_clause.append(AssetDagRunQueue.created_at < before)
     return where_clause
@@ -227,9 +223,9 @@ def create_asset_event(
     session: SessionDep,
 ) -> AssetEventResponse:
     """Create asset events."""
-    asset_model = session.scalar(select(AssetModel).where(AssetModel.uri == 
body.uri).limit(1))
+    asset_model = session.scalar(select(AssetModel).where(AssetModel.id == 
body.asset_id).limit(1))
     if not asset_model:
-        raise HTTPException(status.HTTP_404_NOT_FOUND, f"Asset with uri: 
`{body.uri}` was not found")
+        raise HTTPException(status.HTTP_404_NOT_FOUND, f"Asset with ID: 
`{body.asset_id}` was not found")
     timestamp = timezone.utcnow()
 
     assets_event = asset_manager.register_asset_change(
@@ -240,41 +236,35 @@ def create_asset_event(
     )
 
     if not assets_event:
-        raise HTTPException(status.HTTP_404_NOT_FOUND, f"Asset with uri: 
`{body.uri}` was not found")
-    return assets_event
+        raise HTTPException(status.HTTP_404_NOT_FOUND, f"Asset with ID: 
`{body.asset_id}` was not found")
+    return AssetEventResponse.model_validate(assets_event)
 
 
 @assets_router.get(
-    "/assets/queuedEvents/{uri:path}",
-    responses=create_openapi_http_exception_doc(
-        [
-            status.HTTP_404_NOT_FOUND,
-        ]
-    ),
+    "/assets/{asset_id}/queuedEvents",
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
 )
 def get_asset_queued_events(
-    uri: str,
+    asset_id: int,
     session: SessionDep,
     before: OptionalDateTimeQuery = None,
 ) -> QueuedEventCollectionResponse:
     """Get queued asset events for an asset."""
-    print(f"uri: {uri}")
-    where_clause = _generate_queued_event_where_clause(uri=uri, before=before)
-    query = (
-        select(AssetDagRunQueue, AssetModel.uri)
-        .join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id)
-        .where(*where_clause)
-    )
+    where_clause = _generate_queued_event_where_clause(asset_id=asset_id, 
before=before)
+    query = select(AssetDagRunQueue).where(*where_clause)
 
     dag_asset_queued_events_select, total_entries = 
paginated_select(statement=query)
-    adrqs = session.execute(dag_asset_queued_events_select).all()
+    adrqs = session.scalars(dag_asset_queued_events_select).all()
 
     if not adrqs:
-        raise HTTPException(status.HTTP_404_NOT_FOUND, f"Queue event with uri: 
`{uri}` was not found")
+        raise HTTPException(
+            status.HTTP_404_NOT_FOUND,
+            f"Queue event with asset_id: `{asset_id}` was not found",
+        )
 
     queued_events = [
-        QueuedEventResponse(created_at=adrq.created_at, 
dag_id=adrq.target_dag_id, uri=uri)
-        for adrq, uri in adrqs
+        QueuedEventResponse(created_at=adrq.created_at, 
dag_id=adrq.target_dag_id, asset_id=adrq.asset_id)
+        for adrq in adrqs
     ]
 
     return QueuedEventCollectionResponse(
@@ -284,33 +274,29 @@ def get_asset_queued_events(
 
 
 @assets_router.get(
-    "/assets/{uri:path}",
+    "/assets/{asset_id}",
     responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
 )
 def get_asset(
-    uri: str,
+    asset_id: int,
     session: SessionDep,
 ) -> AssetResponse:
     """Get an asset."""
     asset = session.scalar(
         select(AssetModel)
-        .where(AssetModel.uri == uri)
+        .where(AssetModel.id == asset_id)
         .options(joinedload(AssetModel.consuming_dags), 
joinedload(AssetModel.producing_tasks))
     )
 
     if asset is None:
-        raise HTTPException(status.HTTP_404_NOT_FOUND, f"The Asset with uri: 
`{uri}` was not found")
+        raise HTTPException(status.HTTP_404_NOT_FOUND, f"The Asset with ID: 
`{asset_id}` was not found")
 
     return AssetResponse.model_validate(asset)
 
 
 @assets_router.get(
     "/dags/{dag_id}/assets/queuedEvents",
-    responses=create_openapi_http_exception_doc(
-        [
-            status.HTTP_404_NOT_FOUND,
-        ]
-    ),
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
 )
 def get_dag_asset_queued_events(
     dag_id: str,
@@ -319,20 +305,16 @@ def get_dag_asset_queued_events(
 ) -> QueuedEventCollectionResponse:
     """Get queued asset events for a DAG."""
     where_clause = _generate_queued_event_where_clause(dag_id=dag_id, 
before=before)
-    query = (
-        select(AssetDagRunQueue, AssetModel.uri)
-        .join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id)
-        .where(*where_clause)
-    )
+    query = select(AssetDagRunQueue).where(*where_clause)
 
     dag_asset_queued_events_select, total_entries = 
paginated_select(statement=query)
-    adrqs = session.execute(dag_asset_queued_events_select).all()
+    adrqs = session.scalars(dag_asset_queued_events_select).all()
     if not adrqs:
         raise HTTPException(status.HTTP_404_NOT_FOUND, f"Queue event with 
dag_id: `{dag_id}` was not found")
 
     queued_events = [
-        QueuedEventResponse(created_at=adrq.created_at, 
dag_id=adrq.target_dag_id, uri=uri)
-        for adrq, uri in adrqs
+        QueuedEventResponse(created_at=adrq.created_at, 
dag_id=adrq.target_dag_id, asset_id=adrq.asset_id)
+        for adrq in adrqs
     ]
 
     return QueuedEventCollectionResponse(
@@ -342,56 +324,47 @@ def get_dag_asset_queued_events(
 
 
 @assets_router.get(
-    "/dags/{dag_id}/assets/queuedEvents/{uri:path}",
-    responses=create_openapi_http_exception_doc(
-        [
-            status.HTTP_404_NOT_FOUND,
-        ]
-    ),
+    "/dags/{dag_id}/assets/{asset_id}/queuedEvents",
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
 )
 def get_dag_asset_queued_event(
     dag_id: str,
-    uri: str,
+    asset_id: int,
     session: SessionDep,
     before: OptionalDateTimeQuery = None,
 ) -> QueuedEventResponse:
     """Get a queued asset event for a DAG."""
-    where_clause = _generate_queued_event_where_clause(dag_id=dag_id, uri=uri, 
before=before)
-    query = (
-        select(AssetDagRunQueue)
-        .join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id)
-        .where(*where_clause)
-    )
+    where_clause = _generate_queued_event_where_clause(dag_id=dag_id, 
asset_id=asset_id, before=before)
+    query = select(AssetDagRunQueue).where(*where_clause)
     adrq = session.scalar(query)
     if not adrq:
         raise HTTPException(
             status.HTTP_404_NOT_FOUND,
-            f"Queued event with dag_id: `{dag_id}` and asset uri: `{uri}` was 
not found",
+            f"Queued event with dag_id: `{dag_id}` and asset_id: `{asset_id}` 
was not found",
         )
 
-    return QueuedEventResponse(created_at=adrq.created_at, 
dag_id=adrq.target_dag_id, uri=uri)
+    return QueuedEventResponse(created_at=adrq.created_at, 
dag_id=adrq.target_dag_id, asset_id=asset_id)
 
 
 @assets_router.delete(
-    "/assets/queuedEvents/{uri:path}",
+    "/assets/{asset_id}/queuedEvents",
     status_code=status.HTTP_204_NO_CONTENT,
-    responses=create_openapi_http_exception_doc(
-        [
-            status.HTTP_404_NOT_FOUND,
-        ]
-    ),
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
 )
 def delete_asset_queued_events(
-    uri: str,
+    asset_id: int,
     session: SessionDep,
     before: OptionalDateTimeQuery = None,
 ):
     """Delete queued asset events for an asset."""
-    where_clause = _generate_queued_event_where_clause(uri=uri, before=before)
+    where_clause = _generate_queued_event_where_clause(asset_id=asset_id, 
before=before)
     delete_stmt = 
delete(AssetDagRunQueue).where(*where_clause).execution_options(synchronize_session="fetch")
     result = session.execute(delete_stmt)
     if result.rowcount == 0:
-        raise HTTPException(status.HTTP_404_NOT_FOUND, detail=f"Queue event 
with uri: `{uri}` was not found")
+        raise HTTPException(
+            status.HTTP_404_NOT_FOUND,
+            detail=f"Queue event with asset_id: `{asset_id}` was not found",
+        )
 
 
 @assets_router.delete(
@@ -419,7 +392,7 @@ def delete_dag_asset_queued_events(
 
 
 @assets_router.delete(
-    "/dags/{dag_id}/assets/queuedEvents/{uri:path}",
+    "/dags/{dag_id}/assets/{asset_id}/queuedEvents",
     status_code=status.HTTP_204_NO_CONTENT,
     responses=create_openapi_http_exception_doc(
         [
@@ -430,12 +403,12 @@ def delete_dag_asset_queued_events(
 )
 def delete_dag_asset_queued_event(
     dag_id: str,
-    uri: str,
+    asset_id: int,
     session: SessionDep,
     before: OptionalDateTimeQuery = None,
 ):
     """Delete a queued asset event for a DAG."""
-    where_clause = _generate_queued_event_where_clause(dag_id=dag_id, 
before=before, uri=uri)
+    where_clause = _generate_queued_event_where_clause(dag_id=dag_id, 
before=before, asset_id=asset_id)
     delete_statement = (
         
delete(AssetDagRunQueue).where(*where_clause).execution_options(synchronize_session="fetch")
     )
@@ -443,5 +416,5 @@ def delete_dag_asset_queued_event(
     if result.rowcount == 0:
         raise HTTPException(
             status.HTTP_404_NOT_FOUND,
-            detail=f"Queued event with dag_id: `{dag_id}` and asset uri: 
`{uri}` was not found",
+            detail=f"Queued event with dag_id: `{dag_id}` and asset_id: 
`{asset_id}` was not found",
         )
diff --git a/airflow/ui/openapi-gen/queries/common.ts 
b/airflow/ui/openapi-gen/queries/common.ts
index b759c0f2f23..ed1d6066bdc 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -174,16 +174,16 @@ export const useAssetServiceGetAssetQueuedEventsKey =
   "AssetServiceGetAssetQueuedEvents";
 export const UseAssetServiceGetAssetQueuedEventsKeyFn = (
   {
+    assetId,
     before,
-    uri,
   }: {
+    assetId: number;
     before?: string;
-    uri: string;
   },
   queryKey?: Array<unknown>,
 ) => [
   useAssetServiceGetAssetQueuedEventsKey,
-  ...(queryKey ?? [{ before, uri }]),
+  ...(queryKey ?? [{ assetId, before }]),
 ];
 export type AssetServiceGetAssetDefaultResponse = Awaited<
   ReturnType<typeof AssetService.getAsset>
@@ -195,12 +195,12 @@ export type AssetServiceGetAssetQueryResult<
 export const useAssetServiceGetAssetKey = "AssetServiceGetAsset";
 export const UseAssetServiceGetAssetKeyFn = (
   {
-    uri,
+    assetId,
   }: {
-    uri: string;
+    assetId: number;
   },
   queryKey?: Array<unknown>,
-) => [useAssetServiceGetAssetKey, ...(queryKey ?? [{ uri }])];
+) => [useAssetServiceGetAssetKey, ...(queryKey ?? [{ assetId }])];
 export type AssetServiceGetDagAssetQueuedEventsDefaultResponse = Awaited<
   ReturnType<typeof AssetService.getDagAssetQueuedEvents>
 >;
@@ -234,18 +234,18 @@ export const useAssetServiceGetDagAssetQueuedEventKey =
   "AssetServiceGetDagAssetQueuedEvent";
 export const UseAssetServiceGetDagAssetQueuedEventKeyFn = (
   {
+    assetId,
     before,
     dagId,
-    uri,
   }: {
+    assetId: number;
     before?: string;
     dagId: string;
-    uri: string;
   },
   queryKey?: Array<unknown>,
 ) => [
   useAssetServiceGetDagAssetQueuedEventKey,
-  ...(queryKey ?? [{ before, dagId, uri }]),
+  ...(queryKey ?? [{ assetId, before, dagId }]),
 ];
 export type ConfigServiceGetConfigsDefaultResponse = Awaited<
   ReturnType<typeof ConfigService.getConfigs>
diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts 
b/airflow/ui/openapi-gen/queries/prefetch.ts
index eab8ea6b356..4bb01a7feaa 100644
--- a/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -219,7 +219,7 @@ export const prefetchUseAssetServiceGetAssetEvents = (
  * Get Asset Queued Events
  * Get queued asset events for an asset.
  * @param data The data for the request.
- * @param data.uri
+ * @param data.assetId
  * @param data.before
  * @returns QueuedEventCollectionResponse Successful Response
  * @throws ApiError
@@ -227,36 +227,39 @@ export const prefetchUseAssetServiceGetAssetEvents = (
 export const prefetchUseAssetServiceGetAssetQueuedEvents = (
   queryClient: QueryClient,
   {
+    assetId,
     before,
-    uri,
   }: {
+    assetId: number;
     before?: string;
-    uri: string;
   },
 ) =>
   queryClient.prefetchQuery({
-    queryKey: Common.UseAssetServiceGetAssetQueuedEventsKeyFn({ before, uri }),
-    queryFn: () => AssetService.getAssetQueuedEvents({ before, uri }),
+    queryKey: Common.UseAssetServiceGetAssetQueuedEventsKeyFn({
+      assetId,
+      before,
+    }),
+    queryFn: () => AssetService.getAssetQueuedEvents({ assetId, before }),
   });
 /**
  * Get Asset
  * Get an asset.
  * @param data The data for the request.
- * @param data.uri
+ * @param data.assetId
  * @returns AssetResponse Successful Response
  * @throws ApiError
  */
 export const prefetchUseAssetServiceGetAsset = (
   queryClient: QueryClient,
   {
-    uri,
+    assetId,
   }: {
-    uri: string;
+    assetId: number;
   },
 ) =>
   queryClient.prefetchQuery({
-    queryKey: Common.UseAssetServiceGetAssetKeyFn({ uri }),
-    queryFn: () => AssetService.getAsset({ uri }),
+    queryKey: Common.UseAssetServiceGetAssetKeyFn({ assetId }),
+    queryFn: () => AssetService.getAsset({ assetId }),
   });
 /**
  * Get Dag Asset Queued Events
@@ -289,7 +292,7 @@ export const prefetchUseAssetServiceGetDagAssetQueuedEvents 
= (
  * Get a queued asset event for a DAG.
  * @param data The data for the request.
  * @param data.dagId
- * @param data.uri
+ * @param data.assetId
  * @param data.before
  * @returns QueuedEventResponse Successful Response
  * @throws ApiError
@@ -297,22 +300,23 @@ export const 
prefetchUseAssetServiceGetDagAssetQueuedEvents = (
 export const prefetchUseAssetServiceGetDagAssetQueuedEvent = (
   queryClient: QueryClient,
   {
+    assetId,
     before,
     dagId,
-    uri,
   }: {
+    assetId: number;
     before?: string;
     dagId: string;
-    uri: string;
   },
 ) =>
   queryClient.prefetchQuery({
     queryKey: Common.UseAssetServiceGetDagAssetQueuedEventKeyFn({
+      assetId,
       before,
       dagId,
-      uri,
     }),
-    queryFn: () => AssetService.getDagAssetQueuedEvent({ before, dagId, uri }),
+    queryFn: () =>
+      AssetService.getDagAssetQueuedEvent({ assetId, before, dagId }),
   });
 /**
  * Get Configs
diff --git a/airflow/ui/openapi-gen/queries/queries.ts 
b/airflow/ui/openapi-gen/queries/queries.ts
index e90ae58e6b0..f8c8bcd7a58 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -279,7 +279,7 @@ export const useAssetServiceGetAssetEvents = <
  * Get Asset Queued Events
  * Get queued asset events for an asset.
  * @param data The data for the request.
- * @param data.uri
+ * @param data.assetId
  * @param data.before
  * @returns QueuedEventCollectionResponse Successful Response
  * @throws ApiError
@@ -290,28 +290,29 @@ export const useAssetServiceGetAssetQueuedEvents = <
   TQueryKey extends Array<unknown> = unknown[],
 >(
   {
+    assetId,
     before,
-    uri,
   }: {
+    assetId: number;
     before?: string;
-    uri: string;
   },
   queryKey?: TQueryKey,
   options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
 ) =>
   useQuery<TData, TError>({
     queryKey: Common.UseAssetServiceGetAssetQueuedEventsKeyFn(
-      { before, uri },
+      { assetId, before },
       queryKey,
     ),
-    queryFn: () => AssetService.getAssetQueuedEvents({ before, uri }) as TData,
+    queryFn: () =>
+      AssetService.getAssetQueuedEvents({ assetId, before }) as TData,
     ...options,
   });
 /**
  * Get Asset
  * Get an asset.
  * @param data The data for the request.
- * @param data.uri
+ * @param data.assetId
  * @returns AssetResponse Successful Response
  * @throws ApiError
  */
@@ -321,16 +322,16 @@ export const useAssetServiceGetAsset = <
   TQueryKey extends Array<unknown> = unknown[],
 >(
   {
-    uri,
+    assetId,
   }: {
-    uri: string;
+    assetId: number;
   },
   queryKey?: TQueryKey,
   options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
 ) =>
   useQuery<TData, TError>({
-    queryKey: Common.UseAssetServiceGetAssetKeyFn({ uri }, queryKey),
-    queryFn: () => AssetService.getAsset({ uri }) as TData,
+    queryKey: Common.UseAssetServiceGetAssetKeyFn({ assetId }, queryKey),
+    queryFn: () => AssetService.getAsset({ assetId }) as TData,
     ...options,
   });
 /**
@@ -371,7 +372,7 @@ export const useAssetServiceGetDagAssetQueuedEvents = <
  * Get a queued asset event for a DAG.
  * @param data The data for the request.
  * @param data.dagId
- * @param data.uri
+ * @param data.assetId
  * @param data.before
  * @returns QueuedEventResponse Successful Response
  * @throws ApiError
@@ -382,24 +383,24 @@ export const useAssetServiceGetDagAssetQueuedEvent = <
   TQueryKey extends Array<unknown> = unknown[],
 >(
   {
+    assetId,
     before,
     dagId,
-    uri,
   }: {
+    assetId: number;
     before?: string;
     dagId: string;
-    uri: string;
   },
   queryKey?: TQueryKey,
   options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
 ) =>
   useQuery<TData, TError>({
     queryKey: Common.UseAssetServiceGetDagAssetQueuedEventKeyFn(
-      { before, dagId, uri },
+      { assetId, before, dagId },
       queryKey,
     ),
     queryFn: () =>
-      AssetService.getDagAssetQueuedEvent({ before, dagId, uri }) as TData,
+      AssetService.getDagAssetQueuedEvent({ assetId, before, dagId }) as TData,
     ...options,
   });
 /**
@@ -3801,7 +3802,7 @@ export const useVariableServicePatchVariable = <
  * Delete Asset Queued Events
  * Delete queued asset events for an asset.
  * @param data The data for the request.
- * @param data.uri
+ * @param data.assetId
  * @param data.before
  * @returns void Successful Response
  * @throws ApiError
@@ -3816,8 +3817,8 @@ export const useAssetServiceDeleteAssetQueuedEvents = <
       TData,
       TError,
       {
+        assetId: number;
         before?: string;
-        uri: string;
       },
       TContext
     >,
@@ -3828,15 +3829,15 @@ export const useAssetServiceDeleteAssetQueuedEvents = <
     TData,
     TError,
     {
+      assetId: number;
       before?: string;
-      uri: string;
     },
     TContext
   >({
-    mutationFn: ({ before, uri }) =>
+    mutationFn: ({ assetId, before }) =>
       AssetService.deleteAssetQueuedEvents({
+        assetId,
         before,
-        uri,
       }) as unknown as Promise<TData>,
     ...options,
   });
@@ -3887,7 +3888,7 @@ export const useAssetServiceDeleteDagAssetQueuedEvents = <
  * Delete a queued asset event for a DAG.
  * @param data The data for the request.
  * @param data.dagId
- * @param data.uri
+ * @param data.assetId
  * @param data.before
  * @returns void Successful Response
  * @throws ApiError
@@ -3902,9 +3903,9 @@ export const useAssetServiceDeleteDagAssetQueuedEvent = <
       TData,
       TError,
       {
+        assetId: number;
         before?: string;
         dagId: string;
-        uri: string;
       },
       TContext
     >,
@@ -3915,17 +3916,17 @@ export const useAssetServiceDeleteDagAssetQueuedEvent = 
<
     TData,
     TError,
     {
+      assetId: number;
       before?: string;
       dagId: string;
-      uri: string;
     },
     TContext
   >({
-    mutationFn: ({ before, dagId, uri }) =>
+    mutationFn: ({ assetId, before, dagId }) =>
       AssetService.deleteDagAssetQueuedEvent({
+        assetId,
         before,
         dagId,
-        uri,
       }) as unknown as Promise<TData>,
     ...options,
   });
diff --git a/airflow/ui/openapi-gen/queries/suspense.ts 
b/airflow/ui/openapi-gen/queries/suspense.ts
index e7b21338760..cd6d7c5a798 100644
--- a/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow/ui/openapi-gen/queries/suspense.ts
@@ -254,7 +254,7 @@ export const useAssetServiceGetAssetEventsSuspense = <
  * Get Asset Queued Events
  * Get queued asset events for an asset.
  * @param data The data for the request.
- * @param data.uri
+ * @param data.assetId
  * @param data.before
  * @returns QueuedEventCollectionResponse Successful Response
  * @throws ApiError
@@ -265,28 +265,29 @@ export const useAssetServiceGetAssetQueuedEventsSuspense 
= <
   TQueryKey extends Array<unknown> = unknown[],
 >(
   {
+    assetId,
     before,
-    uri,
   }: {
+    assetId: number;
     before?: string;
-    uri: string;
   },
   queryKey?: TQueryKey,
   options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
 ) =>
   useSuspenseQuery<TData, TError>({
     queryKey: Common.UseAssetServiceGetAssetQueuedEventsKeyFn(
-      { before, uri },
+      { assetId, before },
       queryKey,
     ),
-    queryFn: () => AssetService.getAssetQueuedEvents({ before, uri }) as TData,
+    queryFn: () =>
+      AssetService.getAssetQueuedEvents({ assetId, before }) as TData,
     ...options,
   });
 /**
  * Get Asset
  * Get an asset.
  * @param data The data for the request.
- * @param data.uri
+ * @param data.assetId
  * @returns AssetResponse Successful Response
  * @throws ApiError
  */
@@ -296,16 +297,16 @@ export const useAssetServiceGetAssetSuspense = <
   TQueryKey extends Array<unknown> = unknown[],
 >(
   {
-    uri,
+    assetId,
   }: {
-    uri: string;
+    assetId: number;
   },
   queryKey?: TQueryKey,
   options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
 ) =>
   useSuspenseQuery<TData, TError>({
-    queryKey: Common.UseAssetServiceGetAssetKeyFn({ uri }, queryKey),
-    queryFn: () => AssetService.getAsset({ uri }) as TData,
+    queryKey: Common.UseAssetServiceGetAssetKeyFn({ assetId }, queryKey),
+    queryFn: () => AssetService.getAsset({ assetId }) as TData,
     ...options,
   });
 /**
@@ -346,7 +347,7 @@ export const useAssetServiceGetDagAssetQueuedEventsSuspense 
= <
  * Get a queued asset event for a DAG.
  * @param data The data for the request.
  * @param data.dagId
- * @param data.uri
+ * @param data.assetId
  * @param data.before
  * @returns QueuedEventResponse Successful Response
  * @throws ApiError
@@ -357,24 +358,24 @@ export const 
useAssetServiceGetDagAssetQueuedEventSuspense = <
   TQueryKey extends Array<unknown> = unknown[],
 >(
   {
+    assetId,
     before,
     dagId,
-    uri,
   }: {
+    assetId: number;
     before?: string;
     dagId: string;
-    uri: string;
   },
   queryKey?: TQueryKey,
   options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
 ) =>
   useSuspenseQuery<TData, TError>({
     queryKey: Common.UseAssetServiceGetDagAssetQueuedEventKeyFn(
-      { before, dagId, uri },
+      { assetId, before, dagId },
       queryKey,
     ),
     queryFn: () =>
-      AssetService.getDagAssetQueuedEvent({ before, dagId, uri }) as TData,
+      AssetService.getDagAssetQueuedEvent({ assetId, before, dagId }) as TData,
     ...options,
   });
 /**
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts 
b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 622ee94d697..965355b0c19 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -180,10 +180,6 @@ export const $AssetEventResponse = {
       type: "integer",
       title: "Asset Id",
     },
-    uri: {
-      type: "string",
-      title: "Uri",
-    },
     extra: {
       anyOf: [
         {
@@ -249,7 +245,6 @@ export const $AssetEventResponse = {
   required: [
     "id",
     "asset_id",
-    "uri",
     "source_map_index",
     "created_dagruns",
     "timestamp",
@@ -1025,9 +1020,9 @@ export const $ConnectionTestResponse = {
 
 export const $CreateAssetEventsBody = {
   properties: {
-    uri: {
-      type: "string",
-      title: "Uri",
+    asset_id: {
+      type: "integer",
+      title: "Asset Id",
     },
     extra: {
       type: "object",
@@ -1036,7 +1031,7 @@ export const $CreateAssetEventsBody = {
   },
   additionalProperties: false,
   type: "object",
-  required: ["uri"],
+  required: ["asset_id"],
   title: "CreateAssetEventsBody",
   description: "Create asset events request.",
 } as const;
@@ -3640,14 +3635,14 @@ export const $QueuedEventCollectionResponse = {
 
 export const $QueuedEventResponse = {
   properties: {
-    uri: {
-      type: "string",
-      title: "Uri",
-    },
     dag_id: {
       type: "string",
       title: "Dag Id",
     },
+    asset_id: {
+      type: "integer",
+      title: "Asset Id",
+    },
     created_at: {
       type: "string",
       format: "date-time",
@@ -3655,7 +3650,7 @@ export const $QueuedEventResponse = {
     },
   },
   type: "object",
-  required: ["uri", "dag_id", "created_at"],
+  required: ["dag_id", "asset_id", "created_at"],
   title: "QueuedEventResponse",
   description: "Queued Event serializer for responses..",
 } as const;
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts
index c855ca58092..f0446ae0fd8 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -370,7 +370,7 @@ export class AssetService {
    * Get Asset Queued Events
    * Get queued asset events for an asset.
    * @param data The data for the request.
-   * @param data.uri
+   * @param data.assetId
    * @param data.before
    * @returns QueuedEventCollectionResponse Successful Response
    * @throws ApiError
@@ -380,9 +380,9 @@ export class AssetService {
   ): CancelablePromise<GetAssetQueuedEventsResponse> {
     return __request(OpenAPI, {
       method: "GET",
-      url: "/public/assets/queuedEvents/{uri}",
+      url: "/public/assets/{asset_id}/queuedEvents",
       path: {
-        uri: data.uri,
+        asset_id: data.assetId,
       },
       query: {
         before: data.before,
@@ -400,7 +400,7 @@ export class AssetService {
    * Delete Asset Queued Events
    * Delete queued asset events for an asset.
    * @param data The data for the request.
-   * @param data.uri
+   * @param data.assetId
    * @param data.before
    * @returns void Successful Response
    * @throws ApiError
@@ -410,9 +410,9 @@ export class AssetService {
   ): CancelablePromise<DeleteAssetQueuedEventsResponse> {
     return __request(OpenAPI, {
       method: "DELETE",
-      url: "/public/assets/queuedEvents/{uri}",
+      url: "/public/assets/{asset_id}/queuedEvents",
       path: {
-        uri: data.uri,
+        asset_id: data.assetId,
       },
       query: {
         before: data.before,
@@ -430,7 +430,7 @@ export class AssetService {
    * Get Asset
    * Get an asset.
    * @param data The data for the request.
-   * @param data.uri
+   * @param data.assetId
    * @returns AssetResponse Successful Response
    * @throws ApiError
    */
@@ -439,9 +439,9 @@ export class AssetService {
   ): CancelablePromise<GetAssetResponse> {
     return __request(OpenAPI, {
       method: "GET",
-      url: "/public/assets/{uri}",
+      url: "/public/assets/{asset_id}",
       path: {
-        uri: data.uri,
+        asset_id: data.assetId,
       },
       errors: {
         401: "Unauthorized",
@@ -517,7 +517,7 @@ export class AssetService {
    * Get a queued asset event for a DAG.
    * @param data The data for the request.
    * @param data.dagId
-   * @param data.uri
+   * @param data.assetId
    * @param data.before
    * @returns QueuedEventResponse Successful Response
    * @throws ApiError
@@ -527,10 +527,10 @@ export class AssetService {
   ): CancelablePromise<GetDagAssetQueuedEventResponse> {
     return __request(OpenAPI, {
       method: "GET",
-      url: "/public/dags/{dag_id}/assets/queuedEvents/{uri}",
+      url: "/public/dags/{dag_id}/assets/{asset_id}/queuedEvents",
       path: {
         dag_id: data.dagId,
-        uri: data.uri,
+        asset_id: data.assetId,
       },
       query: {
         before: data.before,
@@ -549,7 +549,7 @@ export class AssetService {
    * Delete a queued asset event for a DAG.
    * @param data The data for the request.
    * @param data.dagId
-   * @param data.uri
+   * @param data.assetId
    * @param data.before
    * @returns void Successful Response
    * @throws ApiError
@@ -559,10 +559,10 @@ export class AssetService {
   ): CancelablePromise<DeleteDagAssetQueuedEventResponse> {
     return __request(OpenAPI, {
       method: "DELETE",
-      url: "/public/dags/{dag_id}/assets/queuedEvents/{uri}",
+      url: "/public/dags/{dag_id}/assets/{asset_id}/queuedEvents",
       path: {
         dag_id: data.dagId,
-        uri: data.uri,
+        asset_id: data.assetId,
       },
       query: {
         before: data.before,
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts
index 96a159bad07..88f6a2fc763 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -60,7 +60,6 @@ export type AssetEventCollectionResponse = {
 export type AssetEventResponse = {
   id: number;
   asset_id: number;
-  uri: string;
   extra?: {
     [key: string]: unknown;
   } | null;
@@ -265,7 +264,7 @@ export type ConnectionTestResponse = {
  * Create asset events request.
  */
 export type CreateAssetEventsBody = {
-  uri: string;
+  asset_id: number;
   extra?: {
     [key: string]: unknown;
   };
@@ -899,8 +898,8 @@ export type QueuedEventCollectionResponse = {
  * Queued Event serializer for responses..
  */
 export type QueuedEventResponse = {
-  uri: string;
   dag_id: string;
+  asset_id: number;
   created_at: string;
 };
 
@@ -1353,21 +1352,21 @@ export type CreateAssetEventData = {
 export type CreateAssetEventResponse = AssetEventResponse;
 
 export type GetAssetQueuedEventsData = {
+  assetId: number;
   before?: string | null;
-  uri: string;
 };
 
 export type GetAssetQueuedEventsResponse = QueuedEventCollectionResponse;
 
 export type DeleteAssetQueuedEventsData = {
+  assetId: number;
   before?: string | null;
-  uri: string;
 };
 
 export type DeleteAssetQueuedEventsResponse = void;
 
 export type GetAssetData = {
-  uri: string;
+  assetId: number;
 };
 
 export type GetAssetResponse = AssetResponse;
@@ -1387,17 +1386,17 @@ export type DeleteDagAssetQueuedEventsData = {
 export type DeleteDagAssetQueuedEventsResponse = void;
 
 export type GetDagAssetQueuedEventData = {
+  assetId: number;
   before?: string | null;
   dagId: string;
-  uri: string;
 };
 
 export type GetDagAssetQueuedEventResponse = QueuedEventResponse;
 
 export type DeleteDagAssetQueuedEventData = {
+  assetId: number;
   before?: string | null;
   dagId: string;
-  uri: string;
 };
 
 export type DeleteDagAssetQueuedEventResponse = void;
@@ -2226,7 +2225,7 @@ export type $OpenApiTs = {
       };
     };
   };
-  "/public/assets/queuedEvents/{uri}": {
+  "/public/assets/{asset_id}/queuedEvents": {
     get: {
       req: GetAssetQueuedEventsData;
       res: {
@@ -2278,7 +2277,7 @@ export type $OpenApiTs = {
       };
     };
   };
-  "/public/assets/{uri}": {
+  "/public/assets/{asset_id}": {
     get: {
       req: GetAssetData;
       res: {
@@ -2361,7 +2360,7 @@ export type $OpenApiTs = {
       };
     };
   };
-  "/public/dags/{dag_id}/assets/queuedEvents/{uri}": {
+  "/public/dags/{dag_id}/assets/{asset_id}/queuedEvents": {
     get: {
       req: GetDagAssetQueuedEventData;
       res: {
diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py b/tests/api_fastapi/core_api/routes/public/test_assets.py
index c7c2c2405d2..46c769640a8 100644
--- a/tests/api_fastapi/core_api/routes/public/test_assets.py
+++ b/tests/api_fastapi/core_api/routes/public/test_assets.py
@@ -16,7 +16,6 @@
 # under the License.
 from __future__ import annotations
 
-import urllib
 from collections.abc import Generator
 from datetime import datetime
 from unittest import mock
@@ -543,7 +542,6 @@ class TestGetAssetEvents(TestAssets):
                 {
                     "id": 1,
                     "asset_id": 1,
-                    "uri": "s3://bucket/key/1",
                     "extra": {"foo": "bar"},
                     "source_task_id": "source_task_id",
                     "source_dag_id": "source_dag_id",
@@ -566,7 +564,6 @@ class TestGetAssetEvents(TestAssets):
                 {
                     "id": 2,
                     "asset_id": 2,
-                    "uri": "s3://bucket/key/2",
                     "extra": {"foo": "bar"},
                     "source_task_id": "source_task_id",
                     "source_dag_id": "source_dag_id",
@@ -618,17 +615,17 @@ class TestGetAssetEvents(TestAssets):
         assert response.json()["detail"] == msg
 
     @pytest.mark.parametrize(
-        "params, expected_asset_uris",
+        "params, expected_asset_ids",
         [
             # Limit test data
-            ({"limit": "1"}, ["s3://bucket/key/1"]),
-            ({"limit": "100"}, [f"s3://bucket/key/{i}" for i in range(1, 101)]),
+            ({"limit": "1"}, [1]),
+            ({"limit": "100"}, list(range(1, 101))),
             # Offset test data
-            ({"offset": "1"}, [f"s3://bucket/key/{i}" for i in range(2, 102)]),
-            ({"offset": "3"}, [f"s3://bucket/key/{i}" for i in range(4, 104)]),
+            ({"offset": "1"}, list(range(2, 102))),
+            ({"offset": "3"}, list(range(4, 104))),
         ],
     )
-    def test_limit_and_offset(self, test_client, params, expected_asset_uris):
+    def test_limit_and_offset(self, test_client, params, expected_asset_ids):
         self.create_assets(num=110)
         self.create_assets_events(num=110)
         self.create_dag_run(num=110)
@@ -637,8 +634,8 @@ class TestGetAssetEvents(TestAssets):
         response = test_client.get("/public/assets/events", params=params)
 
         assert response.status_code == 200
-        asset_uris = [asset["uri"] for asset in response.json()["asset_events"]]
-        assert asset_uris == expected_asset_uris
+        asset_ids = [asset["id"] for asset in response.json()["asset_events"]]
+        assert asset_ids == expected_asset_ids
 
     @pytest.mark.usefixtures("time_freezer")
     @pytest.mark.enable_redact
@@ -655,7 +652,6 @@ class TestGetAssetEvents(TestAssets):
                 {
                     "id": 1,
                     "asset_id": 1,
-                    "uri": "s3://bucket/key/1",
                     "extra": {"password": "***"},
                     "source_task_id": "source_task_id",
                     "source_dag_id": "source_dag_id",
@@ -678,7 +674,6 @@ class TestGetAssetEvents(TestAssets):
                 {
                     "id": 2,
                     "asset_id": 2,
-                    "uri": "s3://bucket/key/2",
                     "extra": {"password": "***"},
                     "source_task_id": "source_task_id",
                     "source_dag_id": "source_dag_id",
@@ -704,24 +699,13 @@ class TestGetAssetEvents(TestAssets):
 
 
 class TestGetAssetEndpoint(TestAssets):
-    @pytest.mark.parametrize(
-        "url",
-        [
-            urllib.parse.quote(
-                "s3://bucket/key/1", safe=""
-            ),  # api should cover raw as well as unquoted case like legacy
-            "s3://bucket/key/1",
-        ],
-    )
     @provide_session
-    def test_should_respond_200(self, test_client, url, session):
+    def test_should_respond_200(self, test_client, session):
         self.create_assets(num=1)
         assert session.query(AssetModel).count() == 1
         tz_datetime_format = from_datetime_to_zulu_without_ms(DEFAULT_DATE)
         with assert_queries_count(6):
-            response = test_client.get(
-                f"/public/assets/{url}",
-            )
+            response = test_client.get("/public/assets/1")
         assert response.status_code == 200
         assert response.json() == {
             "id": 1,
@@ -737,21 +721,16 @@ class TestGetAssetEndpoint(TestAssets):
         }
 
     def test_should_respond_404(self, test_client):
-        response = test_client.get(
-            f"/public/assets/{urllib.parse.quote('s3://bucket/key', safe='')}",
-        )
+        response = test_client.get("/public/assets/1")
         assert response.status_code == 404
-        assert response.json()["detail"] == "The Asset with uri: `s3://bucket/key` was not found"
+        assert response.json()["detail"] == "The Asset with ID: `1` was not found"
 
     @pytest.mark.usefixtures("time_freezer")
     @pytest.mark.enable_redact
     def test_should_mask_sensitive_extra(self, test_client, session):
         self.create_assets_with_sensitive_extra()
         tz_datetime_format = from_datetime_to_zulu_without_ms(DEFAULT_DATE)
-        uri = "s3://bucket/key/1"
-        response = test_client.get(
-            f"/public/assets/{uri}",
-        )
+        response = test_client.get("/public/assets/1")
         assert response.status_code == 200
         assert response.json() == {
             "id": 1,
@@ -808,9 +787,9 @@ class TestGetDagAssetQueuedEvents(TestQueuedEventEndpoint):
         assert response.json() == {
             "queued_events": [
                 {
-                    "created_at": from_datetime_to_zulu_without_ms(DEFAULT_DATE),
-                    "uri": "s3://bucket/key/1",
+                    "asset_id": 1,
                     "dag_id": "dag",
+                    "created_at": from_datetime_to_zulu_without_ms(DEFAULT_DATE),
                 }
             ],
             "total_entries": 1,
@@ -875,13 +854,12 @@ class TestPostAssetEvents(TestAssets):
     @pytest.mark.usefixtures("time_freezer")
     def test_should_respond_200(self, test_client, session):
         self.create_assets()
-        event_payload = {"uri": "s3://bucket/key/1", "extra": {"foo": "bar"}}
+        event_payload = {"asset_id": 1, "extra": {"foo": "bar"}}
         response = test_client.post("/public/assets/events", json=event_payload)
         assert response.status_code == 200
         assert response.json() == {
             "id": mock.ANY,
             "asset_id": 1,
-            "uri": "s3://bucket/key/1",
             "extra": {"foo": "bar", "from_rest_api": True},
             "source_task_id": None,
             "source_dag_id": None,
@@ -902,13 +880,12 @@ class TestPostAssetEvents(TestAssets):
     @pytest.mark.enable_redact
     def test_should_mask_sensitive_extra(self, test_client, session):
         self.create_assets(session)
-        event_payload = {"uri": "s3://bucket/key/1", "extra": {"password": "bar"}}
+        event_payload = {"asset_id": 1, "extra": {"password": "bar"}}
         response = test_client.post("/public/assets/events", json=event_payload)
         assert response.status_code == 200
         assert response.json() == {
             "id": mock.ANY,
             "asset_id": 1,
-            "uri": "s3://bucket/key/1",
             "extra": {"password": "***", "from_rest_api": True},
             "source_task_id": None,
             "source_dag_id": None,
@@ -925,34 +902,26 @@ class TestGetAssetQueuedEvents(TestQueuedEventEndpoint):
         dag, _ = create_dummy_dag()
         dag_id = dag.dag_id
         self.create_assets(session=session, num=1)
-        uri = "s3://bucket/key/1"
         asset_id = 1
         self._create_asset_dag_run_queues(dag_id, asset_id, session)
 
-        response = test_client.get(
-            f"/public/assets/queuedEvents/{uri}",
-        )
+        response = test_client.get(f"/public/assets/{asset_id}/queuedEvents/")
         assert response.status_code == 200
         assert response.json() == {
             "queued_events": [
                 {
-                    "created_at": from_datetime_to_zulu_without_ms(DEFAULT_DATE),
-                    "uri": "s3://bucket/key/1",
+                    "asset_id": asset_id,
                     "dag_id": "dag",
+                    "created_at": from_datetime_to_zulu_without_ms(DEFAULT_DATE),
                 }
             ],
             "total_entries": 1,
         }
 
     def test_should_respond_404(self, test_client):
-        uri = "not_exists"
-
-        response = test_client.get(
-            f"/public/assets/queuedEvents/{uri}",
-        )
-
+        response = test_client.get("/public/assets/1/queuedEvents")
         assert response.status_code == 404
-        assert response.json()["detail"] == "Queue event with uri: `not_exists` was not found"
+        assert response.json()["detail"] == "Queue event with asset_id: `1` was not found"
 
 
 class TestDeleteAssetQueuedEvents(TestQueuedEventEndpoint):
@@ -960,33 +929,25 @@ class TestDeleteAssetQueuedEvents(TestQueuedEventEndpoint):
     def test_should_respond_204(self, test_client, session, create_dummy_dag):
         dag, _ = create_dummy_dag()
         dag_id = dag.dag_id
-        uri = "s3://bucket/key/1"
         self.create_assets(session=session, num=1)
         asset_id = 1
         self._create_asset_dag_run_queues(dag_id, asset_id, session)
 
-        response = test_client.delete(
-            f"/public/assets/queuedEvents/{uri}",
-        )
+        assert session.get(AssetDagRunQueue, (asset_id, dag_id)) is not None
+        response = test_client.delete(f"/public/assets/{asset_id}/queuedEvents")
         assert response.status_code == 204
-        assert session.query(AssetDagRunQueue).filter_by(asset_id=1).first() is None
+        assert session.get(AssetDagRunQueue, (asset_id, dag_id)) is None
 
     def test_should_respond_404(self, test_client):
-        uri = "not_exists"
-
-        response = test_client.delete(
-            f"/public/assets/queuedEvents/{uri}",
-        )
-
+        response = test_client.delete("/public/assets/1/queuedEvents")
         assert response.status_code == 404
-        assert response.json()["detail"] == "Queue event with uri: `not_exists` was not found"
+        assert response.json()["detail"] == "Queue event with asset_id: `1` was not found"
 
 
 class TestDeleteDagAssetQueuedEvent(TestQueuedEventEndpoint):
     def test_delete_should_respond_204(self, test_client, session, create_dummy_dag):
         dag, _ = create_dummy_dag()
         dag_id = dag.dag_id
-        asset_uri = "s3://bucket/key/1"
         self.create_assets(session=session, num=1)
         asset_id = 1
 
@@ -995,7 +956,7 @@ class TestDeleteDagAssetQueuedEvent(TestQueuedEventEndpoint):
         assert len(adrq) == 1
 
         response = test_client.delete(
-            f"/public/dags/{dag_id}/assets/queuedEvents/{asset_uri}",
+            f"/public/dags/{dag_id}/assets/{asset_id}/queuedEvents",
         )
 
         assert response.status_code == 204
@@ -1004,14 +965,14 @@ class TestDeleteDagAssetQueuedEvent(TestQueuedEventEndpoint):
 
     def test_should_respond_404(self, test_client):
         dag_id = "not_exists"
-        asset_uri = "not_exists"
+        asset_id = 1
 
         response = test_client.delete(
-            f"/public/dags/{dag_id}/assets/queuedEvents/{asset_uri}",
+            f"/public/dags/{dag_id}/assets/{asset_id}/queuedEvents/",
         )
 
         assert response.status_code == 404
         assert (
             response.json()["detail"]
-            == "Queued event with dag_id: `not_exists` and asset uri: `not_exists` was not found"
+            == "Queued event with dag_id: `not_exists` and asset_id: `1` was not found"
         )
diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py
index 995b98a61dd..ee57b77c489 100644
--- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -941,7 +941,7 @@ class TestDeleteDagRun:
 
 class TestGetDagRunAssetTriggerEvents:
     def test_should_respond_200(self, test_client, dag_maker, session):
-        asset1 = Asset(uri="ds1")
+        asset1 = Asset(name="ds1", uri="file:///da1")
 
         with dag_maker(dag_id="source_dag", start_date=START_DATE1, session=session):
             EmptyOperator(task_id="task", outlets=[asset1])
@@ -975,7 +975,6 @@ class TestGetDagRunAssetTriggerEvents:
                 {
                     "timestamp": from_datetime_to_zulu(event.timestamp),
                     "asset_id": asset1_id,
-                    "uri": asset1.uri,
                     "extra": {},
                     "id": event.id,
                     "source_dag_id": ti.dag_id,

Reply via email to