jason810496 commented on code in PR #67902:
URL: https://github.com/apache/airflow/pull/67902#discussion_r3354260327
##########
airflow-core/src/airflow/api_fastapi/core_api/datamodels/asset_store.py:
##########
@@ -26,12 +26,23 @@
_MAX_SERIALIZED_BYTES = 65535
+class AssetStoreLastUpdatedBy(BaseModel):
+ """Writer info for the last write to an asset store entry."""
+
+ kind: str
Review Comment:
Would it be better to annotate this as the `AssetStoreWriterKind` enum instead
of the broad `str` type in the response?
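
As a rough illustration of this suggestion, the sketch below uses only the
standard library to show what an enum-typed `kind` buys over a plain `str`.
Only `AssetStoreWriterKind.TASK` appears in this PR; the value `"task"`, the
`API` member, the `parse_kind` helper, and the dataclass standing in for the
Pydantic model are assumptions made for the example.

```python
import json
from dataclasses import dataclass
from enum import Enum


class AssetStoreWriterKind(str, Enum):
    """Stand-in enum; the real members and values are defined in the PR."""

    TASK = "task"  # assumed value
    API = "api"  # hypothetical second member, for illustration only


@dataclass
class AssetStoreLastUpdatedBy:
    """Dataclass stand-in for the Pydantic response model."""

    kind: AssetStoreWriterKind  # narrowed from ``str``


def parse_kind(raw: str) -> AssetStoreWriterKind:
    """Convert a stored string to the enum; unknown values raise ValueError."""
    return AssetStoreWriterKind(raw)


writer = AssetStoreLastUpdatedBy(kind=parse_kind("task"))
# A ``str``-mixin enum member still serializes as a plain JSON string.
payload = json.dumps({"kind": writer.kind})
```

With a Pydantic model, the same annotation should also surface the allowed
values in the generated OpenAPI schema, so clients see more than "any string".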
##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_store.py:
##########
@@ -153,10 +194,27 @@ def set_asset_store_by_uri(
key: Annotated[str, Query(min_length=1)],
body: AssetStorePutBody,
session: SessionDep,
+ token: TIToken = CurrentTIToken,
) -> None:
"""Set an asset store value by asset URI."""
asset_id = _resolve_asset_id_by_uri(uri, session)
- get_state_backend().set(AssetScope(asset_id=asset_id), key, json.dumps(body.value), session=session)
+ backend = get_state_backend()
+ scope = AssetScope(asset_id=asset_id)
+ if isinstance(backend, MetastoreStoreBackend):
+ dag_id, run_id, task_id, map_index = _fetch_ti_writer_fields(token, session)
+ backend.set_asset_store(
+ scope,
+ key,
+ json.dumps(body.value),
+ kind=AssetStoreWriterKind.TASK,
+ dag_id=dag_id,
+ run_id=run_id,
+ task_id=task_id,
+ map_index=map_index,
+ session=session,
+ )
+ else:
+ backend.set(scope, key, json.dumps(body.value), session=session)
Review Comment:
Non-blocking. The logic after `_resolve_asset_id_by_uri / name` is the same for
both `put("/by-uri/value")` and `put("/by-name/value")`. Perhaps we can
introduce a helper to keep the two routes consistent.
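
One possible shape for such a helper, as a self-contained sketch. The classes
below are minimal stand-ins, not the real Airflow ones, and
`_write_asset_store_value` is a hypothetical name. In the actual routes the
writer fields would come from `_fetch_ti_writer_fields(token, session)`; here
they are passed in as a plain dict.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class AssetScope:
    """Stand-in for the real scope object."""

    asset_id: int


class GenericBackend:
    """Stand-in for a custom state backend that only exposes ``set``."""

    def __init__(self):
        self.calls = []

    def set(self, scope, key, value, *, session):
        self.calls.append(("set", scope.asset_id, key, value))


class MetastoreStoreBackend(GenericBackend):
    """Stand-in for the metastore backend that also records writer info."""

    def set_asset_store(self, scope, key, value, *, session, **writer):
        self.calls.append(("set_asset_store", scope.asset_id, key, value, writer))


def _write_asset_store_value(backend, asset_id, key, value, writer, session):
    """Hypothetical helper shared by the by-uri and by-name PUT routes."""
    scope = AssetScope(asset_id=asset_id)
    serialized = json.dumps(value)
    if isinstance(backend, MetastoreStoreBackend):
        backend.set_asset_store(scope, key, serialized, session=session, **writer)
    else:
        backend.set(scope, key, serialized, session=session)


writer = {"dag_id": "d", "run_id": "r", "task_id": "t", "map_index": -1}
meta = MetastoreStoreBackend()
_write_asset_store_value(meta, 1, "k", {"a": 1}, writer, session=None)
other = GenericBackend()
_write_asset_store_value(other, 1, "k", {"a": 1}, writer, session=None)
```

Each route would then only resolve the asset ID and delegate, so the
`isinstance` branch lives in one place.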
##########
airflow-core/src/airflow/state/metastore.py:
##########
@@ -435,20 +488,69 @@ async def _aget_asset_store(self, scope: AssetScope, key: str, *, session: Async
return row.value if row is not None else None
async def _aset_asset_store(
- self, scope: AssetScope, key: str, value: str, *, session: AsyncSession
+ self,
+ scope: AssetScope,
+ key: str,
+ value: str,
+ *,
+ kind: AssetStoreWriterKind | None = None,
+ dag_id: str | None = None,
+ run_id: str | None = None,
+ task_id: str | None = None,
+ map_index: int | None = None,
+ session: AsyncSession,
) -> None:
now = timezone.utcnow()
- values = dict(asset_id=scope.asset_id, key=key, value=value, updated_at=now)
+ kind_str = kind.value if kind is not None else None
+ writer_info = dict(
+ last_updated_by_kind=kind_str,
+ last_updated_by_dag_id=dag_id,
+ last_updated_by_run_id=run_id,
+ last_updated_by_task_id=task_id,
+ last_updated_by_map_index=map_index,
+ )
+ values = dict(asset_id=scope.asset_id, key=key, value=value, updated_at=now, **writer_info)
+ update_fields = dict(value=value, updated_at=now)
+ if kind is not None:
+ update_fields.update(writer_info)
Review Comment:
Non-blocking. The logic for computing `update_fields` here duplicates the
logic in `_set_asset_store`.
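
A minimal sketch of how the shared computation could be factored out so the
sync and async paths call the same pure function. The name
`_build_asset_store_fields` is hypothetical, and the enum is a stand-in with
an assumed value; the field names mirror the diff above.

```python
from datetime import datetime, timezone
from enum import Enum


class AssetStoreWriterKind(str, Enum):
    """Stand-in enum; the value is assumed."""

    TASK = "task"


def _build_asset_store_fields(
    asset_id, key, value, now, *,
    kind=None, dag_id=None, run_id=None, task_id=None, map_index=None,
):
    """Return (insert values, on-conflict update fields) for an upsert."""
    writer_info = dict(
        last_updated_by_kind=kind.value if kind is not None else None,
        last_updated_by_dag_id=dag_id,
        last_updated_by_run_id=run_id,
        last_updated_by_task_id=task_id,
        last_updated_by_map_index=map_index,
    )
    values = dict(asset_id=asset_id, key=key, value=value, updated_at=now, **writer_info)
    update_fields = dict(value=value, updated_at=now)
    # Only overwrite writer info on update when the caller supplied a kind.
    if kind is not None:
        update_fields.update(writer_info)
    return values, update_fields


now = datetime.now(timezone.utc)
_, plain_update = _build_asset_store_fields(1, "k", "v", now)
_, task_update = _build_asset_store_fields(
    1, "k", "v", now, kind=AssetStoreWriterKind.TASK, dag_id="d"
)
```

Because the function touches no session, both `_set_asset_store` and
`_aset_asset_store` could call it before building their own statements.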
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/asset_store.py:
##########
@@ -134,7 +172,13 @@ def set_asset_store(
session: SessionDep,
) -> None:
"""Set an asset store value. Creates or overwrites the key."""
- _get_db_backend().set(AssetScope(asset_id=asset_id), key, json.dumps(body.value), session=session)
+ _get_db_backend().set_asset_store(
Review Comment:
IIUC, `set_asset_store` _might_ raise an `AssertionError`. Would it be better
to catch it and re-raise it as a 400 error? Otherwise it will result in a
500 Internal Server Error.
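
A sketch of the translation being suggested, kept free of FastAPI so it runs
standalone. `HTTPError` is a stand-in for `fastapi.HTTPException`, and both
`call_or_400` and the failing backend call (including its message) are
hypothetical; whether `set_asset_store` actually raises `AssertionError`
would need to be confirmed against the PR.

```python
class HTTPError(Exception):
    """Stand-in for ``fastapi.HTTPException`` (status code plus detail)."""

    def __init__(self, status_code, detail):
        super().__init__(detail)
        self.status_code = status_code
        self.detail = detail


def call_or_400(backend_call, *args, **kwargs):
    """Run a backend write, mapping AssertionError to a 400 response."""
    try:
        return backend_call(*args, **kwargs)
    except AssertionError as exc:
        # Fall back to a generic message when the assertion carries none.
        raise HTTPError(400, str(exc) or "Invalid asset store write") from exc


def _failing_backend_call(*args, **kwargs):
    # Simulated failure; the message is made up for the example.
    raise AssertionError("writer fields are inconsistent")


try:
    call_or_400(_failing_backend_call, "scope", "key", "value")
except HTTPError as err:
    captured = err
```

In the real route the `except` branch would presumably raise
`HTTPException(status_code=400, detail=...)` instead.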