This is an automated email from the ASF dual-hosted git repository.
amoghrajesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new adead2b2385 Record the writer info for every asset store write for
better cross linkage (#67902)
adead2b2385 is described below
commit adead2b2385965921e745b2d028cc47bf77ee5a9
Author: Amogh Desai <[email protected]>
AuthorDate: Thu Jun 4 17:52:13 2026 +0530
Record the writer info for every asset store write for better cross linkage
(#67902)
---
.../api_fastapi/core_api/datamodels/asset_store.py | 12 ++
.../core_api/openapi/v2-rest-api-generated.yaml | 44 ++++++
.../core_api/routes/public/asset_store.py | 52 ++++++-
.../execution_api/routes/asset_store.py | 61 ++++++++-
..._3_3_0_add_task_store_and_asset_store_tables.py | 5 +
airflow-core/src/airflow/models/asset_store.py | 10 ++
airflow-core/src/airflow/state/metastore.py | 122 ++++++++++++++++-
.../airflow/ui/openapi-gen/requests/schemas.gen.ts | 77 +++++++++++
.../airflow/ui/openapi-gen/requests/types.gen.ts | 21 +++
.../core_api/routes/public/test_asset_store.py | 56 ++++++++
.../versions/head/test_asset_store.py | 104 ++++++++++++--
airflow-core/tests/unit/state/test_metastore.py | 150 +++++++++++++++++++++
.../src/airflowctl/api/datamodels/generated.py | 27 ++++
shared/state/src/airflow_shared/state/__init__.py | 39 ++++++
14 files changed, 750 insertions(+), 30 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/asset_store.py
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/asset_store.py
index 5f5fd19388f..eecd6a6de6d 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/asset_store.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/asset_store.py
@@ -21,17 +21,29 @@ from datetime import datetime
from pydantic import JsonValue, field_validator
+from airflow._shared.state import AssetStoreWriterKind
from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
_MAX_SERIALIZED_BYTES = 65535
+class AssetStoreLastUpdatedBy(BaseModel):
+    """Writer info for the last write to an asset store entry."""
+
+    # NOTE: the class docstring above is emitted as the OpenAPI schema description
+    # (see AssetStoreLastUpdatedBy in v2-rest-api-generated.yaml); changing it requires
+    # regenerating the OpenAPI spec and the UI openapi-gen files.
+
+    # Category of the writer that made the last write; the only required field.
+    kind: AssetStoreWriterKind
+    # Identity of the writing task instance. The execution API fills all four for
+    # task writes; Core API ("api") writes leave them as None.
+    dag_id: str | None = None
+    run_id: str | None = None
+    task_id: str | None = None
+    map_index: int | None = None
+
+
class AssetStoreResponse(BaseModel):
"""A single asset store key/value pair with metadata."""
key: str
value: JsonValue
updated_at: datetime
+ last_updated_by: AssetStoreLastUpdatedBy | None = None
class AssetStoreCollectionResponse(BaseModel):
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index d9e1067a20f..6c0ec008322 100644
---
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -11620,6 +11620,35 @@ components:
- total_entries
title: AssetStoreCollectionResponse
description: All asset store entries for an asset.
+ AssetStoreLastUpdatedBy:
+ properties:
+ kind:
+ $ref: '#/components/schemas/AssetStoreWriterKind'
+ dag_id:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Dag Id
+ run_id:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Run Id
+ task_id:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Task Id
+ map_index:
+ anyOf:
+ - type: integer
+ - type: 'null'
+ title: Map Index
+ type: object
+ required:
+ - kind
+ title: AssetStoreLastUpdatedBy
+ description: Writer info for the last write to an asset store entry.
AssetStoreResponse:
properties:
key:
@@ -11631,6 +11660,10 @@ components:
type: string
format: date-time
title: Updated At
+ last_updated_by:
+ anyOf:
+ - $ref: '#/components/schemas/AssetStoreLastUpdatedBy'
+ - type: 'null'
type: object
required:
- key
@@ -11638,6 +11671,17 @@ components:
- updated_at
title: AssetStoreResponse
description: A single asset store key/value pair with metadata.
+ AssetStoreWriterKind:
+ type: string
+ enum:
+ - task
+ - watcher
+ - api
+ title: AssetStoreWriterKind
+ description: "Identifies what kind of writer last updated an asset store
entry.\n\
+ \n``TASK`` \u2014 written by a task via the execution
API.\n``WATCHER`` \u2014\
+ \ written by a ``BaseEventTrigger`` (no task instance).\n``API``
\u2014 written\
+ \ directly through the Core API (e.g. manual admin write)."
AssetWatcherResponse:
properties:
name:
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/asset_store.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/asset_store.py
index 00f02bb664e..87b54c92ecd 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/asset_store.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/asset_store.py
@@ -22,13 +22,14 @@ from typing import Annotated
from fastapi import Depends, HTTPException, status
from sqlalchemy import select
-from airflow._shared.state import AssetScope
+from airflow._shared.state import AssetScope, AssetStoreWriterKind
from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
from airflow.api_fastapi.common.parameters import QueryLimit, QueryOffset
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.datamodels.asset_store import (
AssetStoreBody,
AssetStoreCollectionResponse,
+ AssetStoreLastUpdatedBy,
AssetStoreResponse,
)
from airflow.api_fastapi.core_api.openapi.exceptions import
create_openapi_http_exception_doc
@@ -73,6 +74,11 @@ def list_asset_store(
AssetStoreModel.key,
AssetStoreModel.value,
AssetStoreModel.updated_at,
+ AssetStoreModel.last_updated_by_kind,
+ AssetStoreModel.last_updated_by_dag_id,
+ AssetStoreModel.last_updated_by_run_id,
+ AssetStoreModel.last_updated_by_task_id,
+ AssetStoreModel.last_updated_by_map_index,
)
.where(AssetStoreModel.asset_id == asset_id)
.order_by(AssetStoreModel.key.asc())
@@ -87,7 +93,21 @@ def list_asset_store(
)
rows = session.execute(paginated).all()
entries = [
- AssetStoreResponse(key=r.key, value=json.loads(r.value),
updated_at=r.updated_at) for r in rows
+ AssetStoreResponse(
+ key=r.key,
+ value=json.loads(r.value),
+ updated_at=r.updated_at,
+ last_updated_by=AssetStoreLastUpdatedBy(
+ kind=r.last_updated_by_kind,
+ dag_id=r.last_updated_by_dag_id,
+ run_id=r.last_updated_by_run_id,
+ task_id=r.last_updated_by_task_id,
+ map_index=r.last_updated_by_map_index,
+ )
+ if r.last_updated_by_kind is not None
+ else None,
+ )
+ for r in rows
]
return AssetStoreCollectionResponse(asset_store=entries,
total_entries=total_entries)
@@ -108,6 +128,11 @@ def get_asset_store(
AssetStoreModel.key,
AssetStoreModel.value,
AssetStoreModel.updated_at,
+ AssetStoreModel.last_updated_by_kind,
+ AssetStoreModel.last_updated_by_dag_id,
+ AssetStoreModel.last_updated_by_run_id,
+ AssetStoreModel.last_updated_by_task_id,
+ AssetStoreModel.last_updated_by_map_index,
).where(
AssetStoreModel.asset_id == asset_id,
AssetStoreModel.key == key,
@@ -118,7 +143,20 @@ def get_asset_store(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Asset store key {key!r} not found",
)
- return AssetStoreResponse(key=row.key, value=json.loads(row.value),
updated_at=row.updated_at)
+ return AssetStoreResponse(
+ key=row.key,
+ value=json.loads(row.value),
+ updated_at=row.updated_at,
+ last_updated_by=AssetStoreLastUpdatedBy(
+ kind=row.last_updated_by_kind,
+ dag_id=row.last_updated_by_dag_id,
+ run_id=row.last_updated_by_run_id,
+ task_id=row.last_updated_by_task_id,
+ map_index=row.last_updated_by_map_index,
+ )
+ if row.last_updated_by_kind is not None
+ else None,
+ )
@asset_store_router.put(
@@ -134,7 +172,13 @@ def set_asset_store(
session: SessionDep,
) -> None:
"""Set an asset store value. Creates or overwrites the key."""
- _get_db_backend().set(AssetScope(asset_id=asset_id), key,
json.dumps(body.value), session=session)
+ _get_db_backend().set_asset_store(
+ AssetScope(asset_id=asset_id),
+ key,
+ json.dumps(body.value),
+ kind=AssetStoreWriterKind.API,
+ session=session,
+ )
@asset_store_router.delete(
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_store.py
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_store.py
index 5b94b1da2f5..925ad2efddd 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_store.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_store.py
@@ -35,15 +35,39 @@ from cadwyn import VersionedAPIRouter
from fastapi import HTTPException, Query, status
from sqlalchemy import select
-from airflow._shared.state import AssetScope
+from airflow._shared.state import AssetScope, AssetStoreWriterKind
from airflow.api_fastapi.common.db.common import SessionDep
from airflow.api_fastapi.execution_api.datamodels.asset_store import (
AssetStorePutBody,
AssetStoreResponse,
)
-from airflow.api_fastapi.execution_api.security import ExecutionAPIRoute
+from airflow.api_fastapi.execution_api.datamodels.token import TIToken
+from airflow.api_fastapi.execution_api.security import CurrentTIToken,
ExecutionAPIRoute
from airflow.models.asset import AssetModel
+from airflow.models.taskinstance import TaskInstance
from airflow.state import get_state_backend
+from airflow.state.metastore import MetastoreStoreBackend
+
+# (dag_id, run_id, task_id, map_index) of the task instance making a write.
+_TIWriterFields = tuple[str, str, str, int]
+
+
+def _fetch_ti_writer_fields(token: TIToken, session: SessionDep) -> _TIWriterFields:
+    """
+    Look up the identity of the task instance that owns ``token``.
+
+    :param token: Execution API token; ``token.id`` is matched against ``TaskInstance.id``.
+    :param session: Database session used for the lookup.
+    :return: ``(dag_id, run_id, task_id, map_index)`` of that task instance.
+    :raises HTTPException: 404 if no task instance with that id exists.
+    """
+    row = session.execute(
+        select(
+            TaskInstance.dag_id,
+            TaskInstance.run_id,
+            TaskInstance.task_id,
+            TaskInstance.map_index,
+        ).where(TaskInstance.id == token.id)
+    ).one_or_none()
+    if row is None:
+        # Plain str formatting rather than !r: the task instance id is a UUID, and repr()
+        # would render it as "UUID('...')" in the user-facing message.
+        raise HTTPException(
+            status_code=status.HTTP_404_NOT_FOUND,
+            detail={"reason": "not_found", "message": f"Task instance {token.id} not found"},
+        )
+    return row.dag_id, row.run_id, row.task_id, row.map_index
+
# TODO(AIP-103): enforce that the requesting task is registered with the asset
# (via task_inlet_asset_reference or task_outlet_asset_reference) before
@@ -97,16 +121,41 @@ def get_asset_store_by_name(
return AssetStoreResponse(value=json.loads(value))
+def _put_asset_store(
+    scope: AssetScope,
+    key: str,
+    body: AssetStorePutBody,
+    token: TIToken,
+    session: SessionDep,
+) -> None:
+    """
+    Serialize ``body.value`` to JSON and write it to the asset store entry ``key`` of ``scope``.
+
+    With ``MetastoreStoreBackend`` the write goes through ``set_asset_store``. It is attributed
+    to the task instance behind ``token``, resolved via ``_fetch_ti_writer_fields``, which
+    raises 404 if that TI is gone. Any other backend receives a plain ``set`` call, and no
+    writer info is recorded.
+    """
+    # Serialize once; both branches store the same JSON text.
+    serialized = json.dumps(body.value)
+    backend = get_state_backend()
+    if not isinstance(backend, MetastoreStoreBackend):
+        backend.set(scope, key, serialized, session=session)
+        return
+
+    dag_id, run_id, task_id, map_index = _fetch_ti_writer_fields(token, session)
+    backend.set_asset_store(
+        scope,
+        key,
+        serialized,
+        kind=AssetStoreWriterKind.TASK,
+        dag_id=dag_id,
+        run_id=run_id,
+        task_id=task_id,
+        map_index=map_index,
+        session=session,
+    )
+
+
@router.put("/by-name/value", status_code=status.HTTP_204_NO_CONTENT)
def set_asset_store_by_name(
name: Annotated[str, Query(min_length=1)],
key: Annotated[str, Query(min_length=1)],
body: AssetStorePutBody,
session: SessionDep,
+ token: TIToken = CurrentTIToken,
) -> None:
"""Set an asset store value by asset name."""
- asset_id = _resolve_asset_id_by_name(name, session)
- get_state_backend().set(AssetScope(asset_id=asset_id), key,
json.dumps(body.value), session=session)
+ _put_asset_store(AssetScope(asset_id=_resolve_asset_id_by_name(name,
session)), key, body, token, session)
@router.delete("/by-name/value", status_code=status.HTTP_204_NO_CONTENT)
@@ -153,10 +202,10 @@ def set_asset_store_by_uri(
key: Annotated[str, Query(min_length=1)],
body: AssetStorePutBody,
session: SessionDep,
+ token: TIToken = CurrentTIToken,
) -> None:
"""Set an asset store value by asset URI."""
- asset_id = _resolve_asset_id_by_uri(uri, session)
- get_state_backend().set(AssetScope(asset_id=asset_id), key,
json.dumps(body.value), session=session)
+ _put_asset_store(AssetScope(asset_id=_resolve_asset_id_by_uri(uri,
session)), key, body, token, session)
@router.delete("/by-uri/value", status_code=status.HTTP_204_NO_CONTENT)
diff --git
a/airflow-core/src/airflow/migrations/versions/0112_3_3_0_add_task_store_and_asset_store_tables.py
b/airflow-core/src/airflow/migrations/versions/0112_3_3_0_add_task_store_and_asset_store_tables.py
index e9d88443312..f2f248edcf2 100644
---
a/airflow-core/src/airflow/migrations/versions/0112_3_3_0_add_task_store_and_asset_store_tables.py
+++
b/airflow-core/src/airflow/migrations/versions/0112_3_3_0_add_task_store_and_asset_store_tables.py
@@ -50,6 +50,11 @@ def upgrade():
sa.Column("key", sa.String(length=512), nullable=False),
sa.Column("value", sa.Text().with_variant(mysql.MEDIUMTEXT(),
"mysql"), nullable=False),
sa.Column("updated_at", UtcDateTime(), nullable=False),
+ sa.Column("last_updated_by_kind", sa.String(length=16), nullable=True),
+ sa.Column("last_updated_by_dag_id", StringID(), nullable=True),
+ sa.Column("last_updated_by_run_id", StringID(), nullable=True),
+ sa.Column("last_updated_by_task_id", StringID(), nullable=True),
+ sa.Column("last_updated_by_map_index", sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(
["asset_id"], ["asset.id"], name="asset_store_asset_fkey",
ondelete="CASCADE"
),
diff --git a/airflow-core/src/airflow/models/asset_store.py
b/airflow-core/src/airflow/models/asset_store.py
index 946553118d8..42102723b24 100644
--- a/airflow-core/src/airflow/models/asset_store.py
+++ b/airflow-core/src/airflow/models/asset_store.py
@@ -33,6 +33,10 @@ class AssetStoreModel(Base):
Not scoped to any DAG run — a watermark written in run 1 is readable by
run 2.
Rows survive until explicitly deleted or the asset itself is deleted.
+
+    ``last_updated_by_*`` columns record who last wrote this entry. They are denormalized
+    (no FK) so that the references survive DAG run cleanup, and so that writers without a
+    task instance, such as watchers (``BaseEventTrigger``), can still write.
"""
__tablename__ = "asset_store"
@@ -43,6 +47,12 @@ class AssetStoreModel(Base):
value: Mapped[str] = mapped_column(Text().with_variant(MEDIUMTEXT,
"mysql"), nullable=False)
updated_at: Mapped[datetime] = mapped_column(UtcDateTime,
default=timezone.utcnow, nullable=False)
+ last_updated_by_kind: Mapped[str | None] = mapped_column(String(16),
nullable=True)
+ last_updated_by_dag_id: Mapped[str | None] = mapped_column(String(250,
**COLLATION_ARGS), nullable=True)
+ last_updated_by_run_id: Mapped[str | None] = mapped_column(String(250,
**COLLATION_ARGS), nullable=True)
+ last_updated_by_task_id: Mapped[str | None] = mapped_column(String(250,
**COLLATION_ARGS), nullable=True)
+ last_updated_by_map_index: Mapped[int | None] = mapped_column(Integer,
nullable=True)
+
__table_args__ = (
PrimaryKeyConstraint("asset_id", "key", name="asset_store_pkey"),
ForeignKeyConstraint(
diff --git a/airflow-core/src/airflow/state/metastore.py
b/airflow-core/src/airflow/state/metastore.py
index 137fce09a75..541752e2a05 100644
--- a/airflow-core/src/airflow/state/metastore.py
+++ b/airflow-core/src/airflow/state/metastore.py
@@ -26,7 +26,7 @@ from typing import TYPE_CHECKING
import structlog
from sqlalchemy import delete, select
-from airflow._shared.state import AssetScope, BaseStoreBackend, StoreScope,
TaskScope
+from airflow._shared.state import AssetScope, AssetStoreWriterKind,
BaseStoreBackend, StoreScope, TaskScope
from airflow._shared.timezones import timezone
from airflow.configuration import conf
from airflow.models.asset_store import AssetStoreModel
@@ -84,6 +84,28 @@ def _build_upsert_stmt(
return stmt
+def _build_asset_writer_fields(
+    kind: AssetStoreWriterKind | None,
+    dag_id: str | None,
+    run_id: str | None,
+    task_id: str | None,
+    map_index: int | None,
+    *,
+    value: str,
+    now: datetime,
+) -> tuple[dict, dict]:
+    """
+    Build the writer-attribution columns and the conflict-update set for an asset store upsert.
+
+    :return: ``(writer_info, update_fields)``. ``writer_info`` maps every ``last_updated_by_*``
+        column to its new value (for the INSERT). ``update_fields`` is what an existing row is
+        updated with on conflict.
+
+    The writer columns are always part of ``update_fields``, including when ``kind`` is None.
+    They describe the *last* write. Keeping them on an unattributed write would credit the new
+    value to whoever wrote the previous one, so such a write leaves the kind NULL instead.
+    """
+    writer_info = dict(
+        last_updated_by_kind=kind.value if kind is not None else None,
+        last_updated_by_dag_id=dag_id,
+        last_updated_by_run_id=run_id,
+        last_updated_by_task_id=task_id,
+        last_updated_by_map_index=map_index,
+    )
+    update_fields = dict(value=value, updated_at=now, **writer_info)
+    return writer_info, update_fields
+
+
class MetastoreStoreBackend(BaseStoreBackend):
"""Default state backend for tasks and assets. Stores task and asset state
in the Airflow metadata database."""
@@ -279,18 +301,63 @@ class MetastoreStoreBackend(BaseStoreBackend):
)
return row.value if row is not None else None
- def _set_asset_store(self, scope: AssetScope, key: str, value: str, *,
session: Session) -> None:
+ def _set_asset_store(
+ self,
+ scope: AssetScope,
+ key: str,
+ value: str,
+ *,
+ kind: AssetStoreWriterKind | None = None,
+ dag_id: str | None = None,
+ run_id: str | None = None,
+ task_id: str | None = None,
+ map_index: int | None = None,
+ session: Session,
+ ) -> None:
now = timezone.utcnow()
- values = dict(asset_id=scope.asset_id, key=key, value=value,
updated_at=now)
+ writer_info, update_fields = _build_asset_writer_fields(
+ kind, dag_id, run_id, task_id, map_index, value=value, now=now
+ )
+ values = dict(asset_id=scope.asset_id, key=key, value=value,
updated_at=now, **writer_info)
stmt = _build_upsert_stmt(
get_dialect_name(session),
AssetStoreModel,
["asset_id", "key"],
values,
- dict(value=value, updated_at=now),
+ update_fields,
)
session.execute(stmt)
+    @provide_session
+    def set_asset_store(
+        self,
+        scope: AssetScope,
+        key: str,
+        value: str,
+        *,
+        kind: AssetStoreWriterKind,
+        dag_id: str | None = None,
+        run_id: str | None = None,
+        task_id: str | None = None,
+        map_index: int | None = None,
+        session: Session | None = NEW_SESSION,
+    ) -> None:
+        """
+        Upsert an asset store entry and stamp it with the writer that produced it.
+
+        The writer fields are checked with ``kind.validate_writer_fields`` before the
+        upsert is issued through ``_set_asset_store``.
+
+        :param scope: Asset the entry belongs to.
+        :param key: Entry key.
+        :param value: Already-serialized value to store.
+        :param kind: Category of the writer making this write.
+        :param dag_id: Writing task instance's DAG id, if any.
+        :param run_id: Writing task instance's run id, if any.
+        :param task_id: Writing task instance's task id, if any.
+        :param map_index: Writing task instance's map index, if any.
+        :param session: Database session; supplied by ``provide_session`` when omitted.
+        """
+        kind.validate_writer_fields(dag_id, run_id, task_id, map_index)
+        if TYPE_CHECKING:
+            assert session is not None
+        writer = {
+            "kind": kind,
+            "dag_id": dag_id,
+            "run_id": run_id,
+            "task_id": task_id,
+            "map_index": map_index,
+        }
+        self._set_asset_store(scope, key, value, session=session, **writer)
+
def _delete_asset_store(self, scope: AssetScope, key: str, *, session:
Session) -> None:
session.execute(
delete(AssetStoreModel).where(
@@ -435,20 +502,61 @@ class MetastoreStoreBackend(BaseStoreBackend):
return row.value if row is not None else None
async def _aset_asset_store(
- self, scope: AssetScope, key: str, value: str, *, session: AsyncSession
+ self,
+ scope: AssetScope,
+ key: str,
+ value: str,
+ *,
+ kind: AssetStoreWriterKind | None = None,
+ dag_id: str | None = None,
+ run_id: str | None = None,
+ task_id: str | None = None,
+ map_index: int | None = None,
+ session: AsyncSession,
) -> None:
now = timezone.utcnow()
- values = dict(asset_id=scope.asset_id, key=key, value=value,
updated_at=now)
+ writer_info, update_fields = _build_asset_writer_fields(
+ kind, dag_id, run_id, task_id, map_index, value=value, now=now
+ )
+ values = dict(asset_id=scope.asset_id, key=key, value=value,
updated_at=now, **writer_info)
# get_dialect_name expects a sync Session; sync_session is the
underlying Session the async wrapper delegates to
stmt = _build_upsert_stmt(
get_dialect_name(session.sync_session),
AssetStoreModel,
["asset_id", "key"],
values,
- dict(value=value, updated_at=now),
+ update_fields,
)
await session.execute(stmt)
+    async def aset_asset_store(
+        self,
+        scope: AssetScope,
+        key: str,
+        value: str,
+        *,
+        kind: AssetStoreWriterKind,
+        dag_id: str | None = None,
+        run_id: str | None = None,
+        task_id: str | None = None,
+        map_index: int | None = None,
+        session: AsyncSession | None = None,
+    ) -> None:
+        """
+        Async counterpart of ``set_asset_store``: upsert an entry and record its writer.
+
+        The writer fields are checked with ``kind.validate_writer_fields`` before the
+        ``_async_session`` context is entered.
+        """
+        kind.validate_writer_fields(dag_id, run_id, task_id, map_index)
+        writer = {
+            "kind": kind,
+            "dag_id": dag_id,
+            "run_id": run_id,
+            "task_id": task_id,
+            "map_index": map_index,
+        }
+        async with _async_session(session) as db_session:
+            await self._aset_asset_store(scope, key, value, session=db_session, **writer)
+
async def _adelete_asset_store(self, scope: AssetScope, key: str, *,
session: AsyncSession) -> None:
await session.execute(
delete(AssetStoreModel).where(
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 14a60c09ed6..9acda604e28 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -414,6 +414,62 @@ export const $AssetStoreCollectionResponse = {
description: 'All asset store entries for an asset.'
} as const;
+export const $AssetStoreLastUpdatedBy = {
+ properties: {
+ kind: {
+ '$ref': '#/components/schemas/AssetStoreWriterKind'
+ },
+ dag_id: {
+ anyOf: [
+ {
+ type: 'string'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Dag Id'
+ },
+ run_id: {
+ anyOf: [
+ {
+ type: 'string'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Run Id'
+ },
+ task_id: {
+ anyOf: [
+ {
+ type: 'string'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Task Id'
+ },
+ map_index: {
+ anyOf: [
+ {
+ type: 'integer'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Map Index'
+ }
+ },
+ type: 'object',
+ required: ['kind'],
+ title: 'AssetStoreLastUpdatedBy',
+ description: 'Writer info for the last write to an asset store entry.'
+} as const;
+
export const $AssetStoreResponse = {
properties: {
key: {
@@ -427,6 +483,16 @@ export const $AssetStoreResponse = {
type: 'string',
format: 'date-time',
title: 'Updated At'
+ },
+ last_updated_by: {
+ anyOf: [
+ {
+ '$ref': '#/components/schemas/AssetStoreLastUpdatedBy'
+ },
+ {
+ type: 'null'
+ }
+ ]
}
},
type: 'object',
@@ -435,6 +501,17 @@ export const $AssetStoreResponse = {
description: 'A single asset store key/value pair with metadata.'
} as const;
+export const $AssetStoreWriterKind = {
+ type: 'string',
+ enum: ['task', 'watcher', 'api'],
+ title: 'AssetStoreWriterKind',
+ description: `Identifies what kind of writer last updated an asset store
entry.
+
+\`\`TASK\`\` — written by a task via the execution API.
+\`\`WATCHER\`\` — written by a \`\`BaseEventTrigger\`\` (no task instance).
+\`\`API\`\` — written directly through the Core API (e.g. manual admin write).`
+} as const;
+
export const $AssetWatcherResponse = {
properties: {
name: {
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 16d14d2fe96..4f13f95a2f8 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -111,6 +111,17 @@ export type AssetStoreCollectionResponse = {
total_entries: number;
};
+/**
+ * Writer info for the last write to an asset store entry.
+ */
+export type AssetStoreLastUpdatedBy = {
+ kind: AssetStoreWriterKind;
+ dag_id?: string | null;
+ run_id?: string | null;
+ task_id?: string | null;
+ map_index?: number | null;
+};
+
/**
* A single asset store key/value pair with metadata.
*/
@@ -118,8 +129,18 @@ export type AssetStoreResponse = {
key: string;
value: JsonValue;
updated_at: string;
+ last_updated_by?: AssetStoreLastUpdatedBy | null;
};
+/**
+ * Identifies what kind of writer last updated an asset store entry.
+ *
+ * ``TASK`` — written by a task via the execution API.
+ * ``WATCHER`` — written by a ``BaseEventTrigger`` (no task instance).
+ * ``API`` — written directly through the Core API (e.g. manual admin write).
+ */
+export type AssetStoreWriterKind = 'task' | 'watcher' | 'api';
+
/**
* Asset watcher serializer for responses.
*/
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_asset_store.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_asset_store.py
index 8345386b2fd..988cf1ddbee 100644
---
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_asset_store.py
+++
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_asset_store.py
@@ -110,6 +110,28 @@ class TestListAssetState(TestAssetStateEndpoint):
assert "updated_at" in item
assert item["key"] == "watermark"
+    def test_last_updated_by_returned_when_set(self, test_client, create_task_instance):
+        """The list endpoint exposes the stored writer columns as ``last_updated_by``."""
+        ti = create_task_instance()
+        self._session.add(
+            AssetStoreModel(
+                asset_id=self.asset.id,
+                key="watermark",
+                value='"v"',
+                last_updated_by_kind="task",
+                last_updated_by_dag_id=ti.dag_id,
+                last_updated_by_run_id=ti.run_id,
+                last_updated_by_task_id=ti.task_id,
+                last_updated_by_map_index=ti.map_index,
+            )
+        )
+        self._session.commit()
+
+        expected = {
+            "kind": "task",
+            "dag_id": ti.dag_id,
+            "run_id": ti.run_id,
+            "task_id": ti.task_id,
+            "map_index": ti.map_index,
+        }
+        writer = test_client.get(self._base_url).json()["asset_store"][0]["last_updated_by"]
+        for field, value in expected.items():
+            assert writer[field] == value
+
def test_pagination_limit(self, test_client):
for k in ("watermark", "file_count", "last_run"):
_create_asset_state(self._session, self.asset.id, k, "v")
@@ -145,6 +167,29 @@ class TestGetAssetState(TestAssetStateEndpoint):
assert data["key"] == "watermark"
assert data["value"] == "2026-05-01"
assert "updated_at" in data
+ assert data["last_updated_by"] is None
+
+    def test_last_updated_by_returned_in_get(self, test_client, create_task_instance):
+        """The single-key endpoint exposes the stored writer columns as ``last_updated_by``."""
+        ti = create_task_instance()
+        self._session.add(
+            AssetStoreModel(
+                asset_id=self.asset.id,
+                key="watermark",
+                value='"v"',
+                last_updated_by_kind="task",
+                last_updated_by_dag_id=ti.dag_id,
+                last_updated_by_run_id=ti.run_id,
+                last_updated_by_task_id=ti.task_id,
+                last_updated_by_map_index=ti.map_index,
+            )
+        )
+        self._session.commit()
+
+        expected = {
+            "kind": "task",
+            "dag_id": ti.dag_id,
+            "run_id": ti.run_id,
+            "task_id": ti.task_id,
+            "map_index": ti.map_index,
+        }
+        writer = test_client.get(f"{self._base_url}/watermark").json()["last_updated_by"]
+        for field, value in expected.items():
+            assert writer[field] == value
def test_missing_key_returns_404(self, test_client):
assert test_client.get(f"{self._base_url}/nonexistent").status_code ==
404
@@ -209,6 +254,17 @@ class TestSetAssetState(TestAssetStateEndpoint):
assert row is not None
assert row.value == expected_db
+    def test_put_records_api_kind(self, test_client):
+        """PUT via the Core API sets last_updated_by.kind='api' in the response."""
+        put_response = test_client.put(f"{self._base_url}/watermark", json={"value": "v"})
+        # Fail here, with the server's error body, if the write was rejected. Otherwise
+        # the failure surfaces later as a less informative error from the GET below.
+        assert put_response.is_success, put_response.text
+
+        data = test_client.get(f"{self._base_url}/watermark").json()
+        assert data["last_updated_by"]["kind"] == "api"
+        # Core API writes carry no task-instance identity.
+        assert data["last_updated_by"]["dag_id"] is None
+        assert data["last_updated_by"]["run_id"] is None
+        assert data["last_updated_by"]["task_id"] is None
+        assert data["last_updated_by"]["map_index"] is None
+
@pytest.mark.parametrize("value", [42, True, {"rows": 100}, [1, "two"],
"hello"])
def test_core_api_write_read_roundtrip(self, test_client, value):
"""Core API write then Core API read returns the same native value."""
diff --git
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_asset_store.py
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_asset_store.py
index 325b9decce7..1b8cf5382c2 100644
---
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_asset_store.py
+++
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_asset_store.py
@@ -20,19 +20,35 @@ import json
from typing import TYPE_CHECKING
import pytest
+from fastapi import Request
from sqlalchemy import delete, select
+from airflow.api_fastapi.execution_api.datamodels.token import TIClaims,
TIToken
+from airflow.api_fastapi.execution_api.security import require_auth
from airflow.models.asset import AssetActive, AssetModel
from airflow.models.asset_store import AssetStoreModel
from airflow.utils.session import create_session
if TYPE_CHECKING:
+ from fastapi import FastAPI
from fastapi.testclient import TestClient
from sqlalchemy.orm import Session
+ from airflow.models.taskinstance import TaskInstance
+
+ from tests_common.pytest_plugin import CreateTaskInstance
+
pytestmark = pytest.mark.db_test
_BY_NAME_VALUE = "/execution/store/asset/by-name/value"
+
+
+def _create_asset_store_row(asset_id: int, key: str, value: str) -> None:
+    """
+    Seed one asset store row straight into the database, JSON-encoding ``value``.
+
+    Read and delete tests use this instead of the PUT route, because PUT now resolves
+    writer info from a real task instance behind the auth token.
+    """
+    row = AssetStoreModel(asset_id=asset_id, key=key, value=json.dumps(value))
+    with create_session() as db:
+        db.add(row)
+
+
_BY_NAME_CLEAR = "/execution/store/asset/by-name/clear"
_BY_URI_VALUE = "/execution/store/asset/by-uri/value"
_BY_URI_CLEAR = "/execution/store/asset/by-uri/clear"
@@ -66,9 +82,7 @@ def inactive_asset(session: Session) -> AssetModel:
class TestGetAssetStateByName:
def test_get_returns_value(self, client: TestClient, asset: AssetModel):
- client.put(
- _BY_NAME_VALUE, params={"name": asset.name, "key": "watermark"},
json={"value": "2026-04-29"}
- )
+ _create_asset_store_row(asset.id, "watermark", "2026-04-29")
response = client.get(_BY_NAME_VALUE, params={"name": asset.name,
"key": "watermark"})
@@ -88,7 +102,7 @@ class TestGetAssetStateByName:
session.add(AssetActive.for_asset(slashed))
session.commit()
- client.put(_BY_NAME_VALUE, params={"name": slashed.name, "key": "wm"},
json={"value": "x"})
+ _create_asset_store_row(slashed.id, "wm", "x")
response = client.get(_BY_NAME_VALUE, params={"name": slashed.name,
"key": "wm"})
assert response.status_code == 200
@@ -101,6 +115,20 @@ class TestGetAssetStateByName:
class TestPutAssetStateByName:
+    @pytest.fixture(autouse=True)
+    def setup_ti_auth(
+        self,
+        exec_app: FastAPI,
+        create_task_instance: CreateTaskInstance,
+    ):
+        """
+        Create a real TI and wire the mock auth to use its UUID so the route can look up writer info.
+
+        The previous ``require_auth`` override is restored on teardown, so this TI's token does
+        not leak into later tests if ``exec_app`` is shared between them.
+        """
+        self._ti: TaskInstance = create_task_instance()
+
+        async def _auth(request: Request) -> TIToken:
+            return TIToken(id=self._ti.id, claims=TIClaims(scope="execution"))
+
+        previous = exec_app.dependency_overrides.get(require_auth)
+        exec_app.dependency_overrides[require_auth] = _auth
+        yield
+        if previous is None:
+            exec_app.dependency_overrides.pop(require_auth, None)
+        else:
+            exec_app.dependency_overrides[require_auth] = previous
+
def test_put_creates_row(self, client: TestClient, asset: AssetModel,
session: Session):
response = client.put(
_BY_NAME_VALUE, params={"name": asset.name, "key": "watermark"},
json={"value": "2026-04-29"}
@@ -117,6 +145,26 @@ class TestPutAssetStateByName:
# DB stores JSON-encoded string
assert row.value == '"2026-04-29"'
+    def test_put_records_writer(self, client: TestClient, asset: AssetModel, session: Session):
+        """PUT records writer fields resolved from the JWT token's TI."""
+        response = client.put(
+            _BY_NAME_VALUE, params={"name": asset.name, "key": "watermark"}, json={"value": "v"}
+        )
+        assert response.status_code == 204
+
+        row = session.scalar(
+            select(AssetStoreModel).where(
+                AssetStoreModel.asset_id == asset.id,
+                AssetStoreModel.key == "watermark",
+            )
+        )
+        assert row is not None
+        # The token installed by setup_ti_auth carries self._ti.id, so the route must
+        # have stamped the row with that task instance's identity.
+        assert row.last_updated_by_kind == "task"
+        assert row.last_updated_by_dag_id == self._ti.dag_id
+        assert row.last_updated_by_run_id == self._ti.run_id
+        assert row.last_updated_by_task_id == self._ti.task_id
+        assert row.last_updated_by_map_index == self._ti.map_index
+
def test_put_int_value_roundtrip(self, client: TestClient, asset:
AssetModel):
response = client.put(
_BY_NAME_VALUE, params={"name": asset.name, "key": "total_runs"},
json={"value": 5}
@@ -191,9 +239,7 @@ class TestPutAssetStateByName:
class TestDeleteAssetStateByName:
def test_delete_removes_key(self, client: TestClient, asset: AssetModel):
- client.put(
- _BY_NAME_VALUE, params={"name": asset.name, "key": "watermark"},
json={"value": "2026-04-29"}
- )
+ _create_asset_store_row(asset.id, "watermark", "2026-04-29")
response = client.delete(_BY_NAME_VALUE, params={"name": asset.name,
"key": "watermark"})
@@ -209,7 +255,7 @@ class TestDeleteAssetStateByName:
class TestClearAssetStateByName:
def test_clear_removes_all_keys(self, client: TestClient, asset:
AssetModel):
for k, v in [("watermark", "a"), ("last_id", "b"), ("schema_hash",
"c")]:
- client.put(_BY_NAME_VALUE, params={"name": asset.name, "key": k},
json={"value": v})
+ _create_asset_store_row(asset.id, k, v)
response = client.delete(_BY_NAME_CLEAR, params={"name": asset.name})
@@ -221,9 +267,7 @@ class TestClearAssetStateByName:
class TestGetAssetStateByUri:
def test_get_returns_value(self, client: TestClient, asset: AssetModel):
- client.put(
- _BY_NAME_VALUE, params={"name": asset.name, "key": "watermark"},
json={"value": "2026-04-29"}
- )
+ _create_asset_store_row(asset.id, "watermark", "2026-04-29")
response = client.get(_BY_URI_VALUE, params={"uri": asset.uri, "key":
"watermark"})
@@ -242,6 +286,20 @@ class TestGetAssetStateByUri:
class TestPutAssetStateByUri:
@pytest.fixture(autouse=True)
def setup_ti_auth(
    self,
    exec_app: FastAPI,
    create_task_instance: CreateTaskInstance,
):
    """
    Back the mocked auth dependency with a real task instance.

    A TI is created and kept on ``self._ti``. ``require_auth`` is then overridden so that
    requests authenticate as that TI's UUID, which lets the route resolve writer info.
    """
    ti: TaskInstance = create_task_instance()
    self._ti = ti

    async def _authenticate_as_ti(request: Request) -> TIToken:
        return TIToken(id=self._ti.id, claims=TIClaims(scope="execution"))

    exec_app.dependency_overrides[require_auth] = _authenticate_as_ti
+
def test_put_creates_row(self, client: TestClient, asset: AssetModel,
session: Session):
response = client.put(
_BY_URI_VALUE, params={"uri": asset.uri, "key": "watermark"},
json={"value": "2026-04-29"}
@@ -257,6 +315,26 @@ class TestPutAssetStateByUri:
assert row is not None
assert row.value == '"2026-04-29"'
def test_put_records_writer(self, client: TestClient, asset: AssetModel, session: Session):
    """PUT writes writer fields resolved from the JWT token's TI."""
    put_resp = client.put(
        _BY_URI_VALUE,
        params={"uri": asset.uri, "key": "watermark"},
        json={"value": "v"},
    )
    assert put_resp.status_code == 204

    stmt = select(AssetStoreModel).where(
        AssetStoreModel.asset_id == asset.id,
        AssetStoreModel.key == "watermark",
    )
    stored = session.scalar(stmt)
    assert stored is not None

    # Each writer column is expected to mirror the TI bound to the auth token.
    expected_writer = {
        "last_updated_by_kind": "task",
        "last_updated_by_dag_id": self._ti.dag_id,
        "last_updated_by_run_id": self._ti.run_id,
        "last_updated_by_task_id": self._ti.task_id,
        "last_updated_by_map_index": self._ti.map_index,
    }
    for column, expected in expected_writer.items():
        assert getattr(stored, column) == expected, column
+
def test_put_unknown_uri_returns_404(self, client: TestClient):
response = client.put(
_BY_URI_VALUE, params={"uri": "s3://nonexistent/path", "key":
"wm"}, json={"value": "x"}
@@ -267,7 +345,7 @@ class TestPutAssetStateByUri:
class TestDeleteAssetStateByUri:
def test_delete_removes_key(self, client: TestClient, asset: AssetModel):
- client.put(_BY_URI_VALUE, params={"uri": asset.uri, "key":
"watermark"}, json={"value": "2026-04-29"})
+ _create_asset_store_row(asset.id, "watermark", "2026-04-29")
response = client.delete(_BY_URI_VALUE, params={"uri": asset.uri,
"key": "watermark"})
@@ -278,7 +356,7 @@ class TestDeleteAssetStateByUri:
class TestClearAssetStateByUri:
def test_clear_removes_all_keys(self, client: TestClient, asset:
AssetModel):
for k, v in [("watermark", "a"), ("last_id", "b")]:
- client.put(_BY_URI_VALUE, params={"uri": asset.uri, "key": k},
json={"value": v})
+ _create_asset_store_row(asset.id, k, v)
response = client.delete(_BY_URI_CLEAR, params={"uri": asset.uri})
diff --git a/airflow-core/tests/unit/state/test_metastore.py
b/airflow-core/tests/unit/state/test_metastore.py
index 15a8984af64..1f7883f3e0f 100644
--- a/airflow-core/tests/unit/state/test_metastore.py
+++ b/airflow-core/tests/unit/state/test_metastore.py
@@ -25,6 +25,7 @@ from unittest.mock import patch
import pytest
from sqlalchemy import Delete, select
+from airflow._shared.state import AssetStoreWriterKind
from airflow._shared.timezones import timezone
from airflow.configuration import conf
from airflow.models.asset import AssetModel
@@ -429,6 +430,130 @@ class TestMetastoreStoreBackendAssetScope:
assert backend.get(scope2, "watermark", session=session) is None
def test_set_asset_store_writes_writer(
    self, backend: MetastoreStoreBackend, asset_committed: AssetModel, create_task_instance
):
    """``set_asset_store`` persists the TASK writer's identity alongside the value."""
    ti = create_task_instance()
    writer = dict(
        kind=AssetStoreWriterKind.TASK,
        dag_id=ti.dag_id,
        run_id=ti.run_id,
        task_id=ti.task_id,
        map_index=ti.map_index,
    )
    backend.set_asset_store(AssetScope(asset_id=asset_committed.id), "watermark", "v", **writer)

    with create_session() as s:
        stmt = select(AssetStoreModel).where(AssetStoreModel.asset_id == asset_committed.id)
        stored = s.scalar(stmt)
        assert stored is not None
        assert stored.last_updated_by_kind == AssetStoreWriterKind.TASK.value
        assert stored.last_updated_by_dag_id == ti.dag_id
        assert stored.last_updated_by_run_id == ti.run_id
        assert stored.last_updated_by_task_id == ti.task_id
        assert stored.last_updated_by_map_index == ti.map_index
+
@pytest.mark.backend("postgres", "mysql", "sqlite")
def test_set_asset_store_upsert_updates_writer(
    self, backend: MetastoreStoreBackend, asset_committed: AssetModel, create_task_instance
):
    """A second write to the same key overwrites the value and the writer columns."""
    ti1 = create_task_instance(task_id="task1", dag_id="dag1")
    ti2 = create_task_instance(task_id="task2", dag_id="dag2")
    scope = AssetScope(asset_id=asset_committed.id)
    backend.set_asset_store(
        scope,
        "watermark",
        "v1",
        kind=AssetStoreWriterKind.TASK,
        dag_id=ti1.dag_id,
        run_id=ti1.run_id,
        task_id=ti1.task_id,
        map_index=ti1.map_index,
    )
    backend.set_asset_store(
        scope,
        "watermark",
        "v2",
        kind=AssetStoreWriterKind.TASK,
        dag_id=ti2.dag_id,
        run_id=ti2.run_id,
        task_id=ti2.task_id,
        map_index=ti2.map_index,
    )

    with create_session() as s:
        row = s.scalar(select(AssetStoreModel).where(AssetStoreModel.asset_id == asset_committed.id))
        assert row is not None
        assert row.value == "v2"
        # Assert the full writer tuple matches ti2, not only dag_id/task_id.
        assert row.last_updated_by_kind == AssetStoreWriterKind.TASK.value
        assert row.last_updated_by_dag_id == ti2.dag_id
        assert row.last_updated_by_run_id == ti2.run_id
        assert row.last_updated_by_task_id == ti2.task_id
        assert row.last_updated_by_map_index == ti2.map_index
+
def test_set_stores_null_writer(self, session: Session, backend: MetastoreStoreBackend, asset: AssetModel):
    """Plain ``set`` (no writer information) leaves every writer column NULL."""
    scope = AssetScope(asset_id=asset.id)
    backend.set(scope, "watermark", "v", session=session)
    session.flush()

    row = session.scalar(select(AssetStoreModel).where(AssetStoreModel.asset_id == asset.id))
    assert row is not None
    assert row.last_updated_by_kind is None
    assert row.last_updated_by_dag_id is None
    assert row.last_updated_by_run_id is None
    assert row.last_updated_by_task_id is None
    assert row.last_updated_by_map_index is None
+
def test_set_asset_store_task_kind_requires_all_fields(
    self, backend: MetastoreStoreBackend, asset_committed: AssetModel, create_task_instance
):
    """A TASK writer missing any task-identity field (here ``task_id``) is rejected."""
    ti = create_task_instance()
    scope = AssetScope(asset_id=asset_committed.id)
    incomplete_writer = {
        "kind": AssetStoreWriterKind.TASK,
        "dag_id": ti.dag_id,
        "run_id": ti.run_id,
        "task_id": None,
        "map_index": ti.map_index,
    }
    with pytest.raises(ValueError, match="kind='task' requires"):
        backend.set_asset_store(scope, "watermark", "v", **incomplete_writer)
+
def test_set_asset_store_watcher_kind_rejects_task_fields(
    self, backend: MetastoreStoreBackend, asset_committed: AssetModel, create_task_instance
):
    """A WATCHER writer that carries a task-identity field (``dag_id``) is rejected."""
    ti = create_task_instance()
    scope = AssetScope(asset_id=asset_committed.id)
    expected_error = "kind='watcher' must not carry task fields"
    with pytest.raises(ValueError, match=expected_error):
        backend.set_asset_store(
            scope,
            "watermark",
            "v",
            kind=AssetStoreWriterKind.WATCHER,
            dag_id=ti.dag_id,
            run_id=None,
            task_id=None,
            map_index=None,
        )
+
def test_set_asset_store_api_kind_rejects_task_fields(
    self, backend: MetastoreStoreBackend, asset_committed: AssetModel, create_task_instance
):
    """An API writer that carries a task-identity field (``dag_id``) is rejected."""
    ti = create_task_instance()
    scope = AssetScope(asset_id=asset_committed.id)
    expected_error = "kind='api' must not carry task fields"
    with pytest.raises(ValueError, match=expected_error):
        backend.set_asset_store(
            scope,
            "watermark",
            "v",
            kind=AssetStoreWriterKind.API,
            dag_id=ti.dag_id,
            run_id=None,
            task_id=None,
            map_index=None,
        )
+
def test_cleanup_does_not_touch_asset_state(
self, session: Session, backend: MetastoreStoreBackend, asset:
AssetModel
):
@@ -510,6 +635,31 @@ class TestMetastoreStoreBackendAsync:
assert await backend.aget(scope, "watermark") is None
assert await backend.aget(scope, "file_count") is None
async def test_aset_asset_store_writes_writer(
    self, backend: MetastoreStoreBackend, asset_committed: AssetModel, create_task_instance
):
    """Async ``aset_asset_store`` persists the same writer columns as the sync path."""
    ti = create_task_instance()
    scope = AssetScope(asset_id=asset_committed.id)
    await backend.aset_asset_store(
        scope,
        "watermark",
        "v",
        kind=AssetStoreWriterKind.TASK,
        dag_id=ti.dag_id,
        run_id=ti.run_id,
        task_id=ti.task_id,
        map_index=ti.map_index,
    )

    async with create_session_async() as s:
        row = await s.scalar(
            select(AssetStoreModel).where(AssetStoreModel.asset_id == asset_committed.id)
        )
        assert row is not None
        # Check every writer column, matching the sync test, so the async path
        # cannot silently drop run_id or map_index.
        assert row.last_updated_by_kind == AssetStoreWriterKind.TASK.value
        assert row.last_updated_by_dag_id == ti.dag_id
        assert row.last_updated_by_run_id == ti.run_id
        assert row.last_updated_by_task_id == ti.task_id
        assert row.last_updated_by_map_index == ti.map_index
+
async def test_aset_task_raises_for_missing_dag_run(self, backend:
MetastoreStoreBackend):
scope = TaskScope(dag_id="nonexistent_dag", run_id="nonexistent_run",
task_id=TASK_ID)
with pytest.raises(ValueError, match="No DagRun found"):
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index 8fec9dd81a3..fff609bcaea 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -49,6 +49,20 @@ class AssetAliasResponse(BaseModel):
group: Annotated[str, Field(title="Group")]
class AssetStoreWriterKind(str, Enum):
    """
    Identifies what kind of writer last updated an asset store entry.

    ``TASK`` — written by a task via the execution API.
    ``WATCHER`` — written by a ``BaseEventTrigger`` (no task instance).
    ``API`` — written directly through the Core API (e.g. manual admin write).
    """

    # Member names and values mirror airflow._shared.state.AssetStoreWriterKind on the server.
    TASK = "task"
    WATCHER = "watcher"
    API = "api"
+
+
class AssetWatcherResponse(BaseModel):
"""
Asset watcher serializer for responses.
@@ -1326,6 +1340,18 @@ class AssetStoreBody(BaseModel):
value: JsonValue
class AssetStoreLastUpdatedBy(BaseModel):
    """
    Writer info for the last write to an asset store entry.
    """

    kind: AssetStoreWriterKind
    # Task-identity fields. Server-side validation (AssetStoreWriterKind.validate_writer_fields)
    # requires all four for kind="task" and rejects them for "watcher" / "api", so for
    # non-task writers these are expected to stay None.
    dag_id: Annotated[str | None, Field(title="Dag Id")] = None
    run_id: Annotated[str | None, Field(title="Run Id")] = None
    task_id: Annotated[str | None, Field(title="Task Id")] = None
    map_index: Annotated[int | None, Field(title="Map Index")] = None
+
+
class AssetStoreResponse(BaseModel):
"""
A single asset store key/value pair with metadata.
@@ -1334,6 +1360,7 @@ class AssetStoreResponse(BaseModel):
key: Annotated[str, Field(title="Key")]
value: JsonValue
updated_at: Annotated[datetime, Field(title="Updated At")]
+ last_updated_by: AssetStoreLastUpdatedBy | None = None
class BackfillPostBody(BaseModel):
diff --git a/shared/state/src/airflow_shared/state/__init__.py
b/shared/state/src/airflow_shared/state/__init__.py
index 7ae5ed9c5f2..3c963acd18d 100644
--- a/shared/state/src/airflow_shared/state/__init__.py
+++ b/shared/state/src/airflow_shared/state/__init__.py
@@ -19,6 +19,7 @@ from __future__ import annotations
import json
from abc import ABC, abstractmethod
from dataclasses import dataclass
+from enum import Enum
from typing import TYPE_CHECKING
if TYPE_CHECKING:
@@ -73,6 +74,44 @@ class AssetScope:
StoreScope = TaskScope | AssetScope
class AssetStoreWriterKind(str, Enum):
    """
    Kind of writer responsible for the most recent update to an asset store entry.

    ``TASK`` — written by a task through the execution API.
    ``WATCHER`` — written by a ``BaseEventTrigger``; no task instance is involved.
    ``API`` — written straight through the Core API (for example, a manual admin write).
    """

    TASK = "task"
    WATCHER = "watcher"
    API = "api"

    def validate_writer_fields(
        self,
        dag_id: str | None,
        run_id: str | None,
        task_id: str | None,
        map_index: int | None,
    ) -> None:
        """
        Check that the task-identity fields agree with this writer kind.

        ``TASK`` writers must supply all of ``dag_id``, ``run_id``, ``task_id`` and
        ``map_index``. ``WATCHER`` and ``API`` writers must supply none of them.

        :raises ValueError: if the supplied fields do not match this kind.
        """
        present = [field is not None for field in (dag_id, run_id, task_id, map_index)]

        def _received() -> str:
            # Shared suffix for both error messages; built only when an error is raised.
            return f"got dag_id={dag_id!r}, run_id={run_id!r}, task_id={task_id!r}, map_index={map_index!r}"

        if self == AssetStoreWriterKind.TASK:
            if not all(present):
                raise ValueError(
                    "kind='task' requires dag_id, run_id, task_id, and map_index to all be set; " + _received()
                )
        elif self in (AssetStoreWriterKind.WATCHER, AssetStoreWriterKind.API):
            if any(present):
                raise ValueError(f"kind={self.value!r} must not carry task fields; " + _received())
        else:
            # Defensive: only reachable if a new member is added without updating this method.
            raise AssertionError(f"Unhandled AssetStoreWriterKind: {self!r}")
+
+
class BaseStoreBackend(ABC):
"""
Abstract backend for reading and writing task and asset state.