bugraoz93 commented on code in PR #50060:
URL: https://github.com/apache/airflow/pull/50060#discussion_r2072090239
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py:
##########
@@ -385,16 +434,43 @@ def get_asset(
     session: SessionDep,
 ) -> AssetResponse:
     """Get an asset."""
+    # Build a subquery to be used to retrieve the latest AssetEvent by matching timestamp
+    last_asset_event = (
+        select(func.max(AssetEvent.timestamp)).where(AssetEvent.asset_id == asset_id).scalar_subquery()
+    )
+
+    # Now, find the latest AssetEvent details using the subquery from above
+    asset_event_rows = session.execute(
+        select(AssetEvent.asset_id, AssetEvent.id, AssetEvent.timestamp).where(
+            AssetEvent.asset_id == asset_id, AssetEvent.timestamp == last_asset_event
+        )
+    )
+
+    # Retrieve the Asset; there should only be one for that asset_id
     asset = session.scalar(
         select(AssetModel)
         .where(AssetModel.id == asset_id)
         .options(joinedload(AssetModel.consuming_dags), joinedload(AssetModel.producing_tasks))
     )
+    last_asset_event_id, last_asset_event_timestamp = None, None
+
+    # Pull the event ID and timestamp from the asset_event_query
+    for row in asset_event_rows:
+        _, last_asset_event_id, last_asset_event_timestamp = row
Review Comment:
We don't need this loop if we `break` on the first iteration anyway. We can
take the first row with `asset_event_rows[0]` while building the dictionary.
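A minimal sketch of replacing the loop, assuming SQLAlchemy with an in-memory SQLite database and a hypothetical, trimmed-down `AssetEvent` model (only `id`, `asset_id`, `timestamp`); `latest_event` and `make_session` are hypothetical helper names, not Airflow code. One detail worth double-checking: `session.execute()` returns a SQLAlchemy `Result`, which is consumed like an iterator and does not support indexing, so `asset_event_rows[0]` would need `.all()[0]` or, more simply, `.first()`, which also returns `None` when there are no events.

```python
from datetime import datetime
from typing import Optional, Tuple

from sqlalchemy import Column, DateTime, Integer, create_engine, func, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class AssetEvent(Base):
    """Hypothetical stand-in for Airflow's AssetEvent, reduced to three columns."""

    __tablename__ = "asset_event"
    id = Column(Integer, primary_key=True)
    asset_id = Column(Integer, nullable=False)
    timestamp = Column(DateTime, nullable=False)


def make_session() -> Session:
    """Create a session on a throwaway in-memory SQLite database."""
    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)
    return Session(engine)


def latest_event(session: Session, asset_id: int) -> Tuple[Optional[int], Optional[datetime]]:
    """Hypothetical helper: (id, timestamp) of the newest event for asset_id, or (None, None)."""
    # Scalar subquery: the newest timestamp recorded for this asset.
    last_ts = (
        select(func.max(AssetEvent.timestamp))
        .where(AssetEvent.asset_id == asset_id)
        .scalar_subquery()
    )
    result = session.execute(
        select(AssetEvent.id, AssetEvent.timestamp).where(
            AssetEvent.asset_id == asset_id, AssetEvent.timestamp == last_ts
        )
    )
    # A Result is consumed like an iterator; .first() replaces the for/break loop
    # and returns None when the asset has no events.
    row = result.first()
    if row is None:
        return None, None
    return row.id, row.timestamp
```

Selecting only `id` and `timestamp` (dropping the unused `asset_id` column) lets the row be read by attribute name instead of unpacking with `_`.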
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py:
##########
@@ -385,16 +434,43 @@ def get_asset(
     session: SessionDep,
 ) -> AssetResponse:
     """Get an asset."""
+    # Build a subquery to be used to retrieve the latest AssetEvent by matching timestamp
+    last_asset_event = (
+        select(func.max(AssetEvent.timestamp)).where(AssetEvent.asset_id == asset_id).scalar_subquery()
+    )
+
+    # Now, find the latest AssetEvent details using the subquery from above
+    asset_event_rows = session.execute(
+        select(AssetEvent.asset_id, AssetEvent.id, AssetEvent.timestamp).where(
+            AssetEvent.asset_id == asset_id, AssetEvent.timestamp == last_asset_event
+        )
+    )
+
+    # Retrieve the Asset; there should only be one for that asset_id
     asset = session.scalar(
         select(AssetModel)
         .where(AssetModel.id == asset_id)
         .options(joinedload(AssetModel.consuming_dags), joinedload(AssetModel.producing_tasks))
     )
+    last_asset_event_id, last_asset_event_timestamp = None, None
+
+    # Pull the event ID and timestamp from the asset_event_query
+    for row in asset_event_rows:
+        _, last_asset_event_id, last_asset_event_timestamp = row
+        break # There should only be one record, but be proactive
+
     if asset is None:
         raise HTTPException(status.HTTP_404_NOT_FOUND, f"The Asset with ID: `{asset_id}` was not found")
-    return AssetResponse.model_validate(asset)
+    return AssetResponse.model_validate(
+        {
+            **asset.__dict__,
+            "aliases": asset.aliases,
+            "last_asset_event_id": last_asset_event_id,
For example, following up on the for-loop comment: here,
`asset_event_rows[0].something` should give you what you expect, most likely
`asset_event_rows[0].id` and `asset_event_rows[0].timestamp`, since you skip
the first column you selected.
Similarly, you could follow the pattern where we use `one_or_none`:
https://github.com/apache/airflow/blob/a447474738413c29403413d33990b5af8413543d/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py#L249-L255
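A sketch of the `one_or_none` variant under the same assumptions (SQLAlchemy, in-memory SQLite, hypothetical trimmed-down `AssetEvent` model; `latest_event_or_none` and `make_session` are hypothetical names). The linked `task_instances.py` code is not reproduced here; this only shows the general `Result.one_or_none()` pattern. Note one behavioral difference from the loop with `break`: `one_or_none()` returns `None` for zero rows but raises `MultipleResultsFound` if more than one row matches, which can happen here when two events for the same asset share the maximum timestamp.

```python
from datetime import datetime
from typing import Optional, Tuple

from sqlalchemy import Column, DateTime, Integer, create_engine, func, select
from sqlalchemy.exc import MultipleResultsFound
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class AssetEvent(Base):
    """Hypothetical stand-in for Airflow's AssetEvent, reduced to three columns."""

    __tablename__ = "asset_event"
    id = Column(Integer, primary_key=True)
    asset_id = Column(Integer, nullable=False)
    timestamp = Column(DateTime, nullable=False)


def make_session() -> Session:
    """Create a session on a throwaway in-memory SQLite database."""
    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)
    return Session(engine)


def latest_event_or_none(session: Session, asset_id: int) -> Tuple[Optional[int], Optional[datetime]]:
    """Hypothetical helper: (id, timestamp) of the newest event, or (None, None).

    Raises MultipleResultsFound if several events share the newest timestamp.
    """
    last_ts = (
        select(func.max(AssetEvent.timestamp))
        .where(AssetEvent.asset_id == asset_id)
        .scalar_subquery()
    )
    row = session.execute(
        select(AssetEvent.id, AssetEvent.timestamp).where(
            AssetEvent.asset_id == asset_id, AssetEvent.timestamp == last_ts
        )
    ).one_or_none()  # None for zero rows, exception for more than one
    if row is None:
        return None, None
    return row.id, row.timestamp
```

Whether a timestamp tie should surface as an error or silently pick one event is a design choice for the PR; if ties are acceptable, `.first()` keeps the current `break` behavior.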
--
This is an automated message from the Apache Git Service.