This is an automated email from the ASF dual-hosted git repository.

amoghrajesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new adead2b2385 Record the writer info for every asset store write for 
better cross linkage (#67902)
adead2b2385 is described below

commit adead2b2385965921e745b2d028cc47bf77ee5a9
Author: Amogh Desai <[email protected]>
AuthorDate: Thu Jun 4 17:52:13 2026 +0530

    Record the writer info for every asset store write for better cross linkage 
(#67902)
---
 .../api_fastapi/core_api/datamodels/asset_store.py |  12 ++
 .../core_api/openapi/v2-rest-api-generated.yaml    |  44 ++++++
 .../core_api/routes/public/asset_store.py          |  52 ++++++-
 .../execution_api/routes/asset_store.py            |  61 ++++++++-
 ..._3_3_0_add_task_store_and_asset_store_tables.py |   5 +
 airflow-core/src/airflow/models/asset_store.py     |  10 ++
 airflow-core/src/airflow/state/metastore.py        | 122 ++++++++++++++++-
 .../airflow/ui/openapi-gen/requests/schemas.gen.ts |  77 +++++++++++
 .../airflow/ui/openapi-gen/requests/types.gen.ts   |  21 +++
 .../core_api/routes/public/test_asset_store.py     |  56 ++++++++
 .../versions/head/test_asset_store.py              | 104 ++++++++++++--
 airflow-core/tests/unit/state/test_metastore.py    | 150 +++++++++++++++++++++
 .../src/airflowctl/api/datamodels/generated.py     |  27 ++++
 shared/state/src/airflow_shared/state/__init__.py  |  39 ++++++
 14 files changed, 750 insertions(+), 30 deletions(-)

diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/asset_store.py 
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/asset_store.py
index 5f5fd19388f..eecd6a6de6d 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/asset_store.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/asset_store.py
@@ -21,17 +21,29 @@ from datetime import datetime
 
 from pydantic import JsonValue, field_validator
 
+from airflow._shared.state import AssetStoreWriterKind
 from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
 
 _MAX_SERIALIZED_BYTES = 65535
 
 
+class AssetStoreLastUpdatedBy(BaseModel):
+    """Writer info for the last write to an asset store entry."""
+
+    kind: AssetStoreWriterKind
+    dag_id: str | None = None
+    run_id: str | None = None
+    task_id: str | None = None
+    map_index: int | None = None
+
+
 class AssetStoreResponse(BaseModel):
     """A single asset store key/value pair with metadata."""
 
     key: str
     value: JsonValue
     updated_at: datetime
+    last_updated_by: AssetStoreLastUpdatedBy | None = None
 
 
 class AssetStoreCollectionResponse(BaseModel):
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index d9e1067a20f..6c0ec008322 100644
--- 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++ 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -11620,6 +11620,35 @@ components:
       - total_entries
       title: AssetStoreCollectionResponse
       description: All asset store entries for an asset.
+    AssetStoreLastUpdatedBy:
+      properties:
+        kind:
+          $ref: '#/components/schemas/AssetStoreWriterKind'
+        dag_id:
+          anyOf:
+          - type: string
+          - type: 'null'
+          title: Dag Id
+        run_id:
+          anyOf:
+          - type: string
+          - type: 'null'
+          title: Run Id
+        task_id:
+          anyOf:
+          - type: string
+          - type: 'null'
+          title: Task Id
+        map_index:
+          anyOf:
+          - type: integer
+          - type: 'null'
+          title: Map Index
+      type: object
+      required:
+      - kind
+      title: AssetStoreLastUpdatedBy
+      description: Writer info for the last write to an asset store entry.
     AssetStoreResponse:
       properties:
         key:
@@ -11631,6 +11660,10 @@ components:
           type: string
           format: date-time
           title: Updated At
+        last_updated_by:
+          anyOf:
+          - $ref: '#/components/schemas/AssetStoreLastUpdatedBy'
+          - type: 'null'
       type: object
       required:
       - key
@@ -11638,6 +11671,17 @@ components:
       - updated_at
       title: AssetStoreResponse
       description: A single asset store key/value pair with metadata.
+    AssetStoreWriterKind:
+      type: string
+      enum:
+      - task
+      - watcher
+      - api
+      title: AssetStoreWriterKind
+      description: "Identifies what kind of writer last updated an asset store 
entry.\n\
+        \n``TASK`` \u2014 written by a task via the execution 
API.\n``WATCHER`` \u2014\
+        \ written by a ``BaseEventTrigger`` (no task instance).\n``API`` 
\u2014 written\
+        \ directly through the Core API (e.g. manual admin write)."
     AssetWatcherResponse:
       properties:
         name:
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/asset_store.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/asset_store.py
index 00f02bb664e..87b54c92ecd 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/asset_store.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/asset_store.py
@@ -22,13 +22,14 @@ from typing import Annotated
 from fastapi import Depends, HTTPException, status
 from sqlalchemy import select
 
-from airflow._shared.state import AssetScope
+from airflow._shared.state import AssetScope, AssetStoreWriterKind
 from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
 from airflow.api_fastapi.common.parameters import QueryLimit, QueryOffset
 from airflow.api_fastapi.common.router import AirflowRouter
 from airflow.api_fastapi.core_api.datamodels.asset_store import (
     AssetStoreBody,
     AssetStoreCollectionResponse,
+    AssetStoreLastUpdatedBy,
     AssetStoreResponse,
 )
 from airflow.api_fastapi.core_api.openapi.exceptions import 
create_openapi_http_exception_doc
@@ -73,6 +74,11 @@ def list_asset_store(
             AssetStoreModel.key,
             AssetStoreModel.value,
             AssetStoreModel.updated_at,
+            AssetStoreModel.last_updated_by_kind,
+            AssetStoreModel.last_updated_by_dag_id,
+            AssetStoreModel.last_updated_by_run_id,
+            AssetStoreModel.last_updated_by_task_id,
+            AssetStoreModel.last_updated_by_map_index,
         )
         .where(AssetStoreModel.asset_id == asset_id)
         .order_by(AssetStoreModel.key.asc())
@@ -87,7 +93,21 @@ def list_asset_store(
     )
     rows = session.execute(paginated).all()
     entries = [
-        AssetStoreResponse(key=r.key, value=json.loads(r.value), 
updated_at=r.updated_at) for r in rows
+        AssetStoreResponse(
+            key=r.key,
+            value=json.loads(r.value),
+            updated_at=r.updated_at,
+            last_updated_by=AssetStoreLastUpdatedBy(
+                kind=r.last_updated_by_kind,
+                dag_id=r.last_updated_by_dag_id,
+                run_id=r.last_updated_by_run_id,
+                task_id=r.last_updated_by_task_id,
+                map_index=r.last_updated_by_map_index,
+            )
+            if r.last_updated_by_kind is not None
+            else None,
+        )
+        for r in rows
     ]
     return AssetStoreCollectionResponse(asset_store=entries, 
total_entries=total_entries)
 
@@ -108,6 +128,11 @@ def get_asset_store(
             AssetStoreModel.key,
             AssetStoreModel.value,
             AssetStoreModel.updated_at,
+            AssetStoreModel.last_updated_by_kind,
+            AssetStoreModel.last_updated_by_dag_id,
+            AssetStoreModel.last_updated_by_run_id,
+            AssetStoreModel.last_updated_by_task_id,
+            AssetStoreModel.last_updated_by_map_index,
         ).where(
             AssetStoreModel.asset_id == asset_id,
             AssetStoreModel.key == key,
@@ -118,7 +143,20 @@ def get_asset_store(
             status_code=status.HTTP_404_NOT_FOUND,
             detail=f"Asset store key {key!r} not found",
         )
-    return AssetStoreResponse(key=row.key, value=json.loads(row.value), 
updated_at=row.updated_at)
+    return AssetStoreResponse(
+        key=row.key,
+        value=json.loads(row.value),
+        updated_at=row.updated_at,
+        last_updated_by=AssetStoreLastUpdatedBy(
+            kind=row.last_updated_by_kind,
+            dag_id=row.last_updated_by_dag_id,
+            run_id=row.last_updated_by_run_id,
+            task_id=row.last_updated_by_task_id,
+            map_index=row.last_updated_by_map_index,
+        )
+        if row.last_updated_by_kind is not None
+        else None,
+    )
 
 
 @asset_store_router.put(
@@ -134,7 +172,13 @@ def set_asset_store(
     session: SessionDep,
 ) -> None:
     """Set an asset store value. Creates or overwrites the key."""
-    _get_db_backend().set(AssetScope(asset_id=asset_id), key, 
json.dumps(body.value), session=session)
+    _get_db_backend().set_asset_store(
+        AssetScope(asset_id=asset_id),
+        key,
+        json.dumps(body.value),
+        kind=AssetStoreWriterKind.API,
+        session=session,
+    )
 
 
 @asset_store_router.delete(
diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_store.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_store.py
index 5b94b1da2f5..925ad2efddd 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_store.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_store.py
@@ -35,15 +35,39 @@ from cadwyn import VersionedAPIRouter
 from fastapi import HTTPException, Query, status
 from sqlalchemy import select
 
-from airflow._shared.state import AssetScope
+from airflow._shared.state import AssetScope, AssetStoreWriterKind
 from airflow.api_fastapi.common.db.common import SessionDep
 from airflow.api_fastapi.execution_api.datamodels.asset_store import (
     AssetStorePutBody,
     AssetStoreResponse,
 )
-from airflow.api_fastapi.execution_api.security import ExecutionAPIRoute
+from airflow.api_fastapi.execution_api.datamodels.token import TIToken
+from airflow.api_fastapi.execution_api.security import CurrentTIToken, 
ExecutionAPIRoute
 from airflow.models.asset import AssetModel
+from airflow.models.taskinstance import TaskInstance
 from airflow.state import get_state_backend
+from airflow.state.metastore import MetastoreStoreBackend
+
+_TIWriterFields = tuple[str, str, str, int]
+
+
+def _fetch_ti_writer_fields(token: TIToken, session: SessionDep) -> 
_TIWriterFields:
+    """Return (dag_id, run_id, task_id, map_index) for the TI identified by 
the token."""
+    row = session.execute(
+        select(
+            TaskInstance.dag_id,
+            TaskInstance.run_id,
+            TaskInstance.task_id,
+            TaskInstance.map_index,
+        ).where(TaskInstance.id == token.id)
+    ).one_or_none()
+    if row is None:
+        raise HTTPException(
+            status_code=status.HTTP_404_NOT_FOUND,
+            detail={"reason": "not_found", "message": f"Task instance 
{token.id!r} not found"},
+        )
+    return row.dag_id, row.run_id, row.task_id, row.map_index
+
 
 # TODO(AIP-103): enforce that the requesting task is registered with the asset
 # (via task_inlet_asset_reference or task_outlet_asset_reference) before
@@ -97,16 +121,41 @@ def get_asset_store_by_name(
     return AssetStoreResponse(value=json.loads(value))
 
 
+def _put_asset_store(
+    scope: AssetScope,
+    key: str,
+    body: AssetStorePutBody,
+    token: TIToken,
+    session: SessionDep,
+) -> None:
+    backend = get_state_backend()
+    if isinstance(backend, MetastoreStoreBackend):
+        dag_id, run_id, task_id, map_index = _fetch_ti_writer_fields(token, 
session)
+        backend.set_asset_store(
+            scope,
+            key,
+            json.dumps(body.value),
+            kind=AssetStoreWriterKind.TASK,
+            dag_id=dag_id,
+            run_id=run_id,
+            task_id=task_id,
+            map_index=map_index,
+            session=session,
+        )
+    else:
+        backend.set(scope, key, json.dumps(body.value), session=session)
+
+
 @router.put("/by-name/value", status_code=status.HTTP_204_NO_CONTENT)
 def set_asset_store_by_name(
     name: Annotated[str, Query(min_length=1)],
     key: Annotated[str, Query(min_length=1)],
     body: AssetStorePutBody,
     session: SessionDep,
+    token: TIToken = CurrentTIToken,
 ) -> None:
     """Set an asset store value by asset name."""
-    asset_id = _resolve_asset_id_by_name(name, session)
-    get_state_backend().set(AssetScope(asset_id=asset_id), key, 
json.dumps(body.value), session=session)
+    _put_asset_store(AssetScope(asset_id=_resolve_asset_id_by_name(name, 
session)), key, body, token, session)
 
 
 @router.delete("/by-name/value", status_code=status.HTTP_204_NO_CONTENT)
@@ -153,10 +202,10 @@ def set_asset_store_by_uri(
     key: Annotated[str, Query(min_length=1)],
     body: AssetStorePutBody,
     session: SessionDep,
+    token: TIToken = CurrentTIToken,
 ) -> None:
     """Set an asset store value by asset URI."""
-    asset_id = _resolve_asset_id_by_uri(uri, session)
-    get_state_backend().set(AssetScope(asset_id=asset_id), key, 
json.dumps(body.value), session=session)
+    _put_asset_store(AssetScope(asset_id=_resolve_asset_id_by_uri(uri, 
session)), key, body, token, session)
 
 
 @router.delete("/by-uri/value", status_code=status.HTTP_204_NO_CONTENT)
diff --git 
a/airflow-core/src/airflow/migrations/versions/0112_3_3_0_add_task_store_and_asset_store_tables.py
 
b/airflow-core/src/airflow/migrations/versions/0112_3_3_0_add_task_store_and_asset_store_tables.py
index e9d88443312..f2f248edcf2 100644
--- 
a/airflow-core/src/airflow/migrations/versions/0112_3_3_0_add_task_store_and_asset_store_tables.py
+++ 
b/airflow-core/src/airflow/migrations/versions/0112_3_3_0_add_task_store_and_asset_store_tables.py
@@ -50,6 +50,11 @@ def upgrade():
         sa.Column("key", sa.String(length=512), nullable=False),
         sa.Column("value", sa.Text().with_variant(mysql.MEDIUMTEXT(), 
"mysql"), nullable=False),
         sa.Column("updated_at", UtcDateTime(), nullable=False),
+        sa.Column("last_updated_by_kind", sa.String(length=16), nullable=True),
+        sa.Column("last_updated_by_dag_id", StringID(), nullable=True),
+        sa.Column("last_updated_by_run_id", StringID(), nullable=True),
+        sa.Column("last_updated_by_task_id", StringID(), nullable=True),
+        sa.Column("last_updated_by_map_index", sa.Integer(), nullable=True),
         sa.ForeignKeyConstraint(
             ["asset_id"], ["asset.id"], name="asset_store_asset_fkey", 
ondelete="CASCADE"
         ),
diff --git a/airflow-core/src/airflow/models/asset_store.py 
b/airflow-core/src/airflow/models/asset_store.py
index 946553118d8..42102723b24 100644
--- a/airflow-core/src/airflow/models/asset_store.py
+++ b/airflow-core/src/airflow/models/asset_store.py
@@ -33,6 +33,10 @@ class AssetStoreModel(Base):
 
     Not scoped to any DAG run — a watermark written in run 1 is readable by 
run 2.
     Rows survive until explicitly deleted or the asset itself is deleted.
+
+    ``last_updated_by_*`` columns record who last wrote this entry. They are 
denormalized
+    (no FK) so that the references survives DAG run cleanup, and so cases like 
watchers (``BaseEventTrigger``)
+    can write without a task instance.
     """
 
     __tablename__ = "asset_store"
@@ -43,6 +47,12 @@ class AssetStoreModel(Base):
     value: Mapped[str] = mapped_column(Text().with_variant(MEDIUMTEXT, 
"mysql"), nullable=False)
     updated_at: Mapped[datetime] = mapped_column(UtcDateTime, 
default=timezone.utcnow, nullable=False)
 
+    last_updated_by_kind: Mapped[str | None] = mapped_column(String(16), 
nullable=True)
+    last_updated_by_dag_id: Mapped[str | None] = mapped_column(String(250, 
**COLLATION_ARGS), nullable=True)
+    last_updated_by_run_id: Mapped[str | None] = mapped_column(String(250, 
**COLLATION_ARGS), nullable=True)
+    last_updated_by_task_id: Mapped[str | None] = mapped_column(String(250, 
**COLLATION_ARGS), nullable=True)
+    last_updated_by_map_index: Mapped[int | None] = mapped_column(Integer, 
nullable=True)
+
     __table_args__ = (
         PrimaryKeyConstraint("asset_id", "key", name="asset_store_pkey"),
         ForeignKeyConstraint(
diff --git a/airflow-core/src/airflow/state/metastore.py 
b/airflow-core/src/airflow/state/metastore.py
index 137fce09a75..541752e2a05 100644
--- a/airflow-core/src/airflow/state/metastore.py
+++ b/airflow-core/src/airflow/state/metastore.py
@@ -26,7 +26,7 @@ from typing import TYPE_CHECKING
 import structlog
 from sqlalchemy import delete, select
 
-from airflow._shared.state import AssetScope, BaseStoreBackend, StoreScope, 
TaskScope
+from airflow._shared.state import AssetScope, AssetStoreWriterKind, 
BaseStoreBackend, StoreScope, TaskScope
 from airflow._shared.timezones import timezone
 from airflow.configuration import conf
 from airflow.models.asset_store import AssetStoreModel
@@ -84,6 +84,28 @@ def _build_upsert_stmt(
     return stmt
 
 
+def _build_asset_writer_fields(
+    kind: AssetStoreWriterKind | None,
+    dag_id: str | None,
+    run_id: str | None,
+    task_id: str | None,
+    map_index: int | None,
+    *,
+    value: str,
+    now: datetime,
+) -> tuple[dict, dict]:
+    kind_str = kind.value if kind is not None else None
+    writer_info = dict(
+        last_updated_by_kind=kind_str,
+        last_updated_by_dag_id=dag_id,
+        last_updated_by_run_id=run_id,
+        last_updated_by_task_id=task_id,
+        last_updated_by_map_index=map_index,
+    )
+    update_fields = dict(value=value, updated_at=now, **(writer_info if kind 
is not None else {}))
+    return writer_info, update_fields
+
+
 class MetastoreStoreBackend(BaseStoreBackend):
     """Default state backend for tasks and assets. Stores task and asset state 
in the Airflow metadata database."""
 
@@ -279,18 +301,63 @@ class MetastoreStoreBackend(BaseStoreBackend):
         )
         return row.value if row is not None else None
 
-    def _set_asset_store(self, scope: AssetScope, key: str, value: str, *, 
session: Session) -> None:
+    def _set_asset_store(
+        self,
+        scope: AssetScope,
+        key: str,
+        value: str,
+        *,
+        kind: AssetStoreWriterKind | None = None,
+        dag_id: str | None = None,
+        run_id: str | None = None,
+        task_id: str | None = None,
+        map_index: int | None = None,
+        session: Session,
+    ) -> None:
         now = timezone.utcnow()
-        values = dict(asset_id=scope.asset_id, key=key, value=value, 
updated_at=now)
+        writer_info, update_fields = _build_asset_writer_fields(
+            kind, dag_id, run_id, task_id, map_index, value=value, now=now
+        )
+        values = dict(asset_id=scope.asset_id, key=key, value=value, 
updated_at=now, **writer_info)
         stmt = _build_upsert_stmt(
             get_dialect_name(session),
             AssetStoreModel,
             ["asset_id", "key"],
             values,
-            dict(value=value, updated_at=now),
+            update_fields,
         )
         session.execute(stmt)
 
+    @provide_session
+    def set_asset_store(
+        self,
+        scope: AssetScope,
+        key: str,
+        value: str,
+        *,
+        kind: AssetStoreWriterKind,
+        dag_id: str | None = None,
+        run_id: str | None = None,
+        task_id: str | None = None,
+        map_index: int | None = None,
+        session: Session | None = NEW_SESSION,
+    ) -> None:
+        """Write an asset store entry, recording who made the write."""
+        kind.validate_writer_fields(dag_id, run_id, task_id, map_index)
+        if TYPE_CHECKING:
+            assert session is not None
+        self._set_asset_store(
+            scope,
+            key,
+            value,
+            kind=kind,
+            dag_id=dag_id,
+            run_id=run_id,
+            task_id=task_id,
+            map_index=map_index,
+            session=session,
+        )
+
     def _delete_asset_store(self, scope: AssetScope, key: str, *, session: 
Session) -> None:
         session.execute(
             delete(AssetStoreModel).where(
@@ -435,20 +502,61 @@ class MetastoreStoreBackend(BaseStoreBackend):
         return row.value if row is not None else None
 
     async def _aset_asset_store(
-        self, scope: AssetScope, key: str, value: str, *, session: AsyncSession
+        self,
+        scope: AssetScope,
+        key: str,
+        value: str,
+        *,
+        kind: AssetStoreWriterKind | None = None,
+        dag_id: str | None = None,
+        run_id: str | None = None,
+        task_id: str | None = None,
+        map_index: int | None = None,
+        session: AsyncSession,
     ) -> None:
         now = timezone.utcnow()
-        values = dict(asset_id=scope.asset_id, key=key, value=value, 
updated_at=now)
+        writer_info, update_fields = _build_asset_writer_fields(
+            kind, dag_id, run_id, task_id, map_index, value=value, now=now
+        )
+        values = dict(asset_id=scope.asset_id, key=key, value=value, 
updated_at=now, **writer_info)
         # get_dialect_name expects a sync Session; sync_session is the 
underlying Session the async wrapper delegates to
         stmt = _build_upsert_stmt(
             get_dialect_name(session.sync_session),
             AssetStoreModel,
             ["asset_id", "key"],
             values,
-            dict(value=value, updated_at=now),
+            update_fields,
         )
         await session.execute(stmt)
 
+    async def aset_asset_store(
+        self,
+        scope: AssetScope,
+        key: str,
+        value: str,
+        *,
+        kind: AssetStoreWriterKind,
+        dag_id: str | None = None,
+        run_id: str | None = None,
+        task_id: str | None = None,
+        map_index: int | None = None,
+        session: AsyncSession | None = None,
+    ) -> None:
+        """Write an asset store entry, recording who made the write."""
+        kind.validate_writer_fields(dag_id, run_id, task_id, map_index)
+        async with _async_session(session) as s:
+            await self._aset_asset_store(
+                scope,
+                key,
+                value,
+                kind=kind,
+                dag_id=dag_id,
+                run_id=run_id,
+                task_id=task_id,
+                map_index=map_index,
+                session=s,
+            )
+
     async def _adelete_asset_store(self, scope: AssetScope, key: str, *, 
session: AsyncSession) -> None:
         await session.execute(
             delete(AssetStoreModel).where(
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 14a60c09ed6..9acda604e28 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -414,6 +414,62 @@ export const $AssetStoreCollectionResponse = {
     description: 'All asset store entries for an asset.'
 } as const;
 
+export const $AssetStoreLastUpdatedBy = {
+    properties: {
+        kind: {
+            '$ref': '#/components/schemas/AssetStoreWriterKind'
+        },
+        dag_id: {
+            anyOf: [
+                {
+                    type: 'string'
+                },
+                {
+                    type: 'null'
+                }
+            ],
+            title: 'Dag Id'
+        },
+        run_id: {
+            anyOf: [
+                {
+                    type: 'string'
+                },
+                {
+                    type: 'null'
+                }
+            ],
+            title: 'Run Id'
+        },
+        task_id: {
+            anyOf: [
+                {
+                    type: 'string'
+                },
+                {
+                    type: 'null'
+                }
+            ],
+            title: 'Task Id'
+        },
+        map_index: {
+            anyOf: [
+                {
+                    type: 'integer'
+                },
+                {
+                    type: 'null'
+                }
+            ],
+            title: 'Map Index'
+        }
+    },
+    type: 'object',
+    required: ['kind'],
+    title: 'AssetStoreLastUpdatedBy',
+    description: 'Writer info for the last write to an asset store entry.'
+} as const;
+
 export const $AssetStoreResponse = {
     properties: {
         key: {
@@ -427,6 +483,16 @@ export const $AssetStoreResponse = {
             type: 'string',
             format: 'date-time',
             title: 'Updated At'
+        },
+        last_updated_by: {
+            anyOf: [
+                {
+                    '$ref': '#/components/schemas/AssetStoreLastUpdatedBy'
+                },
+                {
+                    type: 'null'
+                }
+            ]
         }
     },
     type: 'object',
@@ -435,6 +501,17 @@ export const $AssetStoreResponse = {
     description: 'A single asset store key/value pair with metadata.'
 } as const;
 
+export const $AssetStoreWriterKind = {
+    type: 'string',
+    enum: ['task', 'watcher', 'api'],
+    title: 'AssetStoreWriterKind',
+    description: `Identifies what kind of writer last updated an asset store 
entry.
+
+\`\`TASK\`\` — written by a task via the execution API.
+\`\`WATCHER\`\` — written by a \`\`BaseEventTrigger\`\` (no task instance).
+\`\`API\`\` — written directly through the Core API (e.g. manual admin write).`
+} as const;
+
 export const $AssetWatcherResponse = {
     properties: {
         name: {
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 16d14d2fe96..4f13f95a2f8 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -111,6 +111,17 @@ export type AssetStoreCollectionResponse = {
     total_entries: number;
 };
 
+/**
+ * Writer info for the last write to an asset store entry.
+ */
+export type AssetStoreLastUpdatedBy = {
+    kind: AssetStoreWriterKind;
+    dag_id?: string | null;
+    run_id?: string | null;
+    task_id?: string | null;
+    map_index?: number | null;
+};
+
 /**
  * A single asset store key/value pair with metadata.
  */
@@ -118,8 +129,18 @@ export type AssetStoreResponse = {
     key: string;
     value: JsonValue;
     updated_at: string;
+    last_updated_by?: AssetStoreLastUpdatedBy | null;
 };
 
+/**
+ * Identifies what kind of writer last updated an asset store entry.
+ *
+ * ``TASK`` — written by a task via the execution API.
+ * ``WATCHER`` — written by a ``BaseEventTrigger`` (no task instance).
+ * ``API`` — written directly through the Core API (e.g. manual admin write).
+ */
+export type AssetStoreWriterKind = 'task' | 'watcher' | 'api';
+
 /**
  * Asset watcher serializer for responses.
  */
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_asset_store.py
 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_asset_store.py
index 8345386b2fd..988cf1ddbee 100644
--- 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_asset_store.py
+++ 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_asset_store.py
@@ -110,6 +110,28 @@ class TestListAssetState(TestAssetStateEndpoint):
         assert "updated_at" in item
         assert item["key"] == "watermark"
 
+    def test_last_updated_by_returned_when_set(self, test_client, 
create_task_instance):
+        ti = create_task_instance()
+        row = AssetStoreModel(
+            asset_id=self.asset.id,
+            key="watermark",
+            value='"v"',
+            last_updated_by_kind="task",
+            last_updated_by_dag_id=ti.dag_id,
+            last_updated_by_run_id=ti.run_id,
+            last_updated_by_task_id=ti.task_id,
+            last_updated_by_map_index=ti.map_index,
+        )
+        self._session.add(row)
+        self._session.commit()
+
+        item = test_client.get(self._base_url).json()["asset_store"][0]
+        assert item["last_updated_by"]["kind"] == "task"
+        assert item["last_updated_by"]["dag_id"] == ti.dag_id
+        assert item["last_updated_by"]["run_id"] == ti.run_id
+        assert item["last_updated_by"]["task_id"] == ti.task_id
+        assert item["last_updated_by"]["map_index"] == ti.map_index
+
     def test_pagination_limit(self, test_client):
         for k in ("watermark", "file_count", "last_run"):
             _create_asset_state(self._session, self.asset.id, k, "v")
@@ -145,6 +167,29 @@ class TestGetAssetState(TestAssetStateEndpoint):
         assert data["key"] == "watermark"
         assert data["value"] == "2026-05-01"
         assert "updated_at" in data
+        assert data["last_updated_by"] is None
+
+    def test_last_updated_by_returned_in_get(self, test_client, 
create_task_instance):
+        ti = create_task_instance()
+        row = AssetStoreModel(
+            asset_id=self.asset.id,
+            key="watermark",
+            value='"v"',
+            last_updated_by_kind="task",
+            last_updated_by_dag_id=ti.dag_id,
+            last_updated_by_run_id=ti.run_id,
+            last_updated_by_task_id=ti.task_id,
+            last_updated_by_map_index=ti.map_index,
+        )
+        self._session.add(row)
+        self._session.commit()
+
+        data = test_client.get(f"{self._base_url}/watermark").json()
+        assert data["last_updated_by"]["kind"] == "task"
+        assert data["last_updated_by"]["dag_id"] == ti.dag_id
+        assert data["last_updated_by"]["run_id"] == ti.run_id
+        assert data["last_updated_by"]["task_id"] == ti.task_id
+        assert data["last_updated_by"]["map_index"] == ti.map_index
 
     def test_missing_key_returns_404(self, test_client):
         assert test_client.get(f"{self._base_url}/nonexistent").status_code == 
404
@@ -209,6 +254,17 @@ class TestSetAssetState(TestAssetStateEndpoint):
         assert row is not None
         assert row.value == expected_db
 
+    def test_put_records_api_kind(self, test_client):
+        """PUT via the Core API sets last_updated_by.kind='api' in the 
response."""
+        test_client.put(f"{self._base_url}/watermark", json={"value": "v"})
+
+        data = test_client.get(f"{self._base_url}/watermark").json()
+        assert data["last_updated_by"]["kind"] == "api"
+        assert data["last_updated_by"]["dag_id"] is None
+        assert data["last_updated_by"]["run_id"] is None
+        assert data["last_updated_by"]["task_id"] is None
+        assert data["last_updated_by"]["map_index"] is None
+
     @pytest.mark.parametrize("value", [42, True, {"rows": 100}, [1, "two"], 
"hello"])
     def test_core_api_write_read_roundtrip(self, test_client, value):
         """Core API write then Core API read returns the same native value."""
diff --git 
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_asset_store.py
 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_asset_store.py
index 325b9decce7..1b8cf5382c2 100644
--- 
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_asset_store.py
+++ 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_asset_store.py
@@ -20,19 +20,35 @@ import json
 from typing import TYPE_CHECKING
 
 import pytest
+from fastapi import Request
 from sqlalchemy import delete, select
 
+from airflow.api_fastapi.execution_api.datamodels.token import TIClaims, 
TIToken
+from airflow.api_fastapi.execution_api.security import require_auth
 from airflow.models.asset import AssetActive, AssetModel
 from airflow.models.asset_store import AssetStoreModel
 from airflow.utils.session import create_session
 
 if TYPE_CHECKING:
+    from fastapi import FastAPI
     from fastapi.testclient import TestClient
     from sqlalchemy.orm import Session
 
+    from airflow.models.taskinstance import TaskInstance
+
+    from tests_common.pytest_plugin import CreateTaskInstance
+
 pytestmark = pytest.mark.db_test
 
 _BY_NAME_VALUE = "/execution/store/asset/by-name/value"
+
+
+def _create_asset_store_row(asset_id: int, key: str, value: str) -> None:
+    """Insert an AssetStoreModel row directly in the database."""
+    with create_session() as s:
+        s.add(AssetStoreModel(asset_id=asset_id, key=key, 
value=json.dumps(value)))
+
+
 _BY_NAME_CLEAR = "/execution/store/asset/by-name/clear"
 _BY_URI_VALUE = "/execution/store/asset/by-uri/value"
 _BY_URI_CLEAR = "/execution/store/asset/by-uri/clear"
@@ -66,9 +82,7 @@ def inactive_asset(session: Session) -> AssetModel:
 
 class TestGetAssetStateByName:
     def test_get_returns_value(self, client: TestClient, asset: AssetModel):
-        client.put(
-            _BY_NAME_VALUE, params={"name": asset.name, "key": "watermark"}, 
json={"value": "2026-04-29"}
-        )
+        _create_asset_store_row(asset.id, "watermark", "2026-04-29")
 
         response = client.get(_BY_NAME_VALUE, params={"name": asset.name, 
"key": "watermark"})
 
@@ -88,7 +102,7 @@ class TestGetAssetStateByName:
         session.add(AssetActive.for_asset(slashed))
         session.commit()
 
-        client.put(_BY_NAME_VALUE, params={"name": slashed.name, "key": "wm"}, 
json={"value": "x"})
+        _create_asset_store_row(slashed.id, "wm", "x")
         response = client.get(_BY_NAME_VALUE, params={"name": slashed.name, 
"key": "wm"})
 
         assert response.status_code == 200
@@ -101,6 +115,20 @@ class TestGetAssetStateByName:
 
 
 class TestPutAssetStateByName:
+    @pytest.fixture(autouse=True)
+    def setup_ti_auth(
+        self,
+        exec_app: FastAPI,
+        create_task_instance: CreateTaskInstance,
+    ):
+        """Create a real TI and wire the mock auth to use its UUID so the 
route can look up writer info."""
+        self._ti: TaskInstance = create_task_instance()
+
+        async def _auth(request: Request) -> TIToken:
+            return TIToken(id=self._ti.id, claims=TIClaims(scope="execution"))
+
+        exec_app.dependency_overrides[require_auth] = _auth
+
     def test_put_creates_row(self, client: TestClient, asset: AssetModel, 
session: Session):
         response = client.put(
             _BY_NAME_VALUE, params={"name": asset.name, "key": "watermark"}, 
json={"value": "2026-04-29"}
@@ -117,6 +145,26 @@ class TestPutAssetStateByName:
         # DB stores JSON-encoded string
         assert row.value == '"2026-04-29"'
 
+    def test_put_records_writer(self, client: TestClient, asset: AssetModel, 
session: Session):
+        """PUT writes about writer fields resolved from the JWT token's TI."""
+        response = client.put(
+            _BY_NAME_VALUE, params={"name": asset.name, "key": "watermark"}, 
json={"value": "v"}
+        )
+        assert response.status_code == 204
+
+        row = session.scalar(
+            select(AssetStoreModel).where(
+                AssetStoreModel.asset_id == asset.id,
+                AssetStoreModel.key == "watermark",
+            )
+        )
+        assert row is not None
+        assert row.last_updated_by_kind == "task"
+        assert row.last_updated_by_dag_id == self._ti.dag_id
+        assert row.last_updated_by_run_id == self._ti.run_id
+        assert row.last_updated_by_task_id == self._ti.task_id
+        assert row.last_updated_by_map_index == self._ti.map_index
+
     def test_put_int_value_roundtrip(self, client: TestClient, asset: 
AssetModel):
         response = client.put(
             _BY_NAME_VALUE, params={"name": asset.name, "key": "total_runs"}, 
json={"value": 5}
@@ -191,9 +239,7 @@ class TestPutAssetStateByName:
 
 class TestDeleteAssetStateByName:
     def test_delete_removes_key(self, client: TestClient, asset: AssetModel):
-        client.put(
-            _BY_NAME_VALUE, params={"name": asset.name, "key": "watermark"}, 
json={"value": "2026-04-29"}
-        )
+        _create_asset_store_row(asset.id, "watermark", "2026-04-29")
 
         response = client.delete(_BY_NAME_VALUE, params={"name": asset.name, 
"key": "watermark"})
 
@@ -209,7 +255,7 @@ class TestDeleteAssetStateByName:
 class TestClearAssetStateByName:
     def test_clear_removes_all_keys(self, client: TestClient, asset: 
AssetModel):
         for k, v in [("watermark", "a"), ("last_id", "b"), ("schema_hash", 
"c")]:
-            client.put(_BY_NAME_VALUE, params={"name": asset.name, "key": k}, 
json={"value": v})
+            _create_asset_store_row(asset.id, k, v)
 
         response = client.delete(_BY_NAME_CLEAR, params={"name": asset.name})
 
@@ -221,9 +267,7 @@ class TestClearAssetStateByName:
 
 class TestGetAssetStateByUri:
     def test_get_returns_value(self, client: TestClient, asset: AssetModel):
-        client.put(
-            _BY_NAME_VALUE, params={"name": asset.name, "key": "watermark"}, 
json={"value": "2026-04-29"}
-        )
+        _create_asset_store_row(asset.id, "watermark", "2026-04-29")
 
         response = client.get(_BY_URI_VALUE, params={"uri": asset.uri, "key": 
"watermark"})
 
@@ -242,6 +286,20 @@ class TestGetAssetStateByUri:
 
 
 class TestPutAssetStateByUri:
+    @pytest.fixture(autouse=True)
+    def setup_ti_auth(
+        self,
+        exec_app: FastAPI,
+        create_task_instance: CreateTaskInstance,
+    ):
+        """Create a real TI and wire the mock auth to use its UUID so the 
route can look up writer info."""
+        self._ti: TaskInstance = create_task_instance()
+
+        async def _auth(request: Request) -> TIToken:
+            return TIToken(id=self._ti.id, claims=TIClaims(scope="execution"))
+
+        exec_app.dependency_overrides[require_auth] = _auth
+
     def test_put_creates_row(self, client: TestClient, asset: AssetModel, 
session: Session):
         response = client.put(
             _BY_URI_VALUE, params={"uri": asset.uri, "key": "watermark"}, 
json={"value": "2026-04-29"}
@@ -257,6 +315,26 @@ class TestPutAssetStateByUri:
         assert row is not None
         assert row.value == '"2026-04-29"'
 
+    def test_put_records_writer(self, client: TestClient, asset: AssetModel, 
session: Session):
+        """PUT writes writer fields resolved from the JWT token's TI."""
+        response = client.put(
+            _BY_URI_VALUE, params={"uri": asset.uri, "key": "watermark"}, 
json={"value": "v"}
+        )
+        assert response.status_code == 204
+
+        row = session.scalar(
+            select(AssetStoreModel).where(
+                AssetStoreModel.asset_id == asset.id,
+                AssetStoreModel.key == "watermark",
+            )
+        )
+        assert row is not None
+        assert row.last_updated_by_kind == "task"
+        assert row.last_updated_by_dag_id == self._ti.dag_id
+        assert row.last_updated_by_run_id == self._ti.run_id
+        assert row.last_updated_by_task_id == self._ti.task_id
+        assert row.last_updated_by_map_index == self._ti.map_index
+
     def test_put_unknown_uri_returns_404(self, client: TestClient):
         response = client.put(
             _BY_URI_VALUE, params={"uri": "s3://nonexistent/path", "key": 
"wm"}, json={"value": "x"}
@@ -267,7 +345,7 @@ class TestPutAssetStateByUri:
 
 class TestDeleteAssetStateByUri:
     def test_delete_removes_key(self, client: TestClient, asset: AssetModel):
-        client.put(_BY_URI_VALUE, params={"uri": asset.uri, "key": 
"watermark"}, json={"value": "2026-04-29"})
+        _create_asset_store_row(asset.id, "watermark", "2026-04-29")
 
         response = client.delete(_BY_URI_VALUE, params={"uri": asset.uri, 
"key": "watermark"})
 
@@ -278,7 +356,7 @@ class TestDeleteAssetStateByUri:
 class TestClearAssetStateByUri:
     def test_clear_removes_all_keys(self, client: TestClient, asset: 
AssetModel):
         for k, v in [("watermark", "a"), ("last_id", "b")]:
-            client.put(_BY_URI_VALUE, params={"uri": asset.uri, "key": k}, 
json={"value": v})
+            _create_asset_store_row(asset.id, k, v)
 
         response = client.delete(_BY_URI_CLEAR, params={"uri": asset.uri})
 
diff --git a/airflow-core/tests/unit/state/test_metastore.py 
b/airflow-core/tests/unit/state/test_metastore.py
index 15a8984af64..1f7883f3e0f 100644
--- a/airflow-core/tests/unit/state/test_metastore.py
+++ b/airflow-core/tests/unit/state/test_metastore.py
@@ -25,6 +25,7 @@ from unittest.mock import patch
 import pytest
 from sqlalchemy import Delete, select
 
+from airflow._shared.state import AssetStoreWriterKind
 from airflow._shared.timezones import timezone
 from airflow.configuration import conf
 from airflow.models.asset import AssetModel
@@ -429,6 +430,130 @@ class TestMetastoreStoreBackendAssetScope:
 
         assert backend.get(scope2, "watermark", session=session) is None
 
+    def test_set_asset_store_writes_writer(
+        self, backend: MetastoreStoreBackend, asset_committed: AssetModel, 
create_task_instance
+    ):
+        ti = create_task_instance()
+        scope = AssetScope(asset_id=asset_committed.id)
+        backend.set_asset_store(
+            scope,
+            "watermark",
+            "v",
+            kind=AssetStoreWriterKind.TASK,
+            dag_id=ti.dag_id,
+            run_id=ti.run_id,
+            task_id=ti.task_id,
+            map_index=ti.map_index,
+        )
+
+        with create_session() as s:
+            row = 
s.scalar(select(AssetStoreModel).where(AssetStoreModel.asset_id == 
asset_committed.id))
+        assert row is not None
+        assert row.last_updated_by_kind == AssetStoreWriterKind.TASK.value
+        assert row.last_updated_by_dag_id == ti.dag_id
+        assert row.last_updated_by_run_id == ti.run_id
+        assert row.last_updated_by_task_id == ti.task_id
+        assert row.last_updated_by_map_index == ti.map_index
+
+    @pytest.mark.backend("postgres", "mysql", "sqlite")
+    def test_set_asset_store_upsert_updates_writer(
+        self, backend: MetastoreStoreBackend, asset_committed: AssetModel, 
create_task_instance
+    ):
+        ti1 = create_task_instance(task_id="task1", dag_id="dag1")
+        ti2 = create_task_instance(task_id="task2", dag_id="dag2")
+        scope = AssetScope(asset_id=asset_committed.id)
+        backend.set_asset_store(
+            scope,
+            "watermark",
+            "v1",
+            kind=AssetStoreWriterKind.TASK,
+            dag_id=ti1.dag_id,
+            run_id=ti1.run_id,
+            task_id=ti1.task_id,
+            map_index=ti1.map_index,
+        )
+        backend.set_asset_store(
+            scope,
+            "watermark",
+            "v2",
+            kind=AssetStoreWriterKind.TASK,
+            dag_id=ti2.dag_id,
+            run_id=ti2.run_id,
+            task_id=ti2.task_id,
+            map_index=ti2.map_index,
+        )
+
+        with create_session() as s:
+            row = 
s.scalar(select(AssetStoreModel).where(AssetStoreModel.asset_id == 
asset_committed.id))
+        assert row is not None
+        assert row.value == "v2"
+        assert row.last_updated_by_dag_id == ti2.dag_id
+        assert row.last_updated_by_task_id == ti2.task_id
+
+    def test_set_stores_null_writer(
+        self, session: Session, backend: MetastoreStoreBackend, asset: 
AssetModel
+    ):
+        scope = AssetScope(asset_id=asset.id)
+        backend.set(scope, "watermark", "v", session=session)
+        session.flush()
+
+        row = 
session.scalar(select(AssetStoreModel).where(AssetStoreModel.asset_id == 
asset.id))
+        assert row is not None
+        assert row.last_updated_by_kind is None
+        assert row.last_updated_by_dag_id is None
+        assert row.last_updated_by_task_id is None
+
+    def test_set_asset_store_task_kind_requires_all_fields(
+        self, backend: MetastoreStoreBackend, asset_committed: AssetModel, 
create_task_instance
+    ):
+        ti = create_task_instance()
+        scope = AssetScope(asset_id=asset_committed.id)
+        with pytest.raises(ValueError, match="kind='task' requires"):
+            backend.set_asset_store(
+                scope,
+                "watermark",
+                "v",
+                kind=AssetStoreWriterKind.TASK,
+                dag_id=ti.dag_id,
+                run_id=ti.run_id,
+                task_id=None,
+                map_index=ti.map_index,
+            )
+
+    def test_set_asset_store_watcher_kind_rejects_task_fields(
+        self, backend: MetastoreStoreBackend, asset_committed: AssetModel, 
create_task_instance
+    ):
+        ti = create_task_instance()
+        scope = AssetScope(asset_id=asset_committed.id)
+        with pytest.raises(ValueError, match="kind='watcher' must not carry 
task fields"):
+            backend.set_asset_store(
+                scope,
+                "watermark",
+                "v",
+                kind=AssetStoreWriterKind.WATCHER,
+                dag_id=ti.dag_id,
+                run_id=None,
+                task_id=None,
+                map_index=None,
+            )
+
+    def test_set_asset_store_api_kind_rejects_task_fields(
+        self, backend: MetastoreStoreBackend, asset_committed: AssetModel, 
create_task_instance
+    ):
+        ti = create_task_instance()
+        scope = AssetScope(asset_id=asset_committed.id)
+        with pytest.raises(ValueError, match="kind='api' must not carry task 
fields"):
+            backend.set_asset_store(
+                scope,
+                "watermark",
+                "v",
+                kind=AssetStoreWriterKind.API,
+                dag_id=ti.dag_id,
+                run_id=None,
+                task_id=None,
+                map_index=None,
+            )
+
     def test_cleanup_does_not_touch_asset_state(
         self, session: Session, backend: MetastoreStoreBackend, asset: 
AssetModel
     ):
@@ -510,6 +635,31 @@ class TestMetastoreStoreBackendAsync:
         assert await backend.aget(scope, "watermark") is None
         assert await backend.aget(scope, "file_count") is None
 
+    async def test_aset_asset_store_writes_writer(
+        self, backend: MetastoreStoreBackend, asset_committed: AssetModel, 
create_task_instance
+    ):
+        ti = create_task_instance()
+        scope = AssetScope(asset_id=asset_committed.id)
+        await backend.aset_asset_store(
+            scope,
+            "watermark",
+            "v",
+            kind=AssetStoreWriterKind.TASK,
+            dag_id=ti.dag_id,
+            run_id=ti.run_id,
+            task_id=ti.task_id,
+            map_index=ti.map_index,
+        )
+
+        async with create_session_async() as s:
+            row = await s.scalar(
+                select(AssetStoreModel).where(AssetStoreModel.asset_id == 
asset_committed.id)
+            )
+        assert row is not None
+        assert row.last_updated_by_kind == AssetStoreWriterKind.TASK.value
+        assert row.last_updated_by_dag_id == ti.dag_id
+        assert row.last_updated_by_task_id == ti.task_id
+
     async def test_aset_task_raises_for_missing_dag_run(self, backend: 
MetastoreStoreBackend):
         scope = TaskScope(dag_id="nonexistent_dag", run_id="nonexistent_run", 
task_id=TASK_ID)
         with pytest.raises(ValueError, match="No DagRun found"):
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py 
b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index 8fec9dd81a3..fff609bcaea 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -49,6 +49,20 @@ class AssetAliasResponse(BaseModel):
     group: Annotated[str, Field(title="Group")]
 
 
+class AssetStoreWriterKind(str, Enum):
+    """
+    Identifies what kind of writer last updated an asset store entry.
+
+    ``TASK`` — written by a task via the execution API.
+    ``WATCHER`` — written by a ``BaseEventTrigger`` (no task instance).
+    ``API`` — written directly through the Core API (e.g. manual admin write).
+    """
+
+    TASK = "task"
+    WATCHER = "watcher"
+    API = "api"
+
+
 class AssetWatcherResponse(BaseModel):
     """
     Asset watcher serializer for responses.
@@ -1326,6 +1340,18 @@ class AssetStoreBody(BaseModel):
     value: JsonValue
 
 
+class AssetStoreLastUpdatedBy(BaseModel):
+    """
+    Writer info for the last write to an asset store entry.
+    """
+
+    kind: AssetStoreWriterKind
+    dag_id: Annotated[str | None, Field(title="Dag Id")] = None
+    run_id: Annotated[str | None, Field(title="Run Id")] = None
+    task_id: Annotated[str | None, Field(title="Task Id")] = None
+    map_index: Annotated[int | None, Field(title="Map Index")] = None
+
+
 class AssetStoreResponse(BaseModel):
     """
     A single asset store key/value pair with metadata.
@@ -1334,6 +1360,7 @@ class AssetStoreResponse(BaseModel):
     key: Annotated[str, Field(title="Key")]
     value: JsonValue
     updated_at: Annotated[datetime, Field(title="Updated At")]
+    last_updated_by: AssetStoreLastUpdatedBy | None = None
 
 
 class BackfillPostBody(BaseModel):
diff --git a/shared/state/src/airflow_shared/state/__init__.py 
b/shared/state/src/airflow_shared/state/__init__.py
index 7ae5ed9c5f2..3c963acd18d 100644
--- a/shared/state/src/airflow_shared/state/__init__.py
+++ b/shared/state/src/airflow_shared/state/__init__.py
@@ -19,6 +19,7 @@ from __future__ import annotations
 import json
 from abc import ABC, abstractmethod
 from dataclasses import dataclass
+from enum import Enum
 from typing import TYPE_CHECKING
 
 if TYPE_CHECKING:
@@ -73,6 +74,44 @@ class AssetScope:
 StoreScope = TaskScope | AssetScope
 
 
+class AssetStoreWriterKind(str, Enum):
+    """
+    Identifies what kind of writer last updated an asset store entry.
+
+    ``TASK`` — written by a task via the execution API.
+    ``WATCHER`` — written by a ``BaseEventTrigger`` (no task instance).
+    ``API`` — written directly through the Core API (e.g. manual admin write).
+    """
+
+    TASK = "task"
+    WATCHER = "watcher"
+    API = "api"
+
+    def validate_writer_fields(
+        self,
+        dag_id: str | None,
+        run_id: str | None,
+        task_id: str | None,
+        map_index: int | None,
+    ) -> None:
+        task_fields = (dag_id, run_id, task_id, map_index)
+        match self:
+            case AssetStoreWriterKind.TASK:
+                if any(f is None for f in task_fields):
+                    raise ValueError(
+                        f"kind='task' requires dag_id, run_id, task_id, and 
map_index to all be set; "
+                        f"got dag_id={dag_id!r}, run_id={run_id!r}, 
task_id={task_id!r}, map_index={map_index!r}"
+                    )
+            case AssetStoreWriterKind.WATCHER | AssetStoreWriterKind.API:
+                if any(f is not None for f in task_fields):
+                    raise ValueError(
+                        f"kind={self.value!r} must not carry task fields; "
+                        f"got dag_id={dag_id!r}, run_id={run_id!r}, 
task_id={task_id!r}, map_index={map_index!r}"
+                    )
+            case _:
+                raise AssertionError(f"Unhandled AssetStoreWriterKind: 
{self!r}")
+
+
 class BaseStoreBackend(ABC):
     """
     Abstract backend for reading and writing task and asset state.

Reply via email to