This is an automated email from the ASF dual-hosted git repository.
amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c84d35622cc AIP 84: Migrate GET one ASSET legacy API to fast API (#43825)
c84d35622cc is described below
commit c84d35622cc13b5a84a46797f5a4496d4efea8d9
Author: Amogh Desai <[email protected]>
AuthorDate: Thu Nov 14 09:35:38 2024 +0530
AIP 84: Migrate GET one ASSET legacy API to fast API (#43825)
---
airflow/api_connexion/endpoints/asset_endpoint.py | 1 +
.../api_fastapi/core_api/openapi/v1-generated.yaml | 45 ++++++++++++++++++++++
.../api_fastapi/core_api/routes/public/assets.py | 32 +++++++++++++--
airflow/ui/openapi-gen/queries/common.ts | 16 ++++++++
airflow/ui/openapi-gen/queries/prefetch.ts | 20 ++++++++++
airflow/ui/openapi-gen/queries/queries.ts | 26 +++++++++++++
airflow/ui/openapi-gen/queries/suspense.ts | 26 +++++++++++++
airflow/ui/openapi-gen/requests/services.gen.ts | 28 ++++++++++++++
airflow/ui/openapi-gen/requests/types.gen.ts | 33 ++++++++++++++++
.../core_api/routes/public/test_assets.py | 42 ++++++++++++++++++++
10 files changed, 265 insertions(+), 4 deletions(-)
diff --git a/airflow/api_connexion/endpoints/asset_endpoint.py b/airflow/api_connexion/endpoints/asset_endpoint.py
index 085817213d0..0c45ddd7095 100644
--- a/airflow/api_connexion/endpoints/asset_endpoint.py
+++ b/airflow/api_connexion/endpoints/asset_endpoint.py
@@ -61,6 +61,7 @@ if TYPE_CHECKING:
RESOURCE_EVENT_PREFIX = "asset"
+@mark_fastapi_migration_done
@security.requires_access_asset("GET")
@provide_session
def get_asset(*, uri: str, session: Session = NEW_SESSION) -> APIResponse:
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index 7d63fd1f180..6efa1e44585 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -246,6 +246,51 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
+ /public/assets/{uri}:
+ get:
+ tags:
+ - Asset
+ summary: Get Asset
+ description: Get an asset.
+ operationId: get_asset
+ parameters:
+ - name: uri
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Uri
+ responses:
+ '200':
+ description: Successful Response
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/AssetResponse'
+ '401':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Unauthorized
+ '403':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Forbidden
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
/public/backfills/:
get:
tags:
diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow/api_fastapi/core_api/routes/public/assets.py
index 7ec7012de8d..d093eef3720 100644
--- a/airflow/api_fastapi/core_api/routes/public/assets.py
+++ b/airflow/api_fastapi/core_api/routes/public/assets.py
@@ -19,9 +19,9 @@ from __future__ import annotations
from typing import Annotated
-from fastapi import Depends
+from fastapi import Depends, HTTPException, status
from sqlalchemy import select
-from sqlalchemy.orm import Session
+from sqlalchemy.orm import Session, joinedload, subqueryload
from airflow.api_fastapi.common.db.common import get_session, paginated_select
from airflow.api_fastapi.common.parameters import (
@@ -63,9 +63,33 @@ def get_assets(
limit=limit,
session=session,
)
-
- assets = session.scalars(assets_select).all()
+ assets = session.scalars(
+ assets_select.options(
+ subqueryload(AssetModel.consuming_dags), subqueryload(AssetModel.producing_tasks)
+ )
+ ).all()
return AssetCollectionResponse(
assets=[AssetResponse.model_validate(asset, from_attributes=True) for asset in assets],
total_entries=total_entries,
)
+
+
+@assets_router.get(
+ "/{uri:path}",
+ responses=create_openapi_http_exception_doc([401, 403, 404]),
+)
+def get_asset(
+ uri: str,
+ session: Annotated[Session, Depends(get_session)],
+) -> AssetResponse:
+ """Get an asset."""
+ asset = session.scalar(
+ select(AssetModel)
+ .where(AssetModel.uri == uri)
+ .options(joinedload(AssetModel.consuming_dags), joinedload(AssetModel.producing_tasks))
+ )
+
+ if asset is None:
+ raise HTTPException(status.HTTP_404_NOT_FOUND, f"The Asset with uri: `{uri}` was not found")
+
+ return AssetResponse.model_validate(asset, from_attributes=True)
diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts
index 51fc4d6d001..1fff143182f 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -69,6 +69,22 @@ export const UseAssetServiceGetAssetsKeyFn = (
useAssetServiceGetAssetsKey,
...(queryKey ?? [{ dagIds, limit, offset, orderBy, uriPattern }]),
];
+export type AssetServiceGetAssetDefaultResponse = Awaited<
+ ReturnType<typeof AssetService.getAsset>
+>;
+export type AssetServiceGetAssetQueryResult<
+ TData = AssetServiceGetAssetDefaultResponse,
+ TError = unknown,
+> = UseQueryResult<TData, TError>;
+export const useAssetServiceGetAssetKey = "AssetServiceGetAsset";
+export const UseAssetServiceGetAssetKeyFn = (
+ {
+ uri,
+ }: {
+ uri: string;
+ },
+ queryKey?: Array<unknown>,
+) => [useAssetServiceGetAssetKey, ...(queryKey ?? [{ uri }])];
export type DashboardServiceHistoricalMetricsDefaultResponse = Awaited<
ReturnType<typeof DashboardService.historicalMetrics>
>;
diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts
index 76d0a35e7bf..b822e35dd33 100644
--- a/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -85,6 +85,26 @@ export const prefetchUseAssetServiceGetAssets = (
queryFn: () =>
AssetService.getAssets({ dagIds, limit, offset, orderBy, uriPattern }),
});
+/**
+ * Get Asset
+ * Get an asset.
+ * @param data The data for the request.
+ * @param data.uri
+ * @returns AssetResponse Successful Response
+ * @throws ApiError
+ */
+export const prefetchUseAssetServiceGetAsset = (
+ queryClient: QueryClient,
+ {
+ uri,
+ }: {
+ uri: string;
+ },
+) =>
+ queryClient.prefetchQuery({
+ queryKey: Common.UseAssetServiceGetAssetKeyFn({ uri }),
+ queryFn: () => AssetService.getAsset({ uri }),
+ });
/**
* Historical Metrics
* Return cluster activity historical metrics.
diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts
index 31b73b7aa26..475126ef506 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -115,6 +115,32 @@ export const useAssetServiceGetAssets = <
}) as TData,
...options,
});
+/**
+ * Get Asset
+ * Get an asset.
+ * @param data The data for the request.
+ * @param data.uri
+ * @returns AssetResponse Successful Response
+ * @throws ApiError
+ */
+export const useAssetServiceGetAsset = <
+ TData = Common.AssetServiceGetAssetDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ uri,
+ }: {
+ uri: string;
+ },
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useQuery<TData, TError>({
+ queryKey: Common.UseAssetServiceGetAssetKeyFn({ uri }, queryKey),
+ queryFn: () => AssetService.getAsset({ uri }) as TData,
+ ...options,
+ });
/**
* Historical Metrics
* Return cluster activity historical metrics.
diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts
index 534eafeca1f..83b76e51127 100644
--- a/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow/ui/openapi-gen/queries/suspense.ts
@@ -100,6 +100,32 @@ export const useAssetServiceGetAssetsSuspense = <
}) as TData,
...options,
});
+/**
+ * Get Asset
+ * Get an asset.
+ * @param data The data for the request.
+ * @param data.uri
+ * @returns AssetResponse Successful Response
+ * @throws ApiError
+ */
+export const useAssetServiceGetAssetSuspense = <
+ TData = Common.AssetServiceGetAssetDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ uri,
+ }: {
+ uri: string;
+ },
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useSuspenseQuery<TData, TError>({
+ queryKey: Common.UseAssetServiceGetAssetKeyFn({ uri }, queryKey),
+ queryFn: () => AssetService.getAsset({ uri }) as TData,
+ ...options,
+ });
/**
* Historical Metrics
* Return cluster activity historical metrics.
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts
index f3287bcc89a..34fa7f24765 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -7,6 +7,8 @@ import type {
NextRunAssetsResponse,
GetAssetsData,
GetAssetsResponse,
+ GetAssetData,
+ GetAssetResponse,
HistoricalMetricsData,
HistoricalMetricsResponse,
RecentDagRunsData,
@@ -169,6 +171,32 @@ export class AssetService {
},
});
}
+
+ /**
+ * Get Asset
+ * Get an asset.
+ * @param data The data for the request.
+ * @param data.uri
+ * @returns AssetResponse Successful Response
+ * @throws ApiError
+ */
+ public static getAsset(
+ data: GetAssetData,
+ ): CancelablePromise<GetAssetResponse> {
+ return __request(OpenAPI, {
+ method: "GET",
+ url: "/public/assets/{uri}",
+ path: {
+ uri: data.uri,
+ },
+ errors: {
+ 401: "Unauthorized",
+ 403: "Forbidden",
+ 404: "Not Found",
+ 422: "Validation Error",
+ },
+ });
+ }
}
export class DashboardService {
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts
index ef9d25cfab1..6c036f76bf2 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -931,6 +931,12 @@ export type GetAssetsData = {
export type GetAssetsResponse = AssetCollectionResponse;
+export type GetAssetData = {
+ uri: string;
+};
+
+export type GetAssetResponse = AssetResponse;
+
export type HistoricalMetricsData = {
endDate: string;
startDate: string;
@@ -1418,6 +1424,33 @@ export type $OpenApiTs = {
};
};
};
+ "/public/assets/{uri}": {
+ get: {
+ req: GetAssetData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 200: AssetResponse;
+ /**
+ * Unauthorized
+ */
+ 401: HTTPExceptionResponse;
+ /**
+ * Forbidden
+ */
+ 403: HTTPExceptionResponse;
+ /**
+ * Not Found
+ */
+ 404: HTTPExceptionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
+ };
"/ui/dashboard/historical_metrics_data": {
get: {
req: HistoricalMetricsData;
diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py b/tests/api_fastapi/core_api/routes/public/test_assets.py
index e243c3c2930..eb72c1a99ac 100644
--- a/tests/api_fastapi/core_api/routes/public/test_assets.py
+++ b/tests/api_fastapi/core_api/routes/public/test_assets.py
@@ -16,6 +16,8 @@
# under the License.
from __future__ import annotations
+import urllib
+
import pytest
from airflow.models import DagModel
@@ -23,6 +25,7 @@ from airflow.models.asset import AssetModel, DagScheduleAssetReference, TaskOutl
from airflow.utils import timezone
from airflow.utils.session import provide_session
+from tests_common.test_utils.asserts import assert_queries_count
from tests_common.test_utils.db import clear_db_assets
pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode]
@@ -229,3 +232,42 @@ class TestGetAssetsEndpointPagination(TestAssets):
assert response.status_code == 200
assert len(response.json()["assets"]) == 100
+
+
+class TestGetAssetEndpoint(TestAssets):
+ @pytest.mark.parametrize(
+ "url",
+ [
+ urllib.parse.quote(
+ "s3://bucket/key/1", safe=""
+ ), # api should cover raw as well as unquoted case like legacy
+ "s3://bucket/key/1",
+ ],
+ )
+ @provide_session
+ def test_should_respond_200(self, test_client, url, session):
+ self.create_assets(num=1)
+ assert session.query(AssetModel).count() == 1
+ tz_datetime_format = self.default_time.replace("+00:00", "Z")
+ with assert_queries_count(6):
+ response = test_client.get(
+ f"/public/assets/{url}",
+ )
+ assert response.status_code == 200
+ assert response.json() == {
+ "id": 1,
+ "uri": "s3://bucket/key/1",
+ "extra": {"foo": "bar"},
+ "created_at": tz_datetime_format,
+ "updated_at": tz_datetime_format,
+ "consuming_dags": [],
+ "producing_tasks": [],
+ "aliases": [],
+ }
+
+ def test_should_respond_404(self, test_client):
+ response = test_client.get(
+ f"/public/assets/{urllib.parse.quote('s3://bucket/key', safe='')}",
+ )
+ assert response.status_code == 404
+ assert response.json()["detail"] == "The Asset with uri: `s3://bucket/key` was not found"