This is an automated email from the ASF dual-hosted git repository. pierrejeambrun pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new dcd41f60f1c Switch asset endpoints to use id instead of uri (#44801) dcd41f60f1c is described below commit dcd41f60f1c9b5583b49bfb49b6d85c640a2892c Author: Tzu-ping Chung <uranu...@gmail.com> AuthorDate: Tue Dec 10 23:13:08 2024 +0800 Switch asset endpoints to use id instead of uri (#44801) * Switch asset endpoints to use id instead of uri With the addition of Asset.name, we can't guarantee the uri to be unique anymore. Using uri also presents additional issues on endpoints since it conflicts with subroutes. * Does not need annotation in route format --- airflow/api_fastapi/core_api/datamodels/assets.py | 5 +- .../api_fastapi/core_api/openapi/v1-generated.yaml | 56 +++++----- .../api_fastapi/core_api/routes/public/assets.py | 121 ++++++++------------- airflow/ui/openapi-gen/queries/common.ts | 18 +-- airflow/ui/openapi-gen/queries/prefetch.ts | 34 +++--- airflow/ui/openapi-gen/queries/queries.ts | 51 ++++----- airflow/ui/openapi-gen/queries/suspense.ts | 31 +++--- airflow/ui/openapi-gen/requests/schemas.gen.ts | 23 ++-- airflow/ui/openapi-gen/requests/services.gen.ts | 30 ++--- airflow/ui/openapi-gen/requests/types.gen.ts | 21 ++-- .../core_api/routes/public/test_assets.py | 101 ++++++----------- .../core_api/routes/public/test_dag_run.py | 3 +- 12 files changed, 211 insertions(+), 283 deletions(-) diff --git a/airflow/api_fastapi/core_api/datamodels/assets.py b/airflow/api_fastapi/core_api/datamodels/assets.py index 047f3f557bc..97211579985 100644 --- a/airflow/api_fastapi/core_api/datamodels/assets.py +++ b/airflow/api_fastapi/core_api/datamodels/assets.py @@ -102,7 +102,6 @@ class AssetEventResponse(BaseModel): id: int asset_id: int - uri: str extra: dict | None = None source_task_id: str | None = None source_dag_id: str | None = None @@ -127,8 +126,8 @@ class AssetEventCollectionResponse(BaseModel): class QueuedEventResponse(BaseModel): """Queued Event serializer for responses..""" - uri: 
str dag_id: str + asset_id: int created_at: datetime @@ -142,7 +141,7 @@ class QueuedEventCollectionResponse(BaseModel): class CreateAssetEventsBody(BaseModel): """Create asset events request.""" - uri: str + asset_id: int extra: dict = Field(default_factory=dict) @field_validator("extra", mode="after") diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 3b6d55c9313..00f1c5aa510 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -594,7 +594,7 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - /public/assets/queuedEvents/{uri}: + /public/assets/{asset_id}/queuedEvents: get: tags: - Asset @@ -602,12 +602,12 @@ paths: description: Get queued asset events for an asset. operationId: get_asset_queued_events parameters: - - name: uri + - name: asset_id in: path required: true schema: - type: string - title: Uri + type: integer + title: Asset Id - name: before in: query required: false @@ -654,12 +654,12 @@ paths: description: Delete queued asset events for an asset. operationId: delete_asset_queued_events parameters: - - name: uri + - name: asset_id in: path required: true schema: - type: string - title: Uri + type: integer + title: Asset Id - name: before in: query required: false @@ -695,7 +695,7 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - /public/assets/{uri}: + /public/assets/{asset_id}: get: tags: - Asset @@ -703,12 +703,12 @@ paths: description: Get an asset. 
operationId: get_asset parameters: - - name: uri + - name: asset_id in: path required: true schema: - type: string - title: Uri + type: integer + title: Asset Id responses: '200': description: Successful Response @@ -846,7 +846,7 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - /public/dags/{dag_id}/assets/queuedEvents/{uri}: + /public/dags/{dag_id}/assets/{asset_id}/queuedEvents: get: tags: - Asset @@ -860,12 +860,12 @@ paths: schema: type: string title: Dag Id - - name: uri + - name: asset_id in: path required: true schema: - type: string - title: Uri + type: integer + title: Asset Id - name: before in: query required: false @@ -918,12 +918,12 @@ paths: schema: type: string title: Dag Id - - name: uri + - name: asset_id in: path required: true schema: - type: string - title: Uri + type: integer + title: Asset Id - name: before in: query required: false @@ -5982,9 +5982,6 @@ components: asset_id: type: integer title: Asset Id - uri: - type: string - title: Uri extra: anyOf: - type: object @@ -6021,7 +6018,6 @@ components: required: - id - asset_id - - uri - source_map_index - created_dagruns - timestamp @@ -6548,16 +6544,16 @@ components: description: Connection Test serializer for responses. CreateAssetEventsBody: properties: - uri: - type: string - title: Uri + asset_id: + type: integer + title: Asset Id extra: type: object title: Extra additionalProperties: false type: object required: - - uri + - asset_id title: CreateAssetEventsBody description: Create asset events request. DAGCollectionResponse: @@ -8273,20 +8269,20 @@ components: description: Queued Event Collection serializer for responses. 
QueuedEventResponse: properties: - uri: - type: string - title: Uri dag_id: type: string title: Dag Id + asset_id: + type: integer + title: Asset Id created_at: type: string format: date-time title: Created At type: object required: - - uri - dag_id + - asset_id - created_at title: QueuedEventResponse description: Queued Event serializer for responses.. diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow/api_fastapi/core_api/routes/public/assets.py index 779c32f6a4c..c8cc9fb0f76 100644 --- a/airflow/api_fastapi/core_api/routes/public/assets.py +++ b/airflow/api_fastapi/core_api/routes/public/assets.py @@ -59,20 +59,16 @@ assets_router = AirflowRouter(tags=["Asset"]) def _generate_queued_event_where_clause( *, + asset_id: int | None = None, dag_id: str | None = None, - uri: str | None = None, before: datetime | str | None = None, ) -> list: """Get AssetDagRunQueue where clause.""" where_clause = [] if dag_id is not None: where_clause.append(AssetDagRunQueue.target_dag_id == dag_id) - if uri is not None: - where_clause.append( - AssetDagRunQueue.asset_id.in_( - select(AssetModel.id).where(AssetModel.uri == uri), - ), - ) + if asset_id is not None: + where_clause.append(AssetDagRunQueue.asset_id == asset_id) if before is not None: where_clause.append(AssetDagRunQueue.created_at < before) return where_clause @@ -227,9 +223,9 @@ def create_asset_event( session: SessionDep, ) -> AssetEventResponse: """Create asset events.""" - asset_model = session.scalar(select(AssetModel).where(AssetModel.uri == body.uri).limit(1)) + asset_model = session.scalar(select(AssetModel).where(AssetModel.id == body.asset_id).limit(1)) if not asset_model: - raise HTTPException(status.HTTP_404_NOT_FOUND, f"Asset with uri: `{body.uri}` was not found") + raise HTTPException(status.HTTP_404_NOT_FOUND, f"Asset with ID: `{body.asset_id}` was not found") timestamp = timezone.utcnow() assets_event = asset_manager.register_asset_change( @@ -240,41 +236,35 @@ def 
create_asset_event( ) if not assets_event: - raise HTTPException(status.HTTP_404_NOT_FOUND, f"Asset with uri: `{body.uri}` was not found") - return assets_event + raise HTTPException(status.HTTP_404_NOT_FOUND, f"Asset with ID: `{body.asset_id}` was not found") + return AssetEventResponse.model_validate(assets_event) @assets_router.get( - "/assets/queuedEvents/{uri:path}", - responses=create_openapi_http_exception_doc( - [ - status.HTTP_404_NOT_FOUND, - ] - ), + "/assets/{asset_id}/queuedEvents", + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), ) def get_asset_queued_events( - uri: str, + asset_id: int, session: SessionDep, before: OptionalDateTimeQuery = None, ) -> QueuedEventCollectionResponse: """Get queued asset events for an asset.""" - print(f"uri: {uri}") - where_clause = _generate_queued_event_where_clause(uri=uri, before=before) - query = ( - select(AssetDagRunQueue, AssetModel.uri) - .join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id) - .where(*where_clause) - ) + where_clause = _generate_queued_event_where_clause(asset_id=asset_id, before=before) + query = select(AssetDagRunQueue).where(*where_clause) dag_asset_queued_events_select, total_entries = paginated_select(statement=query) - adrqs = session.execute(dag_asset_queued_events_select).all() + adrqs = session.scalars(dag_asset_queued_events_select).all() if not adrqs: - raise HTTPException(status.HTTP_404_NOT_FOUND, f"Queue event with uri: `{uri}` was not found") + raise HTTPException( + status.HTTP_404_NOT_FOUND, + f"Queue event with asset_id: `{asset_id}` was not found", + ) queued_events = [ - QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, uri=uri) - for adrq, uri in adrqs + QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, asset_id=adrq.asset_id) + for adrq in adrqs ] return QueuedEventCollectionResponse( @@ -284,33 +274,29 @@ def get_asset_queued_events( @assets_router.get( - "/assets/{uri:path}", + 
"/assets/{asset_id}", responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), ) def get_asset( - uri: str, + asset_id: int, session: SessionDep, ) -> AssetResponse: """Get an asset.""" asset = session.scalar( select(AssetModel) - .where(AssetModel.uri == uri) + .where(AssetModel.id == asset_id) .options(joinedload(AssetModel.consuming_dags), joinedload(AssetModel.producing_tasks)) ) if asset is None: - raise HTTPException(status.HTTP_404_NOT_FOUND, f"The Asset with uri: `{uri}` was not found") + raise HTTPException(status.HTTP_404_NOT_FOUND, f"The Asset with ID: `{asset_id}` was not found") return AssetResponse.model_validate(asset) @assets_router.get( "/dags/{dag_id}/assets/queuedEvents", - responses=create_openapi_http_exception_doc( - [ - status.HTTP_404_NOT_FOUND, - ] - ), + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), ) def get_dag_asset_queued_events( dag_id: str, @@ -319,20 +305,16 @@ def get_dag_asset_queued_events( ) -> QueuedEventCollectionResponse: """Get queued asset events for a DAG.""" where_clause = _generate_queued_event_where_clause(dag_id=dag_id, before=before) - query = ( - select(AssetDagRunQueue, AssetModel.uri) - .join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id) - .where(*where_clause) - ) + query = select(AssetDagRunQueue).where(*where_clause) dag_asset_queued_events_select, total_entries = paginated_select(statement=query) - adrqs = session.execute(dag_asset_queued_events_select).all() + adrqs = session.scalars(dag_asset_queued_events_select).all() if not adrqs: raise HTTPException(status.HTTP_404_NOT_FOUND, f"Queue event with dag_id: `{dag_id}` was not found") queued_events = [ - QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, uri=uri) - for adrq, uri in adrqs + QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, asset_id=adrq.asset_id) + for adrq in adrqs ] return QueuedEventCollectionResponse( @@ -342,56 +324,47 @@ def 
get_dag_asset_queued_events( @assets_router.get( - "/dags/{dag_id}/assets/queuedEvents/{uri:path}", - responses=create_openapi_http_exception_doc( - [ - status.HTTP_404_NOT_FOUND, - ] - ), + "/dags/{dag_id}/assets/{asset_id}/queuedEvents", + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), ) def get_dag_asset_queued_event( dag_id: str, - uri: str, + asset_id: int, session: SessionDep, before: OptionalDateTimeQuery = None, ) -> QueuedEventResponse: """Get a queued asset event for a DAG.""" - where_clause = _generate_queued_event_where_clause(dag_id=dag_id, uri=uri, before=before) - query = ( - select(AssetDagRunQueue) - .join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id) - .where(*where_clause) - ) + where_clause = _generate_queued_event_where_clause(dag_id=dag_id, asset_id=asset_id, before=before) + query = select(AssetDagRunQueue).where(*where_clause) adrq = session.scalar(query) if not adrq: raise HTTPException( status.HTTP_404_NOT_FOUND, - f"Queued event with dag_id: `{dag_id}` and asset uri: `{uri}` was not found", + f"Queued event with dag_id: `{dag_id}` and asset_id: `{asset_id}` was not found", ) - return QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, uri=uri) + return QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, asset_id=asset_id) @assets_router.delete( - "/assets/queuedEvents/{uri:path}", + "/assets/{asset_id}/queuedEvents", status_code=status.HTTP_204_NO_CONTENT, - responses=create_openapi_http_exception_doc( - [ - status.HTTP_404_NOT_FOUND, - ] - ), + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), ) def delete_asset_queued_events( - uri: str, + asset_id: int, session: SessionDep, before: OptionalDateTimeQuery = None, ): """Delete queued asset events for an asset.""" - where_clause = _generate_queued_event_where_clause(uri=uri, before=before) + where_clause = _generate_queued_event_where_clause(asset_id=asset_id, before=before) 
delete_stmt = delete(AssetDagRunQueue).where(*where_clause).execution_options(synchronize_session="fetch") result = session.execute(delete_stmt) if result.rowcount == 0: - raise HTTPException(status.HTTP_404_NOT_FOUND, detail=f"Queue event with uri: `{uri}` was not found") + raise HTTPException( + status.HTTP_404_NOT_FOUND, + detail=f"Queue event with asset_id: `{asset_id}` was not found", + ) @assets_router.delete( @@ -419,7 +392,7 @@ def delete_dag_asset_queued_events( @assets_router.delete( - "/dags/{dag_id}/assets/queuedEvents/{uri:path}", + "/dags/{dag_id}/assets/{asset_id}/queuedEvents", status_code=status.HTTP_204_NO_CONTENT, responses=create_openapi_http_exception_doc( [ @@ -430,12 +403,12 @@ def delete_dag_asset_queued_events( ) def delete_dag_asset_queued_event( dag_id: str, - uri: str, + asset_id: int, session: SessionDep, before: OptionalDateTimeQuery = None, ): """Delete a queued asset event for a DAG.""" - where_clause = _generate_queued_event_where_clause(dag_id=dag_id, before=before, uri=uri) + where_clause = _generate_queued_event_where_clause(dag_id=dag_id, before=before, asset_id=asset_id) delete_statement = ( delete(AssetDagRunQueue).where(*where_clause).execution_options(synchronize_session="fetch") ) @@ -443,5 +416,5 @@ def delete_dag_asset_queued_event( if result.rowcount == 0: raise HTTPException( status.HTTP_404_NOT_FOUND, - detail=f"Queued event with dag_id: `{dag_id}` and asset uri: `{uri}` was not found", + detail=f"Queued event with dag_id: `{dag_id}` and asset_id: `{asset_id}` was not found", ) diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index b759c0f2f23..ed1d6066bdc 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -174,16 +174,16 @@ export const useAssetServiceGetAssetQueuedEventsKey = "AssetServiceGetAssetQueuedEvents"; export const UseAssetServiceGetAssetQueuedEventsKeyFn = ( { + assetId, before, - uri, }: { + assetId: 
number; before?: string; - uri: string; }, queryKey?: Array<unknown>, ) => [ useAssetServiceGetAssetQueuedEventsKey, - ...(queryKey ?? [{ before, uri }]), + ...(queryKey ?? [{ assetId, before }]), ]; export type AssetServiceGetAssetDefaultResponse = Awaited< ReturnType<typeof AssetService.getAsset> @@ -195,12 +195,12 @@ export type AssetServiceGetAssetQueryResult< export const useAssetServiceGetAssetKey = "AssetServiceGetAsset"; export const UseAssetServiceGetAssetKeyFn = ( { - uri, + assetId, }: { - uri: string; + assetId: number; }, queryKey?: Array<unknown>, -) => [useAssetServiceGetAssetKey, ...(queryKey ?? [{ uri }])]; +) => [useAssetServiceGetAssetKey, ...(queryKey ?? [{ assetId }])]; export type AssetServiceGetDagAssetQueuedEventsDefaultResponse = Awaited< ReturnType<typeof AssetService.getDagAssetQueuedEvents> >; @@ -234,18 +234,18 @@ export const useAssetServiceGetDagAssetQueuedEventKey = "AssetServiceGetDagAssetQueuedEvent"; export const UseAssetServiceGetDagAssetQueuedEventKeyFn = ( { + assetId, before, dagId, - uri, }: { + assetId: number; before?: string; dagId: string; - uri: string; }, queryKey?: Array<unknown>, ) => [ useAssetServiceGetDagAssetQueuedEventKey, - ...(queryKey ?? [{ before, dagId, uri }]), + ...(queryKey ?? [{ assetId, before, dagId }]), ]; export type ConfigServiceGetConfigsDefaultResponse = Awaited< ReturnType<typeof ConfigService.getConfigs> diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index eab8ea6b356..4bb01a7feaa 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -219,7 +219,7 @@ export const prefetchUseAssetServiceGetAssetEvents = ( * Get Asset Queued Events * Get queued asset events for an asset. * @param data The data for the request. 
- * @param data.uri + * @param data.assetId * @param data.before * @returns QueuedEventCollectionResponse Successful Response * @throws ApiError @@ -227,36 +227,39 @@ export const prefetchUseAssetServiceGetAssetEvents = ( export const prefetchUseAssetServiceGetAssetQueuedEvents = ( queryClient: QueryClient, { + assetId, before, - uri, }: { + assetId: number; before?: string; - uri: string; }, ) => queryClient.prefetchQuery({ - queryKey: Common.UseAssetServiceGetAssetQueuedEventsKeyFn({ before, uri }), - queryFn: () => AssetService.getAssetQueuedEvents({ before, uri }), + queryKey: Common.UseAssetServiceGetAssetQueuedEventsKeyFn({ + assetId, + before, + }), + queryFn: () => AssetService.getAssetQueuedEvents({ assetId, before }), }); /** * Get Asset * Get an asset. * @param data The data for the request. - * @param data.uri + * @param data.assetId * @returns AssetResponse Successful Response * @throws ApiError */ export const prefetchUseAssetServiceGetAsset = ( queryClient: QueryClient, { - uri, + assetId, }: { - uri: string; + assetId: number; }, ) => queryClient.prefetchQuery({ - queryKey: Common.UseAssetServiceGetAssetKeyFn({ uri }), - queryFn: () => AssetService.getAsset({ uri }), + queryKey: Common.UseAssetServiceGetAssetKeyFn({ assetId }), + queryFn: () => AssetService.getAsset({ assetId }), }); /** * Get Dag Asset Queued Events @@ -289,7 +292,7 @@ export const prefetchUseAssetServiceGetDagAssetQueuedEvents = ( * Get a queued asset event for a DAG. * @param data The data for the request. 
* @param data.dagId - * @param data.uri + * @param data.assetId * @param data.before * @returns QueuedEventResponse Successful Response * @throws ApiError @@ -297,22 +300,23 @@ export const prefetchUseAssetServiceGetDagAssetQueuedEvents = ( export const prefetchUseAssetServiceGetDagAssetQueuedEvent = ( queryClient: QueryClient, { + assetId, before, dagId, - uri, }: { + assetId: number; before?: string; dagId: string; - uri: string; }, ) => queryClient.prefetchQuery({ queryKey: Common.UseAssetServiceGetDagAssetQueuedEventKeyFn({ + assetId, before, dagId, - uri, }), - queryFn: () => AssetService.getDagAssetQueuedEvent({ before, dagId, uri }), + queryFn: () => + AssetService.getDagAssetQueuedEvent({ assetId, before, dagId }), }); /** * Get Configs diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index e90ae58e6b0..f8c8bcd7a58 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -279,7 +279,7 @@ export const useAssetServiceGetAssetEvents = < * Get Asset Queued Events * Get queued asset events for an asset. * @param data The data for the request. - * @param data.uri + * @param data.assetId * @param data.before * @returns QueuedEventCollectionResponse Successful Response * @throws ApiError @@ -290,28 +290,29 @@ export const useAssetServiceGetAssetQueuedEvents = < TQueryKey extends Array<unknown> = unknown[], >( { + assetId, before, - uri, }: { + assetId: number; before?: string; - uri: string; }, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">, ) => useQuery<TData, TError>({ queryKey: Common.UseAssetServiceGetAssetQueuedEventsKeyFn( - { before, uri }, + { assetId, before }, queryKey, ), - queryFn: () => AssetService.getAssetQueuedEvents({ before, uri }) as TData, + queryFn: () => + AssetService.getAssetQueuedEvents({ assetId, before }) as TData, ...options, }); /** * Get Asset * Get an asset. 
* @param data The data for the request. - * @param data.uri + * @param data.assetId * @returns AssetResponse Successful Response * @throws ApiError */ @@ -321,16 +322,16 @@ export const useAssetServiceGetAsset = < TQueryKey extends Array<unknown> = unknown[], >( { - uri, + assetId, }: { - uri: string; + assetId: number; }, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">, ) => useQuery<TData, TError>({ - queryKey: Common.UseAssetServiceGetAssetKeyFn({ uri }, queryKey), - queryFn: () => AssetService.getAsset({ uri }) as TData, + queryKey: Common.UseAssetServiceGetAssetKeyFn({ assetId }, queryKey), + queryFn: () => AssetService.getAsset({ assetId }) as TData, ...options, }); /** @@ -371,7 +372,7 @@ export const useAssetServiceGetDagAssetQueuedEvents = < * Get a queued asset event for a DAG. * @param data The data for the request. * @param data.dagId - * @param data.uri + * @param data.assetId * @param data.before * @returns QueuedEventResponse Successful Response * @throws ApiError @@ -382,24 +383,24 @@ export const useAssetServiceGetDagAssetQueuedEvent = < TQueryKey extends Array<unknown> = unknown[], >( { + assetId, before, dagId, - uri, }: { + assetId: number; before?: string; dagId: string; - uri: string; }, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">, ) => useQuery<TData, TError>({ queryKey: Common.UseAssetServiceGetDagAssetQueuedEventKeyFn( - { before, dagId, uri }, + { assetId, before, dagId }, queryKey, ), queryFn: () => - AssetService.getDagAssetQueuedEvent({ before, dagId, uri }) as TData, + AssetService.getDagAssetQueuedEvent({ assetId, before, dagId }) as TData, ...options, }); /** @@ -3801,7 +3802,7 @@ export const useVariableServicePatchVariable = < * Delete Asset Queued Events * Delete queued asset events for an asset. * @param data The data for the request. 
- * @param data.uri + * @param data.assetId * @param data.before * @returns void Successful Response * @throws ApiError @@ -3816,8 +3817,8 @@ export const useAssetServiceDeleteAssetQueuedEvents = < TData, TError, { + assetId: number; before?: string; - uri: string; }, TContext >, @@ -3828,15 +3829,15 @@ export const useAssetServiceDeleteAssetQueuedEvents = < TData, TError, { + assetId: number; before?: string; - uri: string; }, TContext >({ - mutationFn: ({ before, uri }) => + mutationFn: ({ assetId, before }) => AssetService.deleteAssetQueuedEvents({ + assetId, before, - uri, }) as unknown as Promise<TData>, ...options, }); @@ -3887,7 +3888,7 @@ export const useAssetServiceDeleteDagAssetQueuedEvents = < * Delete a queued asset event for a DAG. * @param data The data for the request. * @param data.dagId - * @param data.uri + * @param data.assetId * @param data.before * @returns void Successful Response * @throws ApiError @@ -3902,9 +3903,9 @@ export const useAssetServiceDeleteDagAssetQueuedEvent = < TData, TError, { + assetId: number; before?: string; dagId: string; - uri: string; }, TContext >, @@ -3915,17 +3916,17 @@ export const useAssetServiceDeleteDagAssetQueuedEvent = < TData, TError, { + assetId: number; before?: string; dagId: string; - uri: string; }, TContext >({ - mutationFn: ({ before, dagId, uri }) => + mutationFn: ({ assetId, before, dagId }) => AssetService.deleteDagAssetQueuedEvent({ + assetId, before, dagId, - uri, }) as unknown as Promise<TData>, ...options, }); diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index e7b21338760..cd6d7c5a798 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -254,7 +254,7 @@ export const useAssetServiceGetAssetEventsSuspense = < * Get Asset Queued Events * Get queued asset events for an asset. * @param data The data for the request. 
- * @param data.uri + * @param data.assetId * @param data.before * @returns QueuedEventCollectionResponse Successful Response * @throws ApiError @@ -265,28 +265,29 @@ export const useAssetServiceGetAssetQueuedEventsSuspense = < TQueryKey extends Array<unknown> = unknown[], >( { + assetId, before, - uri, }: { + assetId: number; before?: string; - uri: string; }, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">, ) => useSuspenseQuery<TData, TError>({ queryKey: Common.UseAssetServiceGetAssetQueuedEventsKeyFn( - { before, uri }, + { assetId, before }, queryKey, ), - queryFn: () => AssetService.getAssetQueuedEvents({ before, uri }) as TData, + queryFn: () => + AssetService.getAssetQueuedEvents({ assetId, before }) as TData, ...options, }); /** * Get Asset * Get an asset. * @param data The data for the request. - * @param data.uri + * @param data.assetId * @returns AssetResponse Successful Response * @throws ApiError */ @@ -296,16 +297,16 @@ export const useAssetServiceGetAssetSuspense = < TQueryKey extends Array<unknown> = unknown[], >( { - uri, + assetId, }: { - uri: string; + assetId: number; }, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">, ) => useSuspenseQuery<TData, TError>({ - queryKey: Common.UseAssetServiceGetAssetKeyFn({ uri }, queryKey), - queryFn: () => AssetService.getAsset({ uri }) as TData, + queryKey: Common.UseAssetServiceGetAssetKeyFn({ assetId }, queryKey), + queryFn: () => AssetService.getAsset({ assetId }) as TData, ...options, }); /** @@ -346,7 +347,7 @@ export const useAssetServiceGetDagAssetQueuedEventsSuspense = < * Get a queued asset event for a DAG. * @param data The data for the request. 
* @param data.dagId - * @param data.uri + * @param data.assetId * @param data.before * @returns QueuedEventResponse Successful Response * @throws ApiError @@ -357,24 +358,24 @@ export const useAssetServiceGetDagAssetQueuedEventSuspense = < TQueryKey extends Array<unknown> = unknown[], >( { + assetId, before, dagId, - uri, }: { + assetId: number; before?: string; dagId: string; - uri: string; }, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">, ) => useSuspenseQuery<TData, TError>({ queryKey: Common.UseAssetServiceGetDagAssetQueuedEventKeyFn( - { before, dagId, uri }, + { assetId, before, dagId }, queryKey, ), queryFn: () => - AssetService.getDagAssetQueuedEvent({ before, dagId, uri }) as TData, + AssetService.getDagAssetQueuedEvent({ assetId, before, dagId }) as TData, ...options, }); /** diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 622ee94d697..965355b0c19 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -180,10 +180,6 @@ export const $AssetEventResponse = { type: "integer", title: "Asset Id", }, - uri: { - type: "string", - title: "Uri", - }, extra: { anyOf: [ { @@ -249,7 +245,6 @@ export const $AssetEventResponse = { required: [ "id", "asset_id", - "uri", "source_map_index", "created_dagruns", "timestamp", @@ -1025,9 +1020,9 @@ export const $ConnectionTestResponse = { export const $CreateAssetEventsBody = { properties: { - uri: { - type: "string", - title: "Uri", + asset_id: { + type: "integer", + title: "Asset Id", }, extra: { type: "object", @@ -1036,7 +1031,7 @@ export const $CreateAssetEventsBody = { }, additionalProperties: false, type: "object", - required: ["uri"], + required: ["asset_id"], title: "CreateAssetEventsBody", description: "Create asset events request.", } as const; @@ -3640,14 +3635,14 @@ export const $QueuedEventCollectionResponse = { export const 
$QueuedEventResponse = { properties: { - uri: { - type: "string", - title: "Uri", - }, dag_id: { type: "string", title: "Dag Id", }, + asset_id: { + type: "integer", + title: "Asset Id", + }, created_at: { type: "string", format: "date-time", @@ -3655,7 +3650,7 @@ export const $QueuedEventResponse = { }, }, type: "object", - required: ["uri", "dag_id", "created_at"], + required: ["dag_id", "asset_id", "created_at"], title: "QueuedEventResponse", description: "Queued Event serializer for responses..", } as const; diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index c855ca58092..f0446ae0fd8 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -370,7 +370,7 @@ export class AssetService { * Get Asset Queued Events * Get queued asset events for an asset. * @param data The data for the request. - * @param data.uri + * @param data.assetId * @param data.before * @returns QueuedEventCollectionResponse Successful Response * @throws ApiError @@ -380,9 +380,9 @@ export class AssetService { ): CancelablePromise<GetAssetQueuedEventsResponse> { return __request(OpenAPI, { method: "GET", - url: "/public/assets/queuedEvents/{uri}", + url: "/public/assets/{asset_id}/queuedEvents", path: { - uri: data.uri, + asset_id: data.assetId, }, query: { before: data.before, @@ -400,7 +400,7 @@ export class AssetService { * Delete Asset Queued Events * Delete queued asset events for an asset. * @param data The data for the request. 
- * @param data.uri + * @param data.assetId * @param data.before * @returns void Successful Response * @throws ApiError @@ -410,9 +410,9 @@ export class AssetService { ): CancelablePromise<DeleteAssetQueuedEventsResponse> { return __request(OpenAPI, { method: "DELETE", - url: "/public/assets/queuedEvents/{uri}", + url: "/public/assets/{asset_id}/queuedEvents", path: { - uri: data.uri, + asset_id: data.assetId, }, query: { before: data.before, @@ -430,7 +430,7 @@ export class AssetService { * Get Asset * Get an asset. * @param data The data for the request. - * @param data.uri + * @param data.assetId * @returns AssetResponse Successful Response * @throws ApiError */ @@ -439,9 +439,9 @@ export class AssetService { ): CancelablePromise<GetAssetResponse> { return __request(OpenAPI, { method: "GET", - url: "/public/assets/{uri}", + url: "/public/assets/{asset_id}", path: { - uri: data.uri, + asset_id: data.assetId, }, errors: { 401: "Unauthorized", @@ -517,7 +517,7 @@ export class AssetService { * Get a queued asset event for a DAG. * @param data The data for the request. * @param data.dagId - * @param data.uri + * @param data.assetId * @param data.before * @returns QueuedEventResponse Successful Response * @throws ApiError @@ -527,10 +527,10 @@ export class AssetService { ): CancelablePromise<GetDagAssetQueuedEventResponse> { return __request(OpenAPI, { method: "GET", - url: "/public/dags/{dag_id}/assets/queuedEvents/{uri}", + url: "/public/dags/{dag_id}/assets/{asset_id}/queuedEvents", path: { dag_id: data.dagId, - uri: data.uri, + asset_id: data.assetId, }, query: { before: data.before, @@ -549,7 +549,7 @@ export class AssetService { * Delete a queued asset event for a DAG. * @param data The data for the request. 
* @param data.dagId - * @param data.uri + * @param data.assetId * @param data.before * @returns void Successful Response * @throws ApiError @@ -559,10 +559,10 @@ export class AssetService { ): CancelablePromise<DeleteDagAssetQueuedEventResponse> { return __request(OpenAPI, { method: "DELETE", - url: "/public/dags/{dag_id}/assets/queuedEvents/{uri}", + url: "/public/dags/{dag_id}/assets/{asset_id}/queuedEvents", path: { dag_id: data.dagId, - uri: data.uri, + asset_id: data.assetId, }, query: { before: data.before, diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 96a159bad07..88f6a2fc763 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -60,7 +60,6 @@ export type AssetEventCollectionResponse = { export type AssetEventResponse = { id: number; asset_id: number; - uri: string; extra?: { [key: string]: unknown; } | null; @@ -265,7 +264,7 @@ export type ConnectionTestResponse = { * Create asset events request. */ export type CreateAssetEventsBody = { - uri: string; + asset_id: number; extra?: { [key: string]: unknown; }; @@ -899,8 +898,8 @@ export type QueuedEventCollectionResponse = { * Queued Event serializer for responses.. 
*/ export type QueuedEventResponse = { - uri: string; dag_id: string; + asset_id: number; created_at: string; }; @@ -1353,21 +1352,21 @@ export type CreateAssetEventData = { export type CreateAssetEventResponse = AssetEventResponse; export type GetAssetQueuedEventsData = { + assetId: number; before?: string | null; - uri: string; }; export type GetAssetQueuedEventsResponse = QueuedEventCollectionResponse; export type DeleteAssetQueuedEventsData = { + assetId: number; before?: string | null; - uri: string; }; export type DeleteAssetQueuedEventsResponse = void; export type GetAssetData = { - uri: string; + assetId: number; }; export type GetAssetResponse = AssetResponse; @@ -1387,17 +1386,17 @@ export type DeleteDagAssetQueuedEventsData = { export type DeleteDagAssetQueuedEventsResponse = void; export type GetDagAssetQueuedEventData = { + assetId: number; before?: string | null; dagId: string; - uri: string; }; export type GetDagAssetQueuedEventResponse = QueuedEventResponse; export type DeleteDagAssetQueuedEventData = { + assetId: number; before?: string | null; dagId: string; - uri: string; }; export type DeleteDagAssetQueuedEventResponse = void; @@ -2226,7 +2225,7 @@ export type $OpenApiTs = { }; }; }; - "/public/assets/queuedEvents/{uri}": { + "/public/assets/{asset_id}/queuedEvents": { get: { req: GetAssetQueuedEventsData; res: { @@ -2278,7 +2277,7 @@ export type $OpenApiTs = { }; }; }; - "/public/assets/{uri}": { + "/public/assets/{asset_id}": { get: { req: GetAssetData; res: { @@ -2361,7 +2360,7 @@ export type $OpenApiTs = { }; }; }; - "/public/dags/{dag_id}/assets/queuedEvents/{uri}": { + "/public/dags/{dag_id}/assets/{asset_id}/queuedEvents": { get: { req: GetDagAssetQueuedEventData; res: { diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py b/tests/api_fastapi/core_api/routes/public/test_assets.py index c7c2c2405d2..46c769640a8 100644 --- a/tests/api_fastapi/core_api/routes/public/test_assets.py +++ 
b/tests/api_fastapi/core_api/routes/public/test_assets.py @@ -16,7 +16,6 @@ # under the License. from __future__ import annotations -import urllib from collections.abc import Generator from datetime import datetime from unittest import mock @@ -543,7 +542,6 @@ class TestGetAssetEvents(TestAssets): { "id": 1, "asset_id": 1, - "uri": "s3://bucket/key/1", "extra": {"foo": "bar"}, "source_task_id": "source_task_id", "source_dag_id": "source_dag_id", @@ -566,7 +564,6 @@ class TestGetAssetEvents(TestAssets): { "id": 2, "asset_id": 2, - "uri": "s3://bucket/key/2", "extra": {"foo": "bar"}, "source_task_id": "source_task_id", "source_dag_id": "source_dag_id", @@ -618,17 +615,17 @@ class TestGetAssetEvents(TestAssets): assert response.json()["detail"] == msg @pytest.mark.parametrize( - "params, expected_asset_uris", + "params, expected_asset_ids", [ # Limit test data - ({"limit": "1"}, ["s3://bucket/key/1"]), - ({"limit": "100"}, [f"s3://bucket/key/{i}" for i in range(1, 101)]), + ({"limit": "1"}, [1]), + ({"limit": "100"}, list(range(1, 101))), # Offset test data - ({"offset": "1"}, [f"s3://bucket/key/{i}" for i in range(2, 102)]), - ({"offset": "3"}, [f"s3://bucket/key/{i}" for i in range(4, 104)]), + ({"offset": "1"}, list(range(2, 102))), + ({"offset": "3"}, list(range(4, 104))), ], ) - def test_limit_and_offset(self, test_client, params, expected_asset_uris): + def test_limit_and_offset(self, test_client, params, expected_asset_ids): self.create_assets(num=110) self.create_assets_events(num=110) self.create_dag_run(num=110) @@ -637,8 +634,8 @@ class TestGetAssetEvents(TestAssets): response = test_client.get("/public/assets/events", params=params) assert response.status_code == 200 - asset_uris = [asset["uri"] for asset in response.json()["asset_events"]] - assert asset_uris == expected_asset_uris + asset_ids = [asset["id"] for asset in response.json()["asset_events"]] + assert asset_ids == expected_asset_ids @pytest.mark.usefixtures("time_freezer") 
@pytest.mark.enable_redact @@ -655,7 +652,6 @@ class TestGetAssetEvents(TestAssets): { "id": 1, "asset_id": 1, - "uri": "s3://bucket/key/1", "extra": {"password": "***"}, "source_task_id": "source_task_id", "source_dag_id": "source_dag_id", @@ -678,7 +674,6 @@ class TestGetAssetEvents(TestAssets): { "id": 2, "asset_id": 2, - "uri": "s3://bucket/key/2", "extra": {"password": "***"}, "source_task_id": "source_task_id", "source_dag_id": "source_dag_id", @@ -704,24 +699,13 @@ class TestGetAssetEvents(TestAssets): class TestGetAssetEndpoint(TestAssets): - @pytest.mark.parametrize( - "url", - [ - urllib.parse.quote( - "s3://bucket/key/1", safe="" - ), # api should cover raw as well as unquoted case like legacy - "s3://bucket/key/1", - ], - ) @provide_session - def test_should_respond_200(self, test_client, url, session): + def test_should_respond_200(self, test_client, session): self.create_assets(num=1) assert session.query(AssetModel).count() == 1 tz_datetime_format = from_datetime_to_zulu_without_ms(DEFAULT_DATE) with assert_queries_count(6): - response = test_client.get( - f"/public/assets/{url}", - ) + response = test_client.get("/public/assets/1") assert response.status_code == 200 assert response.json() == { "id": 1, @@ -737,21 +721,16 @@ class TestGetAssetEndpoint(TestAssets): } def test_should_respond_404(self, test_client): - response = test_client.get( - f"/public/assets/{urllib.parse.quote('s3://bucket/key', safe='')}", - ) + response = test_client.get("/public/assets/1") assert response.status_code == 404 - assert response.json()["detail"] == "The Asset with uri: `s3://bucket/key` was not found" + assert response.json()["detail"] == "The Asset with ID: `1` was not found" @pytest.mark.usefixtures("time_freezer") @pytest.mark.enable_redact def test_should_mask_sensitive_extra(self, test_client, session): self.create_assets_with_sensitive_extra() tz_datetime_format = from_datetime_to_zulu_without_ms(DEFAULT_DATE) - uri = "s3://bucket/key/1" - response = 
test_client.get( - f"/public/assets/{uri}", - ) + response = test_client.get("/public/assets/1") assert response.status_code == 200 assert response.json() == { "id": 1, @@ -808,9 +787,9 @@ class TestGetDagAssetQueuedEvents(TestQueuedEventEndpoint): assert response.json() == { "queued_events": [ { - "created_at": from_datetime_to_zulu_without_ms(DEFAULT_DATE), - "uri": "s3://bucket/key/1", + "asset_id": 1, "dag_id": "dag", + "created_at": from_datetime_to_zulu_without_ms(DEFAULT_DATE), } ], "total_entries": 1, @@ -875,13 +854,12 @@ class TestPostAssetEvents(TestAssets): @pytest.mark.usefixtures("time_freezer") def test_should_respond_200(self, test_client, session): self.create_assets() - event_payload = {"uri": "s3://bucket/key/1", "extra": {"foo": "bar"}} + event_payload = {"asset_id": 1, "extra": {"foo": "bar"}} response = test_client.post("/public/assets/events", json=event_payload) assert response.status_code == 200 assert response.json() == { "id": mock.ANY, "asset_id": 1, - "uri": "s3://bucket/key/1", "extra": {"foo": "bar", "from_rest_api": True}, "source_task_id": None, "source_dag_id": None, @@ -902,13 +880,12 @@ class TestPostAssetEvents(TestAssets): @pytest.mark.enable_redact def test_should_mask_sensitive_extra(self, test_client, session): self.create_assets(session) - event_payload = {"uri": "s3://bucket/key/1", "extra": {"password": "bar"}} + event_payload = {"asset_id": 1, "extra": {"password": "bar"}} response = test_client.post("/public/assets/events", json=event_payload) assert response.status_code == 200 assert response.json() == { "id": mock.ANY, "asset_id": 1, - "uri": "s3://bucket/key/1", "extra": {"password": "***", "from_rest_api": True}, "source_task_id": None, "source_dag_id": None, @@ -925,34 +902,26 @@ class TestGetAssetQueuedEvents(TestQueuedEventEndpoint): dag, _ = create_dummy_dag() dag_id = dag.dag_id self.create_assets(session=session, num=1) - uri = "s3://bucket/key/1" asset_id = 1 self._create_asset_dag_run_queues(dag_id, 
asset_id, session) - response = test_client.get( - f"/public/assets/queuedEvents/{uri}", - ) + response = test_client.get(f"/public/assets/{asset_id}/queuedEvents/") assert response.status_code == 200 assert response.json() == { "queued_events": [ { - "created_at": from_datetime_to_zulu_without_ms(DEFAULT_DATE), - "uri": "s3://bucket/key/1", + "asset_id": asset_id, "dag_id": "dag", + "created_at": from_datetime_to_zulu_without_ms(DEFAULT_DATE), } ], "total_entries": 1, } def test_should_respond_404(self, test_client): - uri = "not_exists" - - response = test_client.get( - f"/public/assets/queuedEvents/{uri}", - ) - + response = test_client.get("/public/assets/1/queuedEvents") assert response.status_code == 404 - assert response.json()["detail"] == "Queue event with uri: `not_exists` was not found" + assert response.json()["detail"] == "Queue event with asset_id: `1` was not found" class TestDeleteAssetQueuedEvents(TestQueuedEventEndpoint): @@ -960,33 +929,25 @@ class TestDeleteAssetQueuedEvents(TestQueuedEventEndpoint): def test_should_respond_204(self, test_client, session, create_dummy_dag): dag, _ = create_dummy_dag() dag_id = dag.dag_id - uri = "s3://bucket/key/1" self.create_assets(session=session, num=1) asset_id = 1 self._create_asset_dag_run_queues(dag_id, asset_id, session) - response = test_client.delete( - f"/public/assets/queuedEvents/{uri}", - ) + assert session.get(AssetDagRunQueue, (asset_id, dag_id)) is not None + response = test_client.delete(f"/public/assets/{asset_id}/queuedEvents") assert response.status_code == 204 - assert session.query(AssetDagRunQueue).filter_by(asset_id=1).first() is None + assert session.get(AssetDagRunQueue, (asset_id, dag_id)) is None def test_should_respond_404(self, test_client): - uri = "not_exists" - - response = test_client.delete( - f"/public/assets/queuedEvents/{uri}", - ) - + response = test_client.delete("/public/assets/1/queuedEvents") assert response.status_code == 404 - assert response.json()["detail"] == 
"Queue event with uri: `not_exists` was not found" + assert response.json()["detail"] == "Queue event with asset_id: `1` was not found" class TestDeleteDagAssetQueuedEvent(TestQueuedEventEndpoint): def test_delete_should_respond_204(self, test_client, session, create_dummy_dag): dag, _ = create_dummy_dag() dag_id = dag.dag_id - asset_uri = "s3://bucket/key/1" self.create_assets(session=session, num=1) asset_id = 1 @@ -995,7 +956,7 @@ class TestDeleteDagAssetQueuedEvent(TestQueuedEventEndpoint): assert len(adrq) == 1 response = test_client.delete( - f"/public/dags/{dag_id}/assets/queuedEvents/{asset_uri}", + f"/public/dags/{dag_id}/assets/{asset_id}/queuedEvents", ) assert response.status_code == 204 @@ -1004,14 +965,14 @@ class TestDeleteDagAssetQueuedEvent(TestQueuedEventEndpoint): def test_should_respond_404(self, test_client): dag_id = "not_exists" - asset_uri = "not_exists" + asset_id = 1 response = test_client.delete( - f"/public/dags/{dag_id}/assets/queuedEvents/{asset_uri}", + f"/public/dags/{dag_id}/assets/{asset_id}/queuedEvents/", ) assert response.status_code == 404 assert ( response.json()["detail"] - == "Queued event with dag_id: `not_exists` and asset uri: `not_exists` was not found" + == "Queued event with dag_id: `not_exists` and asset_id: `1` was not found" ) diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py index 995b98a61dd..ee57b77c489 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py @@ -941,7 +941,7 @@ class TestDeleteDagRun: class TestGetDagRunAssetTriggerEvents: def test_should_respond_200(self, test_client, dag_maker, session): - asset1 = Asset(uri="ds1") + asset1 = Asset(name="ds1", uri="file:///da1") with dag_maker(dag_id="source_dag", start_date=START_DATE1, session=session): EmptyOperator(task_id="task", outlets=[asset1]) @@ -975,7 +975,6 @@ class 
TestGetDagRunAssetTriggerEvents: { "timestamp": from_datetime_to_zulu(event.timestamp), "asset_id": asset1_id, - "uri": asset1.uri, "extra": {}, "id": event.id, "source_dag_id": ti.dag_id,