amoghrajesh commented on code in PR #67902:
URL: https://github.com/apache/airflow/pull/67902#discussion_r3350488646
##########
airflow-core/src/airflow/state/metastore.py:
##########
@@ -279,18 +279,68 @@ def _get_asset_store(self, scope: AssetScope, key: str,
*, session: Session) ->
)
return row.value if row is not None else None
- def _set_asset_store(self, scope: AssetScope, key: str, value: str, *,
session: Session) -> None:
+ def _set_asset_store(
+ self,
+ scope: AssetScope,
+ key: str,
+ value: str,
+ *,
+ kind: AssetStoreWriterKind | None = None,
+ dag_id: str | None = None,
+ run_id: str | None = None,
+ task_id: str | None = None,
+ map_index: int | None = None,
+ session: Session,
+ ) -> None:
now = timezone.utcnow()
- values = dict(asset_id=scope.asset_id, key=key, value=value,
updated_at=now)
+ kind_str = kind.value if kind is not None else None
+ writer_info = dict(
+ last_updated_by_kind=kind_str,
+ last_updated_by_dag_id=dag_id,
+ last_updated_by_run_id=run_id,
+ last_updated_by_task_id=task_id,
+ last_updated_by_map_index=map_index,
+ )
+ values = dict(asset_id=scope.asset_id, key=key, value=value,
updated_at=now, **writer_info)
stmt = _build_upsert_stmt(
get_dialect_name(session),
AssetStoreModel,
["asset_id", "key"],
values,
- dict(value=value, updated_at=now),
+ dict(value=value, updated_at=now, **writer_info),
Review Comment:
Fixed. When `kind is None` (plain `set()` call), writer columns are now
excluded from
the `ON CONFLICT UPDATE` set and only `value` and `updated_at` are updated
on conflict, leaving any existing writer info untouched. Writer columns are
still included in the `INSERT` values (so new rows get `NULL` writer fields as
expected). The writer-aware path (`set_asset_store` / `aset_asset_store`)
continues to include writer columns in both the insert and update sets.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]