jroachgolf84 commented on code in PR #68900:
URL: https://github.com/apache/airflow/pull/68900#discussion_r3467071368


##########
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_asset_state_store.py:
##########
@@ -380,3 +380,66 @@ def test_get_inactive_asset_by_name_returns_404(self, client: TestClient, inacti
     def test_get_inactive_asset_by_uri_returns_404(self, client: TestClient, inactive_asset: AssetModel):
         response = client.get(_BY_URI_VALUE, params={"uri": inactive_asset.uri, "key": "watermark"})
         assert response.status_code == 404
+
+
+class TestPutAssetStateWatcherWrite:
+    """PUT routes record WATCHER kind when no TI backs the token 
(WATCHER/triggerer path)."""
+
+    @pytest.fixture(autouse=True)
+    def setup_null_uuid_auth(self, exec_app: FastAPI):
+        from uuid import UUID
+
+        async def _watcher_auth(request: Request) -> TIToken:
+            return TIToken(
+                id=UUID("00000000-0000-0000-0000-000000000000"), claims=TIClaims(scope="execution")
+            )
+
+        exec_app.dependency_overrides[require_auth] = _watcher_auth
+
+    def test_put_by_name_creates_row(self, client: TestClient, asset: AssetModel, session):
+        response = client.put(
+            _BY_NAME_VALUE, params={"name": asset.name, "key": "watermark"}, json={"value": "2026-01-01"}
+        )
+
+        assert response.status_code == 204
+        row = session.scalar(
+            select(AssetStateStoreModel).where(
+                AssetStateStoreModel.asset_id == asset.id,
+                AssetStateStoreModel.key == "watermark",
+            )
+        )
+        assert row is not None
+        assert row.value == '"2026-01-01"'
+
+    def test_put_by_name_records_watcher_kind(self, client: TestClient, asset: AssetModel, session):
+        client.put(
+            _BY_NAME_VALUE, params={"name": asset.name, "key": "watermark"}, json={"value": "2026-01-01"}
+        )
+
+        row = session.scalar(
+            select(AssetStateStoreModel).where(
+                AssetStateStoreModel.asset_id == asset.id,
+                AssetStateStoreModel.key == "watermark",
+            )
+        )
+        assert row is not None
+        assert row.last_updated_by_kind == "watcher"
+        assert row.last_updated_by_dag_id is None
+        assert row.last_updated_by_run_id is None
+        assert row.last_updated_by_task_id is None
+        assert row.last_updated_by_map_index is None
+
+    def test_put_by_uri_records_watcher_kind(self, client: TestClient, asset: AssetModel, session):
+        response = client.put(
+            _BY_URI_VALUE, params={"uri": asset.uri, "key": "watermark"}, json={"value": "2026-06-01"}
+        )
+
+        assert response.status_code == 204
+        row = session.scalar(
+            select(AssetStateStoreModel).where(
+                AssetStateStoreModel.asset_id == asset.id,
+                AssetStateStoreModel.key == "watermark",
+            )
+        )
+        assert row is not None
+        assert row.last_updated_by_kind == "watcher"

Review Comment:
   Resolved in the next commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to