This is an automated email from the ASF dual-hosted git repository.

amoghrajesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f9e4e6efd10 AIP-103: Add Core API endpoints for task state and asset 
state (#67041)
f9e4e6efd10 is described below

commit f9e4e6efd10bcf99cabf5455b50574a1b64078c5
Author: Amogh Desai <[email protected]>
AuthorDate: Wed May 20 16:18:21 2026 +0530

    AIP-103: Add Core API endpoints for task state and asset state (#67041)
---
 .../api_fastapi/core_api/datamodels/asset_state.py |  44 ++
 .../api_fastapi/core_api/datamodels/task_state.py  |  45 ++
 .../core_api/openapi/v2-rest-api-generated.yaml    | 744 +++++++++++++++++++++
 .../api_fastapi/core_api/routes/public/__init__.py |   4 +
 .../core_api/routes/public/asset_state.py          | 163 +++++
 .../core_api/routes/public/task_state.py           | 210 ++++++
 .../src/airflow/ui/openapi-gen/queries/common.ts   |  44 +-
 .../ui/openapi-gen/queries/ensureQueryData.ts      |  70 +-
 .../src/airflow/ui/openapi-gen/queries/prefetch.ts |  70 +-
 .../src/airflow/ui/openapi-gen/queries/queries.ts  | 201 +++++-
 .../src/airflow/ui/openapi-gen/queries/suspense.ts |  70 +-
 .../airflow/ui/openapi-gen/requests/schemas.gen.ts | 126 ++++
 .../ui/openapi-gen/requests/services.gen.ts        | 321 ++++++++-
 .../airflow/ui/openapi-gen/requests/types.gen.ts   | 395 +++++++++++
 .../core_api/routes/public/test_asset_state.py     | 255 +++++++
 .../core_api/routes/public/test_task_state.py      | 293 ++++++++
 .../src/airflowctl/api/datamodels/generated.py     |  61 ++
 17 files changed, 3109 insertions(+), 7 deletions(-)

diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/asset_state.py 
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/asset_state.py
new file mode 100644
index 00000000000..379d7802909
--- /dev/null
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/asset_state.py
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import datetime
+
+from pydantic import Field
+
+from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
+
+
+class AssetStateResponse(BaseModel):
+    """A single asset state key/value pair with metadata."""
+
+    key: str
+    value: str
+    updated_at: datetime
+
+
+class AssetStateCollectionResponse(BaseModel):
+    """All asset state entries for an asset."""
+
+    asset_states: list[AssetStateResponse]
+    total_entries: int
+
+
+class AssetStateBody(StrictBaseModel):
+    """Request body for setting an asset state value."""
+
+    value: str = Field(max_length=65535)
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_state.py 
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_state.py
new file mode 100644
index 00000000000..856de74a087
--- /dev/null
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_state.py
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import datetime
+
+from pydantic import Field
+
+from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
+
+
+class TaskStateResponse(BaseModel):
+    """A single task state key/value pair with metadata."""
+
+    key: str
+    value: str
+    updated_at: datetime
+    expires_at: datetime | None
+
+
+class TaskStateCollectionResponse(BaseModel):
+    """All task state entries for a task instance."""
+
+    task_states: list[TaskStateResponse]
+    total_entries: int
+
+
+class TaskStateBody(StrictBaseModel):
+    """Request body for setting a task state value."""
+
+    value: str = Field(max_length=65535)
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index 880860ee801..a4434151f3d 100644
--- 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++ 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -5529,6 +5529,649 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/HTTPValidationError'
+  /api/v2/assets/{asset_id}/states:
+    get:
+      tags:
+      - Asset State
+      summary: List Asset States
+      description: List all state entries for an asset.
+      operationId: list_asset_states
+      security:
+      - OAuth2PasswordBearer: []
+      - HTTPBearer: []
+      parameters:
+      - name: asset_id
+        in: path
+        required: true
+        schema:
+          type: integer
+          title: Asset Id
+      - name: limit
+        in: query
+        required: false
+        schema:
+          type: integer
+          minimum: 0
+          default: 50
+          title: Limit
+      - name: offset
+        in: query
+        required: false
+        schema:
+          type: integer
+          minimum: 0
+          default: 0
+          title: Offset
+      responses:
+        '200':
+          description: Successful Response
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/AssetStateCollectionResponse'
+        '401':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Unauthorized
+        '403':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Forbidden
+        '404':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Not Found
+        '422':
+          description: Validation Error
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPValidationError'
+    delete:
+      tags:
+      - Asset State
+      summary: Clear Asset State
+      description: Delete all state keys for an asset.
+      operationId: clear_asset_state
+      security:
+      - OAuth2PasswordBearer: []
+      - HTTPBearer: []
+      parameters:
+      - name: asset_id
+        in: path
+        required: true
+        schema:
+          type: integer
+          title: Asset Id
+      responses:
+        '204':
+          description: Successful Response
+        '401':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Unauthorized
+        '403':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Forbidden
+        '404':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Not Found
+        '422':
+          description: Validation Error
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPValidationError'
+  /api/v2/assets/{asset_id}/states/{key}:
+    get:
+      tags:
+      - Asset State
+      summary: Get Asset State
+      description: Get a single asset state entry.
+      operationId: get_asset_state
+      security:
+      - OAuth2PasswordBearer: []
+      - HTTPBearer: []
+      parameters:
+      - name: key
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Key
+      - name: asset_id
+        in: path
+        required: true
+        schema:
+          type: integer
+          title: Asset Id
+      responses:
+        '200':
+          description: Successful Response
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/AssetStateResponse'
+        '401':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Unauthorized
+        '403':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Forbidden
+        '404':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Not Found
+        '422':
+          description: Validation Error
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPValidationError'
+    put:
+      tags:
+      - Asset State
+      summary: Set Asset State
+      description: Set an asset state value. Creates or overwrites the key.
+      operationId: set_asset_state
+      security:
+      - OAuth2PasswordBearer: []
+      - HTTPBearer: []
+      parameters:
+      - name: key
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Key
+      - name: asset_id
+        in: path
+        required: true
+        schema:
+          type: integer
+          title: Asset Id
+      requestBody:
+        required: true
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/AssetStateBody'
+      responses:
+        '204':
+          description: Successful Response
+        '401':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Unauthorized
+        '403':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Forbidden
+        '404':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Not Found
+        '422':
+          description: Validation Error
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPValidationError'
+    delete:
+      tags:
+      - Asset State
+      summary: Delete Asset State
+      description: Delete a single asset state key. No-op if the key does not 
exist.
+      operationId: delete_asset_state
+      security:
+      - OAuth2PasswordBearer: []
+      - HTTPBearer: []
+      parameters:
+      - name: key
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Key
+      - name: asset_id
+        in: path
+        required: true
+        schema:
+          type: integer
+          title: Asset Id
+      responses:
+        '204':
+          description: Successful Response
+        '401':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Unauthorized
+        '403':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Forbidden
+        '404':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Not Found
+        '422':
+          description: Validation Error
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPValidationError'
+  /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/states:
+    get:
+      tags:
+      - Task State
+      summary: List Task States
+      description: List all task state entries for a task instance.
+      operationId: list_task_states
+      security:
+      - OAuth2PasswordBearer: []
+      - HTTPBearer: []
+      parameters:
+      - name: dag_id
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Dag Id
+      - name: dag_run_id
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Dag Run Id
+      - name: task_id
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Task Id
+      - name: map_index
+        in: query
+        required: false
+        schema:
+          type: integer
+          minimum: -1
+          default: -1
+          title: Map Index
+      - name: limit
+        in: query
+        required: false
+        schema:
+          type: integer
+          minimum: 0
+          default: 50
+          title: Limit
+      - name: offset
+        in: query
+        required: false
+        schema:
+          type: integer
+          minimum: 0
+          default: 0
+          title: Offset
+      responses:
+        '200':
+          description: Successful Response
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/TaskStateCollectionResponse'
+        '401':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Unauthorized
+        '403':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Forbidden
+        '404':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Not Found
+        '422':
+          description: Validation Error
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPValidationError'
+    delete:
+      tags:
+      - Task State
+      summary: Clear Task State
+      description: 'Delete all task state keys for a task instance.
+
+
+        When ``all_map_indices=true``, state is cleared for every map index of 
the
+        task and
+
+        the ``map_index`` parameter is ignored.'
+      operationId: clear_task_state
+      security:
+      - OAuth2PasswordBearer: []
+      - HTTPBearer: []
+      parameters:
+      - name: dag_id
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Dag Id
+      - name: dag_run_id
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Dag Run Id
+      - name: task_id
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Task Id
+      - name: map_index
+        in: query
+        required: false
+        schema:
+          type: integer
+          minimum: -1
+          default: -1
+          title: Map Index
+      - name: all_map_indices
+        in: query
+        required: false
+        schema:
+          type: boolean
+          default: false
+          title: All Map Indices
+      responses:
+        '204':
+          description: Successful Response
+        '401':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Unauthorized
+        '403':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Forbidden
+        '404':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Not Found
+        '422':
+          description: Validation Error
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPValidationError'
+  
/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/states/{key}:
+    get:
+      tags:
+      - Task State
+      summary: Get Task State
+      description: Get a single task state entry.
+      operationId: get_task_state
+      security:
+      - OAuth2PasswordBearer: []
+      - HTTPBearer: []
+      parameters:
+      - name: dag_id
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Dag Id
+      - name: dag_run_id
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Dag Run Id
+      - name: task_id
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Task Id
+      - name: key
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Key
+      - name: map_index
+        in: query
+        required: false
+        schema:
+          type: integer
+          minimum: -1
+          default: -1
+          title: Map Index
+      responses:
+        '200':
+          description: Successful Response
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/TaskStateResponse'
+        '401':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Unauthorized
+        '403':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Forbidden
+        '404':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Not Found
+        '422':
+          description: Validation Error
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPValidationError'
+    put:
+      tags:
+      - Task State
+      summary: Set Task State
+      description: Set a task state value. Creates or overwrites the key.
+      operationId: set_task_state
+      security:
+      - OAuth2PasswordBearer: []
+      - HTTPBearer: []
+      parameters:
+      - name: dag_id
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Dag Id
+      - name: dag_run_id
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Dag Run Id
+      - name: task_id
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Task Id
+      - name: key
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Key
+      - name: map_index
+        in: query
+        required: false
+        schema:
+          type: integer
+          minimum: -1
+          default: -1
+          title: Map Index
+      requestBody:
+        required: true
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/TaskStateBody'
+      responses:
+        '204':
+          description: Successful Response
+        '401':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Unauthorized
+        '403':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Forbidden
+        '404':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Not Found
+        '422':
+          description: Validation Error
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPValidationError'
+    delete:
+      tags:
+      - Task State
+      summary: Delete Task State
+      description: Delete a single task state key. No-op if the key does not 
exist.
+      operationId: delete_task_state
+      security:
+      - OAuth2PasswordBearer: []
+      - HTTPBearer: []
+      parameters:
+      - name: dag_id
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Dag Id
+      - name: dag_run_id
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Dag Run Id
+      - name: task_id
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Task Id
+      - name: key
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Key
+      - name: map_index
+        in: query
+        required: false
+        schema:
+          type: integer
+          minimum: -1
+          default: -1
+          title: Map Index
+      responses:
+        '204':
+          description: Successful Response
+        '401':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Unauthorized
+        '403':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Forbidden
+        '404':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Not Found
+        '422':
+          description: Validation Error
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPValidationError'
   
/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}:
     get:
       tags:
@@ -10667,6 +11310,53 @@ components:
       - watchers
       title: AssetResponse
       description: Asset serializer for responses.
+    AssetStateBody:
+      properties:
+        value:
+          type: string
+          maxLength: 65535
+          title: Value
+      additionalProperties: false
+      type: object
+      required:
+      - value
+      title: AssetStateBody
+      description: Request body for setting an asset state value.
+    AssetStateCollectionResponse:
+      properties:
+        asset_states:
+          items:
+            $ref: '#/components/schemas/AssetStateResponse'
+          type: array
+          title: Asset States
+        total_entries:
+          type: integer
+          title: Total Entries
+      type: object
+      required:
+      - asset_states
+      - total_entries
+      title: AssetStateCollectionResponse
+      description: All asset state entries for an asset.
+    AssetStateResponse:
+      properties:
+        key:
+          type: string
+          title: Key
+        value:
+          type: string
+          title: Value
+        updated_at:
+          type: string
+          format: date-time
+          title: Updated At
+      type: object
+      required:
+      - key
+      - value
+      - updated_at
+      title: AssetStateResponse
+      description: A single asset state key/value pair with metadata.
     AssetWatcherResponse:
       properties:
         name:
@@ -14945,6 +15635,60 @@ components:
       - extra_links
       title: TaskResponse
       description: Task serializer for responses.
+    TaskStateBody:
+      properties:
+        value:
+          type: string
+          maxLength: 65535
+          title: Value
+      additionalProperties: false
+      type: object
+      required:
+      - value
+      title: TaskStateBody
+      description: Request body for setting a task state value.
+    TaskStateCollectionResponse:
+      properties:
+        task_states:
+          items:
+            $ref: '#/components/schemas/TaskStateResponse'
+          type: array
+          title: Task States
+        total_entries:
+          type: integer
+          title: Total Entries
+      type: object
+      required:
+      - task_states
+      - total_entries
+      title: TaskStateCollectionResponse
+      description: All task state entries for a task instance.
+    TaskStateResponse:
+      properties:
+        key:
+          type: string
+          title: Key
+        value:
+          type: string
+          title: Value
+        updated_at:
+          type: string
+          format: date-time
+          title: Updated At
+        expires_at:
+          anyOf:
+          - type: string
+            format: date-time
+          - type: 'null'
+          title: Expires At
+      type: object
+      required:
+      - key
+      - value
+      - updated_at
+      - expires_at
+      title: TaskStateResponse
+      description: A single task state key/value pair with metadata.
     TimeDelta:
       properties:
         __type:
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/__init__.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/__init__.py
index b590424de4e..4e6ebba1178 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/__init__.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/__init__.py
@@ -21,6 +21,7 @@ from fastapi import Depends, status
 
 from airflow.api_fastapi.common.router import AirflowRouter
 from airflow.api_fastapi.core_api.openapi.exceptions import 
create_openapi_http_exception_doc
+from airflow.api_fastapi.core_api.routes.public.asset_state import 
asset_state_router
 from airflow.api_fastapi.core_api.routes.public.assets import assets_router
 from airflow.api_fastapi.core_api.routes.public.auth import auth_router
 from airflow.api_fastapi.core_api.routes.public.backfills import 
backfills_router
@@ -45,6 +46,7 @@ from airflow.api_fastapi.core_api.routes.public.plugins 
import plugins_router
 from airflow.api_fastapi.core_api.routes.public.pools import pools_router
 from airflow.api_fastapi.core_api.routes.public.providers import 
providers_router
 from airflow.api_fastapi.core_api.routes.public.task_instances import 
task_instances_router
+from airflow.api_fastapi.core_api.routes.public.task_state import 
task_state_router
 from airflow.api_fastapi.core_api.routes.public.tasks import tasks_router
 from airflow.api_fastapi.core_api.routes.public.variables import 
variables_router
 from airflow.api_fastapi.core_api.routes.public.version import version_router
@@ -79,6 +81,8 @@ authenticated_router.include_router(job_router)
 authenticated_router.include_router(plugins_router)
 authenticated_router.include_router(pools_router)
 authenticated_router.include_router(providers_router)
+authenticated_router.include_router(asset_state_router)
+authenticated_router.include_router(task_state_router)
 authenticated_router.include_router(xcom_router)
 authenticated_router.include_router(task_instances_router)
 authenticated_router.include_router(tasks_router)
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/asset_state.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/asset_state.py
new file mode 100644
index 00000000000..43580e0c762
--- /dev/null
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/asset_state.py
@@ -0,0 +1,163 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import Annotated
+
+from fastapi import Depends, HTTPException, status
+from sqlalchemy import select
+
+from airflow._shared.state import AssetScope
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.parameters import QueryLimit, QueryOffset
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.asset_state import (
+    AssetStateBody,
+    AssetStateCollectionResponse,
+    AssetStateResponse,
+)
+from airflow.api_fastapi.core_api.openapi.exceptions import 
create_openapi_http_exception_doc
+from airflow.api_fastapi.core_api.security import requires_access_asset
+from airflow.models.asset import AssetModel
+from airflow.models.asset_state import AssetStateModel
+from airflow.state import get_state_backend
+
+asset_state_router = AirflowRouter(
+    tags=["Asset State"],
+    prefix="/assets/{asset_id}/states",
+)
+
+
+def _get_asset_or_404(asset_id: int, session: SessionDep) -> int:
+    exists = session.scalar(select(AssetModel.id).where(AssetModel.id == 
asset_id))
+    if exists is None:
+        raise HTTPException(
+            status_code=status.HTTP_404_NOT_FOUND,
+            detail=f"Asset with id {asset_id!r} not found",
+        )
+    return asset_id
+
+
+AssetIdDep = Annotated[int, Depends(_get_asset_or_404)]
+
+
+@asset_state_router.get(
+    "",
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    dependencies=[Depends(requires_access_asset(method="GET"))],
+)
+def list_asset_states(
+    asset_id: AssetIdDep,
+    limit: QueryLimit,
+    offset: QueryOffset,
+    session: SessionDep,
+) -> AssetStateCollectionResponse:
+    """List all state entries for an asset."""
+    base = (
+        select(
+            AssetStateModel.key,
+            AssetStateModel.value,
+            AssetStateModel.updated_at,
+        )
+        .where(AssetStateModel.asset_id == asset_id)
+        .order_by(AssetStateModel.key.asc())
+    )
+    paginated, total_entries = paginated_select(
+        statement=base,
+        filters=None,
+        order_by=None,
+        offset=offset,
+        limit=limit,
+        session=session,
+    )
+    rows = session.execute(paginated).all()
+    entries = [AssetStateResponse(key=r.key, value=r.value, 
updated_at=r.updated_at) for r in rows]
+    return AssetStateCollectionResponse(asset_states=entries, 
total_entries=total_entries)
+
+
+@asset_state_router.get(
+    "/{key:path}",
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    dependencies=[Depends(requires_access_asset(method="GET"))],
+)
+def get_asset_state(
+    asset_id: AssetIdDep,
+    key: str,
+    session: SessionDep,
+) -> AssetStateResponse:
+    """Get a single asset state entry."""
+    row = session.execute(
+        select(
+            AssetStateModel.key,
+            AssetStateModel.value,
+            AssetStateModel.updated_at,
+        ).where(
+            AssetStateModel.asset_id == asset_id,
+            AssetStateModel.key == key,
+        )
+    ).one_or_none()
+    if row is None:
+        raise HTTPException(
+            status_code=status.HTTP_404_NOT_FOUND,
+            detail=f"Asset state key {key!r} not found",
+        )
+    return AssetStateResponse(key=row.key, value=row.value, 
updated_at=row.updated_at)
+
+
+@asset_state_router.put(
+    "/{key:path}",
+    status_code=status.HTTP_204_NO_CONTENT,
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    dependencies=[Depends(requires_access_asset(method="PUT"))],
+)
+def set_asset_state(
+    asset_id: AssetIdDep,
+    key: str,
+    body: AssetStateBody,
+    session: SessionDep,
+) -> None:
+    """Set an asset state value. Creates or overwrites the key."""
+    get_state_backend().set(AssetScope(asset_id=asset_id), key, body.value, 
session=session)
+
+
+@asset_state_router.delete(
+    "/{key:path}",
+    status_code=status.HTTP_204_NO_CONTENT,
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    dependencies=[Depends(requires_access_asset(method="DELETE"))],
+)
+def delete_asset_state(
+    asset_id: AssetIdDep,
+    key: str,
+    session: SessionDep,
+) -> None:
+    """Delete a single asset state key. No-op if the key does not exist."""
+    get_state_backend().delete(AssetScope(asset_id=asset_id), key, 
session=session)
+
+
+@asset_state_router.delete(
+    "",
+    status_code=status.HTTP_204_NO_CONTENT,
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    dependencies=[Depends(requires_access_asset(method="DELETE"))],
+)
+def clear_asset_state(
+    asset_id: AssetIdDep,
+    session: SessionDep,
+) -> None:
+    """Delete all state keys for an asset."""
+    get_state_backend().clear(AssetScope(asset_id=asset_id), session=session)
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_state.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_state.py
new file mode 100644
index 00000000000..138380232a8
--- /dev/null
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_state.py
@@ -0,0 +1,210 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import Annotated
+
+from fastapi import Depends, HTTPException, Query, status
+from sqlalchemy import select
+
+from airflow._shared.state import TaskScope
+from airflow.api_fastapi.auth.managers.models.resource_details import 
DagAccessEntity
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.parameters import QueryLimit, QueryOffset
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.task_state import (
+    TaskStateBody,
+    TaskStateCollectionResponse,
+    TaskStateResponse,
+)
+from airflow.api_fastapi.core_api.openapi.exceptions import 
create_openapi_http_exception_doc
+from airflow.api_fastapi.core_api.security import requires_access_dag
+from airflow.models.task_state import TaskStateModel
+from airflow.models.taskinstance import TaskInstance as TI
+from airflow.state import get_state_backend
+
+task_state_router = AirflowRouter(
+    tags=["Task State"],
+    
prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/states",
+)
+
+
+def _get_scope(dag_id: str, dag_run_id: str, task_id: str, map_index: int) -> 
TaskScope:
+    return TaskScope(dag_id=dag_id, run_id=dag_run_id, task_id=task_id, 
map_index=map_index)
+
+
+@task_state_router.get(
+    "",
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    dependencies=[Depends(requires_access_dag(method="GET", 
access_entity=DagAccessEntity.TASK_INSTANCE))],
+)
+def list_task_states(
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    limit: QueryLimit,
+    offset: QueryOffset,
+    session: SessionDep,
+    map_index: Annotated[int, Query(ge=-1)] = -1,
+) -> TaskStateCollectionResponse:
+    """List all task state entries for a task instance."""
+    base = (
+        select(
+            TaskStateModel.key,
+            TaskStateModel.value,
+            TaskStateModel.updated_at,
+            TaskStateModel.expires_at,
+        )
+        .where(
+            TaskStateModel.dag_id == dag_id,
+            TaskStateModel.run_id == dag_run_id,
+            TaskStateModel.task_id == task_id,
+            TaskStateModel.map_index == map_index,
+        )
+        .order_by(TaskStateModel.key.asc())
+    )
+    paginated, total_entries = paginated_select(
+        statement=base,
+        filters=None,
+        order_by=None,
+        offset=offset,
+        limit=limit,
+        session=session,
+    )
+    rows = session.execute(paginated).all()
+    entries = [
+        TaskStateResponse(key=r.key, value=r.value, updated_at=r.updated_at, 
expires_at=r.expires_at)
+        for r in rows
+    ]
+    return TaskStateCollectionResponse(task_states=entries, 
total_entries=total_entries)
+
+
+@task_state_router.get(
+    "/{key:path}",
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    dependencies=[Depends(requires_access_dag(method="GET", 
access_entity=DagAccessEntity.TASK_INSTANCE))],
+)
+def get_task_state(
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    key: str,
+    session: SessionDep,
+    map_index: Annotated[int, Query(ge=-1)] = -1,
+) -> TaskStateResponse:
+    """Get a single task state entry."""
+    row = session.execute(
+        select(
+            TaskStateModel.key,
+            TaskStateModel.value,
+            TaskStateModel.updated_at,
+            TaskStateModel.expires_at,
+        ).where(
+            TaskStateModel.dag_id == dag_id,
+            TaskStateModel.run_id == dag_run_id,
+            TaskStateModel.task_id == task_id,
+            TaskStateModel.map_index == map_index,
+            TaskStateModel.key == key,
+        )
+    ).one_or_none()
+    if row is None:
+        raise HTTPException(
+            status_code=status.HTTP_404_NOT_FOUND,
+            detail=f"Task state key {key!r} not found",
+        )
+    return TaskStateResponse(
+        key=row.key, value=row.value, updated_at=row.updated_at, 
expires_at=row.expires_at
+    )
+
+
+@task_state_router.put(
+    "/{key:path}",
+    status_code=status.HTTP_204_NO_CONTENT,
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    dependencies=[Depends(requires_access_dag(method="PUT", 
access_entity=DagAccessEntity.TASK_INSTANCE))],
+)
+def set_task_state(
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    key: str,
+    body: TaskStateBody,
+    session: SessionDep,
+    map_index: Annotated[int, Query(ge=-1)] = -1,
+) -> None:
+    """Set a task state value. Creates or overwrites the key."""
+    ti_exists = session.scalar(
+        select(TI.task_id).where(
+            TI.dag_id == dag_id,
+            TI.run_id == dag_run_id,
+            TI.task_id == task_id,
+            TI.map_index == map_index,
+        )
+    )
+    if ti_exists is None:
+        raise HTTPException(
+            status_code=status.HTTP_404_NOT_FOUND,
+            detail=f"Task instance not found for dag_id={dag_id!r}, 
run_id={dag_run_id!r}, task_id={task_id!r}, map_index={map_index}",
+        )
+    scope = _get_scope(dag_id, dag_run_id, task_id, map_index)
+    try:
+        get_state_backend().set(scope, key, body.value, session=session)
+    except ValueError as e:
+        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, 
detail=str(e)) from e
+
+
+@task_state_router.delete(
+    "/{key:path}",
+    status_code=status.HTTP_204_NO_CONTENT,
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    dependencies=[Depends(requires_access_dag(method="DELETE", 
access_entity=DagAccessEntity.TASK_INSTANCE))],
+)
+def delete_task_state(
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    key: str,
+    session: SessionDep,
+    map_index: Annotated[int, Query(ge=-1)] = -1,
+) -> None:
+    """Delete a single task state key. No-op if the key does not exist."""
+    scope = _get_scope(dag_id, dag_run_id, task_id, map_index)
+    get_state_backend().delete(scope, key, session=session)
+
+
+@task_state_router.delete(
+    "",
+    status_code=status.HTTP_204_NO_CONTENT,
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    dependencies=[Depends(requires_access_dag(method="DELETE", 
access_entity=DagAccessEntity.TASK_INSTANCE))],
+)
+def clear_task_state(
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    session: SessionDep,
+    map_index: Annotated[int, Query(ge=-1)] = -1,
+    all_map_indices: Annotated[bool, Query()] = False,
+) -> None:
+    """
+    Delete all task state keys for a task instance.
+
+    When ``all_map_indices=true``, state is cleared for every map index of the 
task and
+    the ``map_index`` parameter is ignored.
+    """
+    scope = _get_scope(dag_id, dag_run_id, task_id, map_index)
+    get_state_backend().clear(scope, all_map_indices=all_map_indices, 
session=session)
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts 
b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
index 127b09e8761..812a41d9d08 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
@@ -1,7 +1,7 @@
 // generated with @7nohe/[email protected] 
 
 import { UseQueryResult } from "@tanstack/react-query";
-import { AssetService, AuthLinksService, BackfillService, CalendarService, 
ConfigService, ConnectionService, DagParsingService, DagRunService, DagService, 
DagSourceService, DagStatsService, DagVersionService, DagWarningService, 
DashboardService, DeadlinesService, DependenciesService, EventLogService, 
ExperimentalService, ExtraLinksService, GanttService, GridService, 
ImportErrorService, JobService, LoginService, MonitorService, 
PartitionedDagRunService, PluginService, PoolService, Provide [...]
+import { AssetService, AssetStateService, AuthLinksService, BackfillService, 
CalendarService, ConfigService, ConnectionService, DagParsingService, 
DagRunService, DagService, DagSourceService, DagStatsService, 
DagVersionService, DagWarningService, DashboardService, DeadlinesService, 
DependenciesService, EventLogService, ExperimentalService, ExtraLinksService, 
GanttService, GridService, ImportErrorService, JobService, LoginService, 
MonitorService, PartitionedDagRunService, PluginService, P [...]
 import { DagRunState, DagWarningType } from "../requests/types.gen";
 export type AssetServiceGetAssetsDefaultResponse = Awaited<ReturnType<typeof 
AssetService.getAssets>>;
 export type AssetServiceGetAssetsQueryResult<TData = 
AssetServiceGetAssetsDefaultResponse, TError = unknown> = UseQueryResult<TData, 
TError>;
@@ -727,6 +727,42 @@ export const UseProviderServiceGetProvidersKeyFn = ({ 
limit, offset }: {
   limit?: number;
   offset?: number;
 } = {}, queryKey?: Array<unknown>) => [useProviderServiceGetProvidersKey, 
...(queryKey ?? [{ limit, offset }])];
+export type AssetStateServiceListAssetStatesDefaultResponse = 
Awaited<ReturnType<typeof AssetStateService.listAssetStates>>;
+export type AssetStateServiceListAssetStatesQueryResult<TData = 
AssetStateServiceListAssetStatesDefaultResponse, TError = unknown> = 
UseQueryResult<TData, TError>;
+export const useAssetStateServiceListAssetStatesKey = 
"AssetStateServiceListAssetStates";
+export const UseAssetStateServiceListAssetStatesKeyFn = ({ assetId, limit, 
offset }: {
+  assetId: number;
+  limit?: number;
+  offset?: number;
+}, queryKey?: Array<unknown>) => [useAssetStateServiceListAssetStatesKey, 
...(queryKey ?? [{ assetId, limit, offset }])];
+export type AssetStateServiceGetAssetStateDefaultResponse = 
Awaited<ReturnType<typeof AssetStateService.getAssetState>>;
+export type AssetStateServiceGetAssetStateQueryResult<TData = 
AssetStateServiceGetAssetStateDefaultResponse, TError = unknown> = 
UseQueryResult<TData, TError>;
+export const useAssetStateServiceGetAssetStateKey = 
"AssetStateServiceGetAssetState";
+export const UseAssetStateServiceGetAssetStateKeyFn = ({ assetId, key }: {
+  assetId: number;
+  key: string;
+}, queryKey?: Array<unknown>) => [useAssetStateServiceGetAssetStateKey, 
...(queryKey ?? [{ assetId, key }])];
+export type TaskStateServiceListTaskStatesDefaultResponse = 
Awaited<ReturnType<typeof TaskStateService.listTaskStates>>;
+export type TaskStateServiceListTaskStatesQueryResult<TData = 
TaskStateServiceListTaskStatesDefaultResponse, TError = unknown> = 
UseQueryResult<TData, TError>;
+export const useTaskStateServiceListTaskStatesKey = 
"TaskStateServiceListTaskStates";
+export const UseTaskStateServiceListTaskStatesKeyFn = ({ dagId, dagRunId, 
limit, mapIndex, offset, taskId }: {
+  dagId: string;
+  dagRunId: string;
+  limit?: number;
+  mapIndex?: number;
+  offset?: number;
+  taskId: string;
+}, queryKey?: Array<unknown>) => [useTaskStateServiceListTaskStatesKey, 
...(queryKey ?? [{ dagId, dagRunId, limit, mapIndex, offset, taskId }])];
+export type TaskStateServiceGetTaskStateDefaultResponse = 
Awaited<ReturnType<typeof TaskStateService.getTaskState>>;
+export type TaskStateServiceGetTaskStateQueryResult<TData = 
TaskStateServiceGetTaskStateDefaultResponse, TError = unknown> = 
UseQueryResult<TData, TError>;
+export const useTaskStateServiceGetTaskStateKey = 
"TaskStateServiceGetTaskState";
+export const UseTaskStateServiceGetTaskStateKeyFn = ({ dagId, dagRunId, key, 
mapIndex, taskId }: {
+  dagId: string;
+  dagRunId: string;
+  key: string;
+  mapIndex?: number;
+  taskId: string;
+}, queryKey?: Array<unknown>) => [useTaskStateServiceGetTaskStateKey, 
...(queryKey ?? [{ dagId, dagRunId, key, mapIndex, taskId }])];
 export type XcomServiceGetXcomEntryDefaultResponse = Awaited<ReturnType<typeof 
XcomService.getXcomEntry>>;
 export type XcomServiceGetXcomEntryQueryResult<TData = 
XcomServiceGetXcomEntryDefaultResponse, TError = unknown> = 
UseQueryResult<TData, TError>;
 export const useXcomServiceGetXcomEntryKey = "XcomServiceGetXcomEntry";
@@ -1012,6 +1048,8 @@ export type AuthLinksServiceGenerateTokenMutationResult = 
Awaited<ReturnType<typ
 export type BackfillServicePauseBackfillMutationResult = 
Awaited<ReturnType<typeof BackfillService.pauseBackfill>>;
 export type BackfillServiceUnpauseBackfillMutationResult = 
Awaited<ReturnType<typeof BackfillService.unpauseBackfill>>;
 export type BackfillServiceCancelBackfillMutationResult = 
Awaited<ReturnType<typeof BackfillService.cancelBackfill>>;
+export type AssetStateServiceSetAssetStateMutationResult = 
Awaited<ReturnType<typeof AssetStateService.setAssetState>>;
+export type TaskStateServiceSetTaskStateMutationResult = 
Awaited<ReturnType<typeof TaskStateService.setTaskState>>;
 export type DagParsingServiceReparseDagFileMutationResult = 
Awaited<ReturnType<typeof DagParsingService.reparseDagFile>>;
 export type ConnectionServicePatchConnectionMutationResult = 
Awaited<ReturnType<typeof ConnectionService.patchConnection>>;
 export type ConnectionServiceBulkConnectionsMutationResult = 
Awaited<ReturnType<typeof ConnectionService.bulkConnections>>;
@@ -1039,5 +1077,9 @@ export type DagRunServiceDeleteDagRunMutationResult = 
Awaited<ReturnType<typeof
 export type DagServiceDeleteDagMutationResult = Awaited<ReturnType<typeof 
DagService.deleteDag>>;
 export type TaskInstanceServiceDeleteTaskInstanceMutationResult = 
Awaited<ReturnType<typeof TaskInstanceService.deleteTaskInstance>>;
 export type PoolServiceDeletePoolMutationResult = Awaited<ReturnType<typeof 
PoolService.deletePool>>;
+export type AssetStateServiceClearAssetStateMutationResult = 
Awaited<ReturnType<typeof AssetStateService.clearAssetState>>;
+export type AssetStateServiceDeleteAssetStateMutationResult = 
Awaited<ReturnType<typeof AssetStateService.deleteAssetState>>;
+export type TaskStateServiceClearTaskStateMutationResult = 
Awaited<ReturnType<typeof TaskStateService.clearTaskState>>;
+export type TaskStateServiceDeleteTaskStateMutationResult = 
Awaited<ReturnType<typeof TaskStateService.deleteTaskState>>;
 export type XcomServiceDeleteXcomEntryMutationResult = 
Awaited<ReturnType<typeof XcomService.deleteXcomEntry>>;
 export type VariableServiceDeleteVariableMutationResult = 
Awaited<ReturnType<typeof VariableService.deleteVariable>>;
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts 
b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
index 835c1c46dbf..32d74fed6e9 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
@@ -1,7 +1,7 @@
 // generated with @7nohe/[email protected] 
 
 import { type QueryClient } from "@tanstack/react-query";
-import { AssetService, AuthLinksService, BackfillService, CalendarService, 
ConfigService, ConnectionService, DagRunService, DagService, DagSourceService, 
DagStatsService, DagVersionService, DagWarningService, DashboardService, 
DeadlinesService, DependenciesService, EventLogService, ExperimentalService, 
ExtraLinksService, GanttService, GridService, ImportErrorService, JobService, 
LoginService, MonitorService, PartitionedDagRunService, PluginService, 
PoolService, ProviderService, Structure [...]
+import { AssetService, AssetStateService, AuthLinksService, BackfillService, 
CalendarService, ConfigService, ConnectionService, DagRunService, DagService, 
DagSourceService, DagStatsService, DagVersionService, DagWarningService, 
DashboardService, DeadlinesService, DependenciesService, EventLogService, 
ExperimentalService, ExtraLinksService, GanttService, GridService, 
ImportErrorService, JobService, LoginService, MonitorService, 
PartitionedDagRunService, PluginService, PoolService, Provide [...]
 import { DagRunState, DagWarningType } from "../requests/types.gen";
 import * as Common from "./common";
 /**
@@ -1476,6 +1476,74 @@ export const ensureUseProviderServiceGetProvidersData = 
(queryClient: QueryClien
   offset?: number;
 } = {}) => queryClient.ensureQueryData({ queryKey: 
Common.UseProviderServiceGetProvidersKeyFn({ limit, offset }), queryFn: () => 
ProviderService.getProviders({ limit, offset }) });
 /**
+* List Asset States
+* List all state entries for an asset.
+* @param data The data for the request.
+* @param data.assetId
+* @param data.limit
+* @param data.offset
+* @returns AssetStateCollectionResponse Successful Response
+* @throws ApiError
+*/
+export const ensureUseAssetStateServiceListAssetStatesData = (queryClient: 
QueryClient, { assetId, limit, offset }: {
+  assetId: number;
+  limit?: number;
+  offset?: number;
+}) => queryClient.ensureQueryData({ queryKey: 
Common.UseAssetStateServiceListAssetStatesKeyFn({ assetId, limit, offset }), 
queryFn: () => AssetStateService.listAssetStates({ assetId, limit, offset }) });
+/**
+* Get Asset State
+* Get a single asset state entry.
+* @param data The data for the request.
+* @param data.key
+* @param data.assetId
+* @returns AssetStateResponse Successful Response
+* @throws ApiError
+*/
+export const ensureUseAssetStateServiceGetAssetStateData = (queryClient: 
QueryClient, { assetId, key }: {
+  assetId: number;
+  key: string;
+}) => queryClient.ensureQueryData({ queryKey: 
Common.UseAssetStateServiceGetAssetStateKeyFn({ assetId, key }), queryFn: () => 
AssetStateService.getAssetState({ assetId, key }) });
+/**
+* List Task States
+* List all task state entries for a task instance.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.dagRunId
+* @param data.taskId
+* @param data.mapIndex
+* @param data.limit
+* @param data.offset
+* @returns TaskStateCollectionResponse Successful Response
+* @throws ApiError
+*/
+export const ensureUseTaskStateServiceListTaskStatesData = (queryClient: 
QueryClient, { dagId, dagRunId, limit, mapIndex, offset, taskId }: {
+  dagId: string;
+  dagRunId: string;
+  limit?: number;
+  mapIndex?: number;
+  offset?: number;
+  taskId: string;
+}) => queryClient.ensureQueryData({ queryKey: 
Common.UseTaskStateServiceListTaskStatesKeyFn({ dagId, dagRunId, limit, 
mapIndex, offset, taskId }), queryFn: () => TaskStateService.listTaskStates({ 
dagId, dagRunId, limit, mapIndex, offset, taskId }) });
+/**
+* Get Task State
+* Get a single task state entry.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.dagRunId
+* @param data.taskId
+* @param data.key
+* @param data.mapIndex
+* @returns TaskStateResponse Successful Response
+* @throws ApiError
+*/
+export const ensureUseTaskStateServiceGetTaskStateData = (queryClient: 
QueryClient, { dagId, dagRunId, key, mapIndex, taskId }: {
+  dagId: string;
+  dagRunId: string;
+  key: string;
+  mapIndex?: number;
+  taskId: string;
+}) => queryClient.ensureQueryData({ queryKey: 
Common.UseTaskStateServiceGetTaskStateKeyFn({ dagId, dagRunId, key, mapIndex, 
taskId }), queryFn: () => TaskStateService.getTaskState({ dagId, dagRunId, key, 
mapIndex, taskId }) });
+/**
 * Get Xcom Entry
 * Get an XCom entry.
 * @param data The data for the request.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts 
b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
index 06e09a390f4..ae012539160 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -1,7 +1,7 @@
 // generated with @7nohe/[email protected] 
 
 import { type QueryClient } from "@tanstack/react-query";
-import { AssetService, AuthLinksService, BackfillService, CalendarService, 
ConfigService, ConnectionService, DagRunService, DagService, DagSourceService, 
DagStatsService, DagVersionService, DagWarningService, DashboardService, 
DeadlinesService, DependenciesService, EventLogService, ExperimentalService, 
ExtraLinksService, GanttService, GridService, ImportErrorService, JobService, 
LoginService, MonitorService, PartitionedDagRunService, PluginService, 
PoolService, ProviderService, Structure [...]
+import { AssetService, AssetStateService, AuthLinksService, BackfillService, 
CalendarService, ConfigService, ConnectionService, DagRunService, DagService, 
DagSourceService, DagStatsService, DagVersionService, DagWarningService, 
DashboardService, DeadlinesService, DependenciesService, EventLogService, 
ExperimentalService, ExtraLinksService, GanttService, GridService, 
ImportErrorService, JobService, LoginService, MonitorService, 
PartitionedDagRunService, PluginService, PoolService, Provide [...]
 import { DagRunState, DagWarningType } from "../requests/types.gen";
 import * as Common from "./common";
 /**
@@ -1476,6 +1476,74 @@ export const prefetchUseProviderServiceGetProviders = 
(queryClient: QueryClient,
   offset?: number;
 } = {}) => queryClient.prefetchQuery({ queryKey: 
Common.UseProviderServiceGetProvidersKeyFn({ limit, offset }), queryFn: () => 
ProviderService.getProviders({ limit, offset }) });
 /**
+* List Asset States
+* List all state entries for an asset.
+* @param data The data for the request.
+* @param data.assetId
+* @param data.limit
+* @param data.offset
+* @returns AssetStateCollectionResponse Successful Response
+* @throws ApiError
+*/
+export const prefetchUseAssetStateServiceListAssetStates = (queryClient: 
QueryClient, { assetId, limit, offset }: {
+  assetId: number;
+  limit?: number;
+  offset?: number;
+}) => queryClient.prefetchQuery({ queryKey: 
Common.UseAssetStateServiceListAssetStatesKeyFn({ assetId, limit, offset }), 
queryFn: () => AssetStateService.listAssetStates({ assetId, limit, offset }) });
+/**
+* Get Asset State
+* Get a single asset state entry.
+* @param data The data for the request.
+* @param data.key
+* @param data.assetId
+* @returns AssetStateResponse Successful Response
+* @throws ApiError
+*/
+export const prefetchUseAssetStateServiceGetAssetState = (queryClient: 
QueryClient, { assetId, key }: {
+  assetId: number;
+  key: string;
+}) => queryClient.prefetchQuery({ queryKey: 
Common.UseAssetStateServiceGetAssetStateKeyFn({ assetId, key }), queryFn: () => 
AssetStateService.getAssetState({ assetId, key }) });
+/**
+* List Task States
+* List all task state entries for a task instance.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.dagRunId
+* @param data.taskId
+* @param data.mapIndex
+* @param data.limit
+* @param data.offset
+* @returns TaskStateCollectionResponse Successful Response
+* @throws ApiError
+*/
+export const prefetchUseTaskStateServiceListTaskStates = (queryClient: 
QueryClient, { dagId, dagRunId, limit, mapIndex, offset, taskId }: {
+  dagId: string;
+  dagRunId: string;
+  limit?: number;
+  mapIndex?: number;
+  offset?: number;
+  taskId: string;
+}) => queryClient.prefetchQuery({ queryKey: 
Common.UseTaskStateServiceListTaskStatesKeyFn({ dagId, dagRunId, limit, 
mapIndex, offset, taskId }), queryFn: () => TaskStateService.listTaskStates({ 
dagId, dagRunId, limit, mapIndex, offset, taskId }) });
+/**
+* Get Task State
+* Get a single task state entry.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.dagRunId
+* @param data.taskId
+* @param data.key
+* @param data.mapIndex
+* @returns TaskStateResponse Successful Response
+* @throws ApiError
+*/
+export const prefetchUseTaskStateServiceGetTaskState = (queryClient: 
QueryClient, { dagId, dagRunId, key, mapIndex, taskId }: {
+  dagId: string;
+  dagRunId: string;
+  key: string;
+  mapIndex?: number;
+  taskId: string;
+}) => queryClient.prefetchQuery({ queryKey: 
Common.UseTaskStateServiceGetTaskStateKeyFn({ dagId, dagRunId, key, mapIndex, 
taskId }), queryFn: () => TaskStateService.getTaskState({ dagId, dagRunId, key, 
mapIndex, taskId }) });
+/**
 * Get Xcom Entry
 * Get an XCom entry.
 * @param data The data for the request.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts 
b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
index fa3a1faf0f9..17ee1009ae0 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
@@ -1,8 +1,8 @@
 // generated with @7nohe/[email protected] 
 
 import { UseMutationOptions, UseQueryOptions, useMutation, useQuery } from 
"@tanstack/react-query";
-import { AssetService, AuthLinksService, BackfillService, CalendarService, 
ConfigService, ConnectionService, DagParsingService, DagRunService, DagService, 
DagSourceService, DagStatsService, DagVersionService, DagWarningService, 
DashboardService, DeadlinesService, DependenciesService, EventLogService, 
ExperimentalService, ExtraLinksService, GanttService, GridService, 
ImportErrorService, JobService, LoginService, MonitorService, 
PartitionedDagRunService, PluginService, PoolService, Provide [...]
-import { BackfillPostBody, BulkBody_BulkTaskInstanceBody_, 
BulkBody_ConnectionBody_, BulkBody_PoolBody_, BulkBody_VariableBody_, 
ClearTaskInstancesBody, ConnectionBody, CreateAssetEventsBody, DAGPatchBody, 
DAGRunClearBody, DAGRunPatchBody, DAGRunsBatchBody, DagRunState, 
DagWarningType, GenerateTokenBody, MaterializeAssetBody, PatchTaskInstanceBody, 
PoolBody, PoolPatchBody, TaskInstancesBatchBody, TriggerDAGRunPostBody, 
UpdateHITLDetailPayload, VariableBody, XComCreateBody, XComUpdateBody [...]
+import { AssetService, AssetStateService, AuthLinksService, BackfillService, 
CalendarService, ConfigService, ConnectionService, DagParsingService, 
DagRunService, DagService, DagSourceService, DagStatsService, 
DagVersionService, DagWarningService, DashboardService, DeadlinesService, 
DependenciesService, EventLogService, ExperimentalService, ExtraLinksService, 
GanttService, GridService, ImportErrorService, JobService, LoginService, 
MonitorService, PartitionedDagRunService, PluginService, P [...]
+import { AssetStateBody, BackfillPostBody, BulkBody_BulkTaskInstanceBody_, 
BulkBody_ConnectionBody_, BulkBody_PoolBody_, BulkBody_VariableBody_, 
ClearTaskInstancesBody, ConnectionBody, CreateAssetEventsBody, DAGPatchBody, 
DAGRunClearBody, DAGRunPatchBody, DAGRunsBatchBody, DagRunState, 
DagWarningType, GenerateTokenBody, MaterializeAssetBody, PatchTaskInstanceBody, 
PoolBody, PoolPatchBody, TaskInstancesBatchBody, TaskStateBody, 
TriggerDAGRunPostBody, UpdateHITLDetailPayload, VariableBody, [...]
 import * as Common from "./common";
 /**
 * Get Assets
@@ -1476,6 +1476,74 @@ export const useProviderServiceGetProviders = <TData = 
Common.ProviderServiceGet
   offset?: number;
 } = {}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, 
"queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey: 
Common.UseProviderServiceGetProvidersKeyFn({ limit, offset }, queryKey), 
queryFn: () => ProviderService.getProviders({ limit, offset }) as TData, 
...options });
 /**
+* List Asset States
+* List all state entries for an asset.
+* @param data The data for the request.
+* @param data.assetId
+* @param data.limit
+* @param data.offset
+* @returns AssetStateCollectionResponse Successful Response
+* @throws ApiError
+*/
+export const useAssetStateServiceListAssetStates = <TData = 
Common.AssetStateServiceListAssetStatesDefaultResponse, TError = unknown, 
TQueryKey extends Array<unknown> = unknown[]>({ assetId, limit, offset }: {
+  assetId: number;
+  limit?: number;
+  offset?: number;
+}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, 
"queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey: 
Common.UseAssetStateServiceListAssetStatesKeyFn({ assetId, limit, offset }, 
queryKey), queryFn: () => AssetStateService.listAssetStates({ assetId, limit, 
offset }) as TData, ...options });
+/**
+* Get Asset State
+* Get a single asset state entry.
+* @param data The data for the request.
+* @param data.key
+* @param data.assetId
+* @returns AssetStateResponse Successful Response
+* @throws ApiError
+*/
+export const useAssetStateServiceGetAssetState = <TData = 
Common.AssetStateServiceGetAssetStateDefaultResponse, TError = unknown, 
TQueryKey extends Array<unknown> = unknown[]>({ assetId, key }: {
+  assetId: number;
+  key: string;
+}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, 
"queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey: 
Common.UseAssetStateServiceGetAssetStateKeyFn({ assetId, key }, queryKey), 
queryFn: () => AssetStateService.getAssetState({ assetId, key }) as TData, 
...options });
+/**
+* List Task States
+* List all task state entries for a task instance.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.dagRunId
+* @param data.taskId
+* @param data.mapIndex
+* @param data.limit
+* @param data.offset
+* @returns TaskStateCollectionResponse Successful Response
+* @throws ApiError
+*/
+export const useTaskStateServiceListTaskStates = <TData = 
Common.TaskStateServiceListTaskStatesDefaultResponse, TError = unknown, 
TQueryKey extends Array<unknown> = unknown[]>({ dagId, dagRunId, limit, 
mapIndex, offset, taskId }: {
+  dagId: string;
+  dagRunId: string;
+  limit?: number;
+  mapIndex?: number;
+  offset?: number;
+  taskId: string;
+}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, 
"queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey: 
Common.UseTaskStateServiceListTaskStatesKeyFn({ dagId, dagRunId, limit, 
mapIndex, offset, taskId }, queryKey), queryFn: () => 
TaskStateService.listTaskStates({ dagId, dagRunId, limit, mapIndex, offset, 
taskId }) as TData, ...options });
+/**
+* Get Task State
+* Get a single task state entry.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.dagRunId
+* @param data.taskId
+* @param data.key
+* @param data.mapIndex
+* @returns TaskStateResponse Successful Response
+* @throws ApiError
+*/
+export const useTaskStateServiceGetTaskState = <TData = 
Common.TaskStateServiceGetTaskStateDefaultResponse, TError = unknown, TQueryKey 
extends Array<unknown> = unknown[]>({ dagId, dagRunId, key, mapIndex, taskId }: 
{
+  dagId: string;
+  dagRunId: string;
+  key: string;
+  mapIndex?: number;
+  taskId: string;
+}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, 
"queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey: 
Common.UseTaskStateServiceGetTaskStateKeyFn({ dagId, dagRunId, key, mapIndex, 
taskId }, queryKey), queryFn: () => TaskStateService.getTaskState({ dagId, 
dagRunId, key, mapIndex, taskId }) as TData, ...options });
+/**
 * Get Xcom Entry
 * Get an XCom entry.
 * @param data The data for the request.
@@ -2300,6 +2368,53 @@ export const useBackfillServiceCancelBackfill = <TData = 
Common.BackfillServiceC
   backfillId: number;
 }, TContext>({ mutationFn: ({ backfillId }) => 
BackfillService.cancelBackfill({ backfillId }) as unknown as Promise<TData>, 
...options });
 /**
+* Set Asset State
+* Set an asset state value. Creates or overwrites the key.
+* @param data The data for the request.
+* @param data.key
+* @param data.assetId
+* @param data.requestBody
+* @returns void Successful Response
+* @throws ApiError
+*/
+export const useAssetStateServiceSetAssetState = <TData = 
Common.AssetStateServiceSetAssetStateMutationResult, TError = unknown, TContext 
= unknown>(options?: Omit<UseMutationOptions<TData, TError, {
+  assetId: number;
+  key: string;
+  requestBody: AssetStateBody;
+}, TContext>, "mutationFn">) => useMutation<TData, TError, {
+  assetId: number;
+  key: string;
+  requestBody: AssetStateBody;
+}, TContext>({ mutationFn: ({ assetId, key, requestBody }) => 
AssetStateService.setAssetState({ assetId, key, requestBody }) as unknown as 
Promise<TData>, ...options });
+/**
+* Set Task State
+* Set a task state value. Creates or overwrites the key.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.dagRunId
+* @param data.taskId
+* @param data.key
+* @param data.requestBody
+* @param data.mapIndex
+* @returns void Successful Response
+* @throws ApiError
+*/
+export const useTaskStateServiceSetTaskState = <TData = 
Common.TaskStateServiceSetTaskStateMutationResult, TError = unknown, TContext = 
unknown>(options?: Omit<UseMutationOptions<TData, TError, {
+  dagId: string;
+  dagRunId: string;
+  key: string;
+  mapIndex?: number;
+  requestBody: TaskStateBody;
+  taskId: string;
+}, TContext>, "mutationFn">) => useMutation<TData, TError, {
+  dagId: string;
+  dagRunId: string;
+  key: string;
+  mapIndex?: number;
+  requestBody: TaskStateBody;
+  taskId: string;
+}, TContext>({ mutationFn: ({ dagId, dagRunId, key, mapIndex, requestBody, 
taskId }) => TaskStateService.setTaskState({ dagId, dagRunId, key, mapIndex, 
requestBody, taskId }) as unknown as Promise<TData>, ...options });
+/**
 * Reparse Dag File
 * Request re-parsing a Dag file.
 * @param data The data for the request.
@@ -2854,6 +2969,88 @@ export const usePoolServiceDeletePool = <TData = 
Common.PoolServiceDeletePoolMut
   poolName: string;
 }, TContext>({ mutationFn: ({ poolName }) => PoolService.deletePool({ poolName 
}) as unknown as Promise<TData>, ...options });
 /**
+* Clear Asset State
+* Delete all state keys for an asset.
+* @param data The data for the request.
+* @param data.assetId
+* @returns void Successful Response
+* @throws ApiError
+*/
+export const useAssetStateServiceClearAssetState = <TData = 
Common.AssetStateServiceClearAssetStateMutationResult, TError = unknown, 
TContext = unknown>(options?: Omit<UseMutationOptions<TData, TError, {
+  assetId: number;
+}, TContext>, "mutationFn">) => useMutation<TData, TError, {
+  assetId: number;
+}, TContext>({ mutationFn: ({ assetId }) => 
AssetStateService.clearAssetState({ assetId }) as unknown as Promise<TData>, 
...options });
+/**
+* Delete Asset State
+* Delete a single asset state key. No-op if the key does not exist.
+* @param data The data for the request.
+* @param data.key
+* @param data.assetId
+* @returns void Successful Response
+* @throws ApiError
+*/
+export const useAssetStateServiceDeleteAssetState = <TData = 
Common.AssetStateServiceDeleteAssetStateMutationResult, TError = unknown, 
TContext = unknown>(options?: Omit<UseMutationOptions<TData, TError, {
+  assetId: number;
+  key: string;
+}, TContext>, "mutationFn">) => useMutation<TData, TError, {
+  assetId: number;
+  key: string;
+}, TContext>({ mutationFn: ({ assetId, key }) => 
AssetStateService.deleteAssetState({ assetId, key }) as unknown as 
Promise<TData>, ...options });
+/**
+* Clear Task State
+* Delete all task state keys for a task instance.
+*
+* When ``all_map_indices=true``, state is cleared for every map index of the 
task and
+* the ``map_index`` parameter is ignored.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.dagRunId
+* @param data.taskId
+* @param data.mapIndex
+* @param data.allMapIndices
+* @returns void Successful Response
+* @throws ApiError
+*/
+export const useTaskStateServiceClearTaskState = <TData = 
Common.TaskStateServiceClearTaskStateMutationResult, TError = unknown, TContext 
= unknown>(options?: Omit<UseMutationOptions<TData, TError, {
+  allMapIndices?: boolean;
+  dagId: string;
+  dagRunId: string;
+  mapIndex?: number;
+  taskId: string;
+}, TContext>, "mutationFn">) => useMutation<TData, TError, {
+  allMapIndices?: boolean;
+  dagId: string;
+  dagRunId: string;
+  mapIndex?: number;
+  taskId: string;
+}, TContext>({ mutationFn: ({ allMapIndices, dagId, dagRunId, mapIndex, taskId 
}) => TaskStateService.clearTaskState({ allMapIndices, dagId, dagRunId, 
mapIndex, taskId }) as unknown as Promise<TData>, ...options });
+/**
+* Delete Task State
+* Delete a single task state key. No-op if the key does not exist.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.dagRunId
+* @param data.taskId
+* @param data.key
+* @param data.mapIndex
+* @returns void Successful Response
+* @throws ApiError
+*/
+export const useTaskStateServiceDeleteTaskState = <TData = 
Common.TaskStateServiceDeleteTaskStateMutationResult, TError = unknown, 
TContext = unknown>(options?: Omit<UseMutationOptions<TData, TError, {
+  dagId: string;
+  dagRunId: string;
+  key: string;
+  mapIndex?: number;
+  taskId: string;
+}, TContext>, "mutationFn">) => useMutation<TData, TError, {
+  dagId: string;
+  dagRunId: string;
+  key: string;
+  mapIndex?: number;
+  taskId: string;
+}, TContext>({ mutationFn: ({ dagId, dagRunId, key, mapIndex, taskId }) => 
TaskStateService.deleteTaskState({ dagId, dagRunId, key, mapIndex, taskId }) as 
unknown as Promise<TData>, ...options });
+/**
 * Delete Xcom Entry
 * Delete an XCom entry.
 * @param data The data for the request.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts 
b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
index 34fa5aee502..8cdfee041b6 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
@@ -1,7 +1,7 @@
 // generated with @7nohe/[email protected] 
 
 import { UseQueryOptions, useSuspenseQuery } from "@tanstack/react-query";
-import { AssetService, AuthLinksService, BackfillService, CalendarService, 
ConfigService, ConnectionService, DagRunService, DagService, DagSourceService, 
DagStatsService, DagVersionService, DagWarningService, DashboardService, 
DeadlinesService, DependenciesService, EventLogService, ExperimentalService, 
ExtraLinksService, GanttService, GridService, ImportErrorService, JobService, 
LoginService, MonitorService, PartitionedDagRunService, PluginService, 
PoolService, ProviderService, Structure [...]
+import { AssetService, AssetStateService, AuthLinksService, BackfillService, 
CalendarService, ConfigService, ConnectionService, DagRunService, DagService, 
DagSourceService, DagStatsService, DagVersionService, DagWarningService, 
DashboardService, DeadlinesService, DependenciesService, EventLogService, 
ExperimentalService, ExtraLinksService, GanttService, GridService, 
ImportErrorService, JobService, LoginService, MonitorService, 
PartitionedDagRunService, PluginService, PoolService, Provide [...]
 import { DagRunState, DagWarningType } from "../requests/types.gen";
 import * as Common from "./common";
 /**
@@ -1476,6 +1476,74 @@ export const useProviderServiceGetProvidersSuspense = 
<TData = Common.ProviderSe
   offset?: number;
 } = {}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, 
"queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey: 
Common.UseProviderServiceGetProvidersKeyFn({ limit, offset }, queryKey), 
queryFn: () => ProviderService.getProviders({ limit, offset }) as TData, 
...options });
 /**
+* List Asset States
+* List all state entries for an asset.
+* @param data The data for the request.
+* @param data.assetId
+* @param data.limit
+* @param data.offset
+* @returns AssetStateCollectionResponse Successful Response
+* @throws ApiError
+*/
+export const useAssetStateServiceListAssetStatesSuspense = <TData = 
Common.AssetStateServiceListAssetStatesDefaultResponse, TError = unknown, 
TQueryKey extends Array<unknown> = unknown[]>({ assetId, limit, offset }: {
+  assetId: number;
+  limit?: number;
+  offset?: number;
+}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, 
"queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey: 
Common.UseAssetStateServiceListAssetStatesKeyFn({ assetId, limit, offset }, 
queryKey), queryFn: () => AssetStateService.listAssetStates({ assetId, limit, 
offset }) as TData, ...options });
+/**
+* Get Asset State
+* Get a single asset state entry.
+* @param data The data for the request.
+* @param data.key
+* @param data.assetId
+* @returns AssetStateResponse Successful Response
+* @throws ApiError
+*/
+export const useAssetStateServiceGetAssetStateSuspense = <TData = 
Common.AssetStateServiceGetAssetStateDefaultResponse, TError = unknown, 
TQueryKey extends Array<unknown> = unknown[]>({ assetId, key }: {
+  assetId: number;
+  key: string;
+}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, 
"queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey: 
Common.UseAssetStateServiceGetAssetStateKeyFn({ assetId, key }, queryKey), 
queryFn: () => AssetStateService.getAssetState({ assetId, key }) as TData, 
...options });
+/**
+* List Task States
+* List all task state entries for a task instance.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.dagRunId
+* @param data.taskId
+* @param data.mapIndex
+* @param data.limit
+* @param data.offset
+* @returns TaskStateCollectionResponse Successful Response
+* @throws ApiError
+*/
+export const useTaskStateServiceListTaskStatesSuspense = <TData = 
Common.TaskStateServiceListTaskStatesDefaultResponse, TError = unknown, 
TQueryKey extends Array<unknown> = unknown[]>({ dagId, dagRunId, limit, 
mapIndex, offset, taskId }: {
+  dagId: string;
+  dagRunId: string;
+  limit?: number;
+  mapIndex?: number;
+  offset?: number;
+  taskId: string;
+}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, 
"queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey: 
Common.UseTaskStateServiceListTaskStatesKeyFn({ dagId, dagRunId, limit, 
mapIndex, offset, taskId }, queryKey), queryFn: () => 
TaskStateService.listTaskStates({ dagId, dagRunId, limit, mapIndex, offset, 
taskId }) as TData, ...options });
+/**
+* Get Task State
+* Get a single task state entry.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.dagRunId
+* @param data.taskId
+* @param data.key
+* @param data.mapIndex
+* @returns TaskStateResponse Successful Response
+* @throws ApiError
+*/
+export const useTaskStateServiceGetTaskStateSuspense = <TData = 
Common.TaskStateServiceGetTaskStateDefaultResponse, TError = unknown, TQueryKey 
extends Array<unknown> = unknown[]>({ dagId, dagRunId, key, mapIndex, taskId }: 
{
+  dagId: string;
+  dagRunId: string;
+  key: string;
+  mapIndex?: number;
+  taskId: string;
+}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, 
"queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey: 
Common.UseTaskStateServiceGetTaskStateKeyFn({ dagId, dagRunId, key, mapIndex, 
taskId }, queryKey), queryFn: () => TaskStateService.getTaskState({ dagId, 
dagRunId, key, mapIndex, taskId }) as TData, ...options });
+/**
 * Get Xcom Entry
 * Get an XCom entry.
 * @param data The data for the request.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 99b1c493455..9fc0f4c7fb5 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -381,6 +381,63 @@ export const $AssetResponse = {
     description: 'Asset serializer for responses.'
 } as const;
 
+export const $AssetStateBody = {
+    properties: {
+        value: {
+            type: 'string',
+            maxLength: 65535,
+            title: 'Value'
+        }
+    },
+    additionalProperties: false,
+    type: 'object',
+    required: ['value'],
+    title: 'AssetStateBody',
+    description: 'Request body for setting an asset state value.'
+} as const;
+
+export const $AssetStateCollectionResponse = {
+    properties: {
+        asset_states: {
+            items: {
+                '$ref': '#/components/schemas/AssetStateResponse'
+            },
+            type: 'array',
+            title: 'Asset States'
+        },
+        total_entries: {
+            type: 'integer',
+            title: 'Total Entries'
+        }
+    },
+    type: 'object',
+    required: ['asset_states', 'total_entries'],
+    title: 'AssetStateCollectionResponse',
+    description: 'All asset state entries for an asset.'
+} as const;
+
+export const $AssetStateResponse = {
+    properties: {
+        key: {
+            type: 'string',
+            title: 'Key'
+        },
+        value: {
+            type: 'string',
+            title: 'Value'
+        },
+        updated_at: {
+            type: 'string',
+            format: 'date-time',
+            title: 'Updated At'
+        }
+    },
+    type: 'object',
+    required: ['key', 'value', 'updated_at'],
+    title: 'AssetStateResponse',
+    description: 'A single asset state key/value pair with metadata.'
+} as const;
+
 export const $AssetWatcherResponse = {
     properties: {
         name: {
@@ -6738,6 +6795,75 @@ export const $TaskResponse = {
     description: 'Task serializer for responses.'
 } as const;
 
+export const $TaskStateBody = {
+    properties: {
+        value: {
+            type: 'string',
+            maxLength: 65535,
+            title: 'Value'
+        }
+    },
+    additionalProperties: false,
+    type: 'object',
+    required: ['value'],
+    title: 'TaskStateBody',
+    description: 'Request body for setting a task state value.'
+} as const;
+
+export const $TaskStateCollectionResponse = {
+    properties: {
+        task_states: {
+            items: {
+                '$ref': '#/components/schemas/TaskStateResponse'
+            },
+            type: 'array',
+            title: 'Task States'
+        },
+        total_entries: {
+            type: 'integer',
+            title: 'Total Entries'
+        }
+    },
+    type: 'object',
+    required: ['task_states', 'total_entries'],
+    title: 'TaskStateCollectionResponse',
+    description: 'All task state entries for a task instance.'
+} as const;
+
+export const $TaskStateResponse = {
+    properties: {
+        key: {
+            type: 'string',
+            title: 'Key'
+        },
+        value: {
+            type: 'string',
+            title: 'Value'
+        },
+        updated_at: {
+            type: 'string',
+            format: 'date-time',
+            title: 'Updated At'
+        },
+        expires_at: {
+            anyOf: [
+                {
+                    type: 'string',
+                    format: 'date-time'
+                },
+                {
+                    type: 'null'
+                }
+            ],
+            title: 'Expires At'
+        }
+    },
+    type: 'object',
+    required: ['key', 'value', 'updated_at', 'expires_at'],
+    title: 'TaskStateResponse',
+    description: 'A single task state key/value pair with metadata.'
+} as const;
+
 export const $TimeDelta = {
     properties: {
         __type: {
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
index 1e24595089b..bc1f4471470 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -3,7 +3,7 @@
 import type { CancelablePromise } from './core/CancelablePromise';
 import { OpenAPI } from './core/OpenAPI';
 import { request as __request } from './core/request';
-import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData, 
GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse, 
GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, 
CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse, 
GetAssetQueuedEventsData, GetAssetQueuedEventsResponse, 
DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData, 
GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, 
Dele [...]
+import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData, 
GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse, 
GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, 
CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse, 
GetAssetQueuedEventsData, GetAssetQueuedEventsResponse, 
DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData, 
GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, 
Dele [...]
 
 export class AssetService {
     /**
@@ -3505,6 +3505,325 @@ export class ProviderService {
     
 }
 
+export class AssetStateService {
+    /**
+     * List Asset States
+     * List all state entries for an asset.
+     * @param data The data for the request.
+     * @param data.assetId
+     * @param data.limit
+     * @param data.offset
+     * @returns AssetStateCollectionResponse Successful Response
+     * @throws ApiError
+     */
+    public static listAssetStates(data: ListAssetStatesData): 
CancelablePromise<ListAssetStatesResponse> {
+        return __request(OpenAPI, {
+            method: 'GET',
+            url: '/api/v2/assets/{asset_id}/states',
+            path: {
+                asset_id: data.assetId
+            },
+            query: {
+                limit: data.limit,
+                offset: data.offset
+            },
+            errors: {
+                401: 'Unauthorized',
+                403: 'Forbidden',
+                404: 'Not Found',
+                422: 'Validation Error'
+            }
+        });
+    }
+    
+    /**
+     * Clear Asset State
+     * Delete all state keys for an asset.
+     * @param data The data for the request.
+     * @param data.assetId
+     * @returns void Successful Response
+     * @throws ApiError
+     */
+    public static clearAssetState(data: ClearAssetStateData): 
CancelablePromise<ClearAssetStateResponse> {
+        return __request(OpenAPI, {
+            method: 'DELETE',
+            url: '/api/v2/assets/{asset_id}/states',
+            path: {
+                asset_id: data.assetId
+            },
+            errors: {
+                401: 'Unauthorized',
+                403: 'Forbidden',
+                404: 'Not Found',
+                422: 'Validation Error'
+            }
+        });
+    }
+    
+    /**
+     * Get Asset State
+     * Get a single asset state entry.
+     * @param data The data for the request.
+     * @param data.key
+     * @param data.assetId
+     * @returns AssetStateResponse Successful Response
+     * @throws ApiError
+     */
+    public static getAssetState(data: GetAssetStateData): 
CancelablePromise<GetAssetStateResponse> {
+        return __request(OpenAPI, {
+            method: 'GET',
+            url: '/api/v2/assets/{asset_id}/states/{key}',
+            path: {
+                key: data.key,
+                asset_id: data.assetId
+            },
+            errors: {
+                401: 'Unauthorized',
+                403: 'Forbidden',
+                404: 'Not Found',
+                422: 'Validation Error'
+            }
+        });
+    }
+    
+    /**
+     * Set Asset State
+     * Set an asset state value. Creates or overwrites the key.
+     * @param data The data for the request.
+     * @param data.key
+     * @param data.assetId
+     * @param data.requestBody
+     * @returns void Successful Response
+     * @throws ApiError
+     */
+    public static setAssetState(data: SetAssetStateData): 
CancelablePromise<SetAssetStateResponse> {
+        return __request(OpenAPI, {
+            method: 'PUT',
+            url: '/api/v2/assets/{asset_id}/states/{key}',
+            path: {
+                key: data.key,
+                asset_id: data.assetId
+            },
+            body: data.requestBody,
+            mediaType: 'application/json',
+            errors: {
+                401: 'Unauthorized',
+                403: 'Forbidden',
+                404: 'Not Found',
+                422: 'Validation Error'
+            }
+        });
+    }
+    
+    /**
+     * Delete Asset State
+     * Delete a single asset state key. No-op if the key does not exist.
+     * @param data The data for the request.
+     * @param data.key
+     * @param data.assetId
+     * @returns void Successful Response
+     * @throws ApiError
+     */
+    public static deleteAssetState(data: DeleteAssetStateData): 
CancelablePromise<DeleteAssetStateResponse> {
+        return __request(OpenAPI, {
+            method: 'DELETE',
+            url: '/api/v2/assets/{asset_id}/states/{key}',
+            path: {
+                key: data.key,
+                asset_id: data.assetId
+            },
+            errors: {
+                401: 'Unauthorized',
+                403: 'Forbidden',
+                404: 'Not Found',
+                422: 'Validation Error'
+            }
+        });
+    }
+    
+}
+
+export class TaskStateService {
+    /**
+     * List Task States
+     * List all task state entries for a task instance.
+     * @param data The data for the request.
+     * @param data.dagId
+     * @param data.dagRunId
+     * @param data.taskId
+     * @param data.mapIndex
+     * @param data.limit
+     * @param data.offset
+     * @returns TaskStateCollectionResponse Successful Response
+     * @throws ApiError
+     */
+    public static listTaskStates(data: ListTaskStatesData): 
CancelablePromise<ListTaskStatesResponse> {
+        return __request(OpenAPI, {
+            method: 'GET',
+            url: 
'/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/states',
+            path: {
+                dag_id: data.dagId,
+                dag_run_id: data.dagRunId,
+                task_id: data.taskId
+            },
+            query: {
+                map_index: data.mapIndex,
+                limit: data.limit,
+                offset: data.offset
+            },
+            errors: {
+                401: 'Unauthorized',
+                403: 'Forbidden',
+                404: 'Not Found',
+                422: 'Validation Error'
+            }
+        });
+    }
+    
+    /**
+     * Clear Task State
+     * Delete all task state keys for a task instance.
+     *
+     * When ``all_map_indices=true``, state is cleared for every map index of 
the task and
+     * the ``map_index`` parameter is ignored.
+     * @param data The data for the request.
+     * @param data.dagId
+     * @param data.dagRunId
+     * @param data.taskId
+     * @param data.mapIndex
+     * @param data.allMapIndices
+     * @returns void Successful Response
+     * @throws ApiError
+     */
+    public static clearTaskState(data: ClearTaskStateData): 
CancelablePromise<ClearTaskStateResponse> {
+        return __request(OpenAPI, {
+            method: 'DELETE',
+            url: 
'/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/states',
+            path: {
+                dag_id: data.dagId,
+                dag_run_id: data.dagRunId,
+                task_id: data.taskId
+            },
+            query: {
+                map_index: data.mapIndex,
+                all_map_indices: data.allMapIndices
+            },
+            errors: {
+                401: 'Unauthorized',
+                403: 'Forbidden',
+                404: 'Not Found',
+                422: 'Validation Error'
+            }
+        });
+    }
+    
+    /**
+     * Get Task State
+     * Get a single task state entry.
+     * @param data The data for the request.
+     * @param data.dagId
+     * @param data.dagRunId
+     * @param data.taskId
+     * @param data.key
+     * @param data.mapIndex
+     * @returns TaskStateResponse Successful Response
+     * @throws ApiError
+     */
+    public static getTaskState(data: GetTaskStateData): 
CancelablePromise<GetTaskStateResponse> {
+        return __request(OpenAPI, {
+            method: 'GET',
+            url: 
'/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/states/{key}',
+            path: {
+                dag_id: data.dagId,
+                dag_run_id: data.dagRunId,
+                task_id: data.taskId,
+                key: data.key
+            },
+            query: {
+                map_index: data.mapIndex
+            },
+            errors: {
+                401: 'Unauthorized',
+                403: 'Forbidden',
+                404: 'Not Found',
+                422: 'Validation Error'
+            }
+        });
+    }
+    
+    /**
+     * Set Task State
+     * Set a task state value. Creates or overwrites the key.
+     * @param data The data for the request.
+     * @param data.dagId
+     * @param data.dagRunId
+     * @param data.taskId
+     * @param data.key
+     * @param data.requestBody
+     * @param data.mapIndex
+     * @returns void Successful Response
+     * @throws ApiError
+     */
+    public static setTaskState(data: SetTaskStateData): 
CancelablePromise<SetTaskStateResponse> {
+        return __request(OpenAPI, {
+            method: 'PUT',
+            url: 
'/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/states/{key}',
+            path: {
+                dag_id: data.dagId,
+                dag_run_id: data.dagRunId,
+                task_id: data.taskId,
+                key: data.key
+            },
+            query: {
+                map_index: data.mapIndex
+            },
+            body: data.requestBody,
+            mediaType: 'application/json',
+            errors: {
+                401: 'Unauthorized',
+                403: 'Forbidden',
+                404: 'Not Found',
+                422: 'Validation Error'
+            }
+        });
+    }
+    
+    /**
+     * Delete Task State
+     * Delete a single task state key. No-op if the key does not exist.
+     * @param data The data for the request.
+     * @param data.dagId
+     * @param data.dagRunId
+     * @param data.taskId
+     * @param data.key
+     * @param data.mapIndex
+     * @returns void Successful Response
+     * @throws ApiError
+     */
+    public static deleteTaskState(data: DeleteTaskStateData): 
CancelablePromise<DeleteTaskStateResponse> {
+        return __request(OpenAPI, {
+            method: 'DELETE',
+            url: 
'/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/states/{key}',
+            path: {
+                dag_id: data.dagId,
+                dag_run_id: data.dagRunId,
+                task_id: data.taskId,
+                key: data.key
+            },
+            query: {
+                map_index: data.mapIndex
+            },
+            errors: {
+                401: 'Unauthorized',
+                403: 'Forbidden',
+                404: 'Not Found',
+                422: 'Validation Error'
+            }
+        });
+    }
+    
+}
+
 export class XcomService {
     /**
      * Get Xcom Entry
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 5db38b8e961..ee025fc11a5 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -96,6 +96,30 @@ export type AssetResponse = {
     last_asset_event?: LastAssetEventResponse | null;
 };
 
+/**
+ * Request body for setting an asset state value.
+ */
+export type AssetStateBody = {
+    value: string;
+};
+
+/**
+ * All asset state entries for an asset.
+ */
+export type AssetStateCollectionResponse = {
+    asset_states: Array<AssetStateResponse>;
+    total_entries: number;
+};
+
+/**
+ * A single asset state key/value pair with metadata.
+ */
+export type AssetStateResponse = {
+    key: string;
+    value: string;
+    updated_at: string;
+};
+
 /**
  * Asset watcher serializer for responses.
  */
@@ -1631,6 +1655,31 @@ export type TaskResponse = {
     readonly extra_links: Array<(string)>;
 };
 
+/**
+ * Request body for setting a task state value.
+ */
+export type TaskStateBody = {
+    value: string;
+};
+
+/**
+ * All task state entries for a task instance.
+ */
+export type TaskStateCollectionResponse = {
+    task_states: Array<TaskStateResponse>;
+    total_entries: number;
+};
+
+/**
+ * A single task state key/value pair with metadata.
+ */
+export type TaskStateResponse = {
+    key: string;
+    value: string;
+    updated_at: string;
+    expires_at: string | null;
+};
+
 /**
  * TimeDelta can be used to interact with datetime.timedelta objects.
  */
@@ -3775,6 +3824,94 @@ export type GetProvidersData = {
 
 export type GetProvidersResponse = ProviderCollectionResponse;
 
+export type ListAssetStatesData = {
+    assetId: number;
+    limit?: number;
+    offset?: number;
+};
+
+export type ListAssetStatesResponse = AssetStateCollectionResponse;
+
+export type ClearAssetStateData = {
+    assetId: number;
+};
+
+export type ClearAssetStateResponse = void;
+
+export type GetAssetStateData = {
+    assetId: number;
+    key: string;
+};
+
+export type GetAssetStateResponse = AssetStateResponse;
+
+export type SetAssetStateData = {
+    assetId: number;
+    key: string;
+    requestBody: AssetStateBody;
+};
+
+export type SetAssetStateResponse = void;
+
+export type DeleteAssetStateData = {
+    assetId: number;
+    key: string;
+};
+
+export type DeleteAssetStateResponse = void;
+
+export type ListTaskStatesData = {
+    dagId: string;
+    dagRunId: string;
+    limit?: number;
+    mapIndex?: number;
+    offset?: number;
+    taskId: string;
+};
+
+export type ListTaskStatesResponse = TaskStateCollectionResponse;
+
+export type ClearTaskStateData = {
+    allMapIndices?: boolean;
+    dagId: string;
+    dagRunId: string;
+    mapIndex?: number;
+    taskId: string;
+};
+
+export type ClearTaskStateResponse = void;
+
+export type GetTaskStateData = {
+    dagId: string;
+    dagRunId: string;
+    key: string;
+    mapIndex?: number;
+    taskId: string;
+};
+
+export type GetTaskStateResponse = TaskStateResponse;
+
+export type SetTaskStateData = {
+    dagId: string;
+    dagRunId: string;
+    key: string;
+    mapIndex?: number;
+    requestBody: TaskStateBody;
+    taskId: string;
+};
+
+export type SetTaskStateResponse = void;
+
+export type DeleteTaskStateData = {
+    dagId: string;
+    dagRunId: string;
+    key: string;
+    mapIndex?: number;
+    taskId: string;
+};
+
+export type DeleteTaskStateResponse = void;
+
 export type GetXcomEntryData = {
     dagId: string;
     dagRunId: string;
@@ -6810,6 +6947,264 @@ export type $OpenApiTs = {
             };
         };
     };
+    '/api/v2/assets/{asset_id}/states': {
+        get: {
+            req: ListAssetStatesData;
+            res: {
+                /**
+                 * Successful Response
+                 */
+                200: AssetStateCollectionResponse;
+                /**
+                 * Unauthorized
+                 */
+                401: HTTPExceptionResponse;
+                /**
+                 * Forbidden
+                 */
+                403: HTTPExceptionResponse;
+                /**
+                 * Not Found
+                 */
+                404: HTTPExceptionResponse;
+                /**
+                 * Validation Error
+                 */
+                422: HTTPValidationError;
+            };
+        };
+        delete: {
+            req: ClearAssetStateData;
+            res: {
+                /**
+                 * Successful Response
+                 */
+                204: void;
+                /**
+                 * Unauthorized
+                 */
+                401: HTTPExceptionResponse;
+                /**
+                 * Forbidden
+                 */
+                403: HTTPExceptionResponse;
+                /**
+                 * Not Found
+                 */
+                404: HTTPExceptionResponse;
+                /**
+                 * Validation Error
+                 */
+                422: HTTPValidationError;
+            };
+        };
+    };
+    '/api/v2/assets/{asset_id}/states/{key}': {
+        get: {
+            req: GetAssetStateData;
+            res: {
+                /**
+                 * Successful Response
+                 */
+                200: AssetStateResponse;
+                /**
+                 * Unauthorized
+                 */
+                401: HTTPExceptionResponse;
+                /**
+                 * Forbidden
+                 */
+                403: HTTPExceptionResponse;
+                /**
+                 * Not Found
+                 */
+                404: HTTPExceptionResponse;
+                /**
+                 * Validation Error
+                 */
+                422: HTTPValidationError;
+            };
+        };
+        put: {
+            req: SetAssetStateData;
+            res: {
+                /**
+                 * Successful Response
+                 */
+                204: void;
+                /**
+                 * Unauthorized
+                 */
+                401: HTTPExceptionResponse;
+                /**
+                 * Forbidden
+                 */
+                403: HTTPExceptionResponse;
+                /**
+                 * Not Found
+                 */
+                404: HTTPExceptionResponse;
+                /**
+                 * Validation Error
+                 */
+                422: HTTPValidationError;
+            };
+        };
+        delete: {
+            req: DeleteAssetStateData;
+            res: {
+                /**
+                 * Successful Response
+                 */
+                204: void;
+                /**
+                 * Unauthorized
+                 */
+                401: HTTPExceptionResponse;
+                /**
+                 * Forbidden
+                 */
+                403: HTTPExceptionResponse;
+                /**
+                 * Not Found
+                 */
+                404: HTTPExceptionResponse;
+                /**
+                 * Validation Error
+                 */
+                422: HTTPValidationError;
+            };
+        };
+    };
+    
'/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/states': {
+        get: {
+            req: ListTaskStatesData;
+            res: {
+                /**
+                 * Successful Response
+                 */
+                200: TaskStateCollectionResponse;
+                /**
+                 * Unauthorized
+                 */
+                401: HTTPExceptionResponse;
+                /**
+                 * Forbidden
+                 */
+                403: HTTPExceptionResponse;
+                /**
+                 * Not Found
+                 */
+                404: HTTPExceptionResponse;
+                /**
+                 * Validation Error
+                 */
+                422: HTTPValidationError;
+            };
+        };
+        delete: {
+            req: ClearTaskStateData;
+            res: {
+                /**
+                 * Successful Response
+                 */
+                204: void;
+                /**
+                 * Unauthorized
+                 */
+                401: HTTPExceptionResponse;
+                /**
+                 * Forbidden
+                 */
+                403: HTTPExceptionResponse;
+                /**
+                 * Not Found
+                 */
+                404: HTTPExceptionResponse;
+                /**
+                 * Validation Error
+                 */
+                422: HTTPValidationError;
+            };
+        };
+    };
+    
'/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/states/{key}':
 {
+        get: {
+            req: GetTaskStateData;
+            res: {
+                /**
+                 * Successful Response
+                 */
+                200: TaskStateResponse;
+                /**
+                 * Unauthorized
+                 */
+                401: HTTPExceptionResponse;
+                /**
+                 * Forbidden
+                 */
+                403: HTTPExceptionResponse;
+                /**
+                 * Not Found
+                 */
+                404: HTTPExceptionResponse;
+                /**
+                 * Validation Error
+                 */
+                422: HTTPValidationError;
+            };
+        };
+        put: {
+            req: SetTaskStateData;
+            res: {
+                /**
+                 * Successful Response
+                 */
+                204: void;
+                /**
+                 * Unauthorized
+                 */
+                401: HTTPExceptionResponse;
+                /**
+                 * Forbidden
+                 */
+                403: HTTPExceptionResponse;
+                /**
+                 * Not Found
+                 */
+                404: HTTPExceptionResponse;
+                /**
+                 * Validation Error
+                 */
+                422: HTTPValidationError;
+            };
+        };
+        delete: {
+            req: DeleteTaskStateData;
+            res: {
+                /**
+                 * Successful Response
+                 */
+                204: void;
+                /**
+                 * Unauthorized
+                 */
+                401: HTTPExceptionResponse;
+                /**
+                 * Forbidden
+                 */
+                403: HTTPExceptionResponse;
+                /**
+                 * Not Found
+                 */
+                404: HTTPExceptionResponse;
+                /**
+                 * Validation Error
+                 */
+                422: HTTPValidationError;
+            };
+        };
+    };
     
'/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}':
 {
         get: {
             req: GetXcomEntryData;
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_asset_state.py
 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_asset_state.py
new file mode 100644
index 00000000000..c53fbb99ee9
--- /dev/null
+++ 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_asset_state.py
@@ -0,0 +1,255 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+
+from airflow.models.asset import AssetModel
+from airflow.models.asset_state import AssetStateModel
+
+from tests_common.test_utils.db import clear_db_assets
+
+pytestmark = pytest.mark.db_test
+
+ASSET_URI = "s3://bucket/watermarks"
+ASSET_NAME = "test_asset"
+
+
+def _create_asset(session) -> AssetModel:
+    asset = AssetModel(uri=ASSET_URI, name=ASSET_NAME, group="test")
+    session.add(asset)
+    session.flush()
+    return asset
+
+
+def _create_asset_state(session, asset_id: int, key: str, value: str) -> None:
+    row = AssetStateModel(asset_id=asset_id, key=key, value=value)
+    session.add(row)
+    session.flush()
+
+
+class TestAssetStateEndpoint:
+    @staticmethod
+    def clear_db():
+        clear_db_assets()
+
+    @pytest.fixture(autouse=True)
+    def setup(self, session):
+        self.clear_db()
+        self.asset = _create_asset(session)
+        session.commit()
+        self._session = session
+        self._base_url = f"/assets/{self.asset.id}/states"
+
+    def teardown_method(self):
+        self.clear_db()
+
+
+class TestAssetNotFound(TestAssetStateEndpoint):
+    """All endpoints return 404 when asset_id does not exist."""
+
+    def test_list_unknown_asset_returns_404(self, test_client):
+        assert test_client.get("/assets/99999/states").status_code == 404
+
+    def test_get_unknown_asset_returns_404(self, test_client):
+        assert test_client.get("/assets/99999/states/key").status_code == 404
+
+    def test_set_unknown_asset_returns_404(self, test_client):
+        assert test_client.put("/assets/99999/states/key", json={"value": 
"v"}).status_code == 404
+
+    def test_delete_unknown_asset_returns_404(self, test_client):
+        assert test_client.delete("/assets/99999/states/key").status_code == 
404
+
+    def test_clear_unknown_asset_returns_404(self, test_client):
+        assert test_client.delete("/assets/99999/states").status_code == 404
+
+
+class TestListAssetState(TestAssetStateEndpoint):
+    def test_returns_empty_list_when_no_state(self, test_client):
+        response = test_client.get(self._base_url)
+        assert response.status_code == 200
+        assert response.json() == {"asset_states": [], "total_entries": 0}
+
+    def test_returns_all_keys(self, test_client):
+        _create_asset_state(self._session, self.asset.id, "watermark", 
"2026-05-01")
+        _create_asset_state(self._session, self.asset.id, "file_count", "42")
+        self._session.commit()
+
+        response = test_client.get(self._base_url)
+        assert response.status_code == 200
+        data = response.json()
+        assert data["total_entries"] == 2
+        keys = {item["key"]: item["value"] for item in data["asset_states"]}
+        assert keys == {"watermark": "2026-05-01", "file_count": "42"}
+
+    def test_returns_metadata_fields(self, test_client):
+        _create_asset_state(self._session, self.asset.id, "watermark", 
"2026-05-01")
+        self._session.commit()
+
+        item = test_client.get(self._base_url).json()["asset_states"][0]
+        assert "updated_at" in item
+        assert item["key"] == "watermark"
+
+    def test_pagination_limit(self, test_client):
+        for k in ("watermark", "file_count", "last_run"):
+            _create_asset_state(self._session, self.asset.id, k, "v")
+        self._session.commit()
+
+        response = test_client.get(f"{self._base_url}?limit=2")
+        data = response.json()
+        assert data["total_entries"] == 3
+        assert len(data["asset_states"]) == 2
+
+    def test_pagination_offset(self, test_client):
+        for k in ("watermark", "file_count", "last_run"):
+            _create_asset_state(self._session, self.asset.id, k, "v")
+        self._session.commit()
+
+        response = test_client.get(f"{self._base_url}?limit=2&offset=2")
+        data = response.json()
+        assert data["total_entries"] == 3
+        assert len(data["asset_states"]) == 1
+
+    def test_unauthorized_returns_401(self, unauthenticated_test_client):
+        assert unauthenticated_test_client.get(self._base_url).status_code == 
401
+
+
+class TestGetAssetState(TestAssetStateEndpoint):
+    def test_returns_value(self, test_client):
+        _create_asset_state(self._session, self.asset.id, "watermark", 
"2026-05-01")
+        self._session.commit()
+
+        response = test_client.get(f"{self._base_url}/watermark")
+        assert response.status_code == 200
+        data = response.json()
+        assert data["key"] == "watermark"
+        assert data["value"] == "2026-05-01"
+        assert "updated_at" in data
+
+    def test_missing_key_returns_404(self, test_client):
+        assert test_client.get(f"{self._base_url}/nonexistent").status_code == 
404
+
+    def test_key_with_slash_is_supported(self, test_client):
+        """Keys containing slashes must work — route uses {key:path}."""
+        _create_asset_state(self._session, self.asset.id, "partition/date", 
"2026-05-01")
+        self._session.commit()
+
+        response = test_client.get(f"{self._base_url}/partition/date")
+        assert response.status_code == 200
+        assert response.json()["key"] == "partition/date"
+
+    def test_unauthorized_returns_401(self, unauthenticated_test_client):
+        assert 
unauthenticated_test_client.get(f"{self._base_url}/watermark").status_code == 
401
+
+
+class TestSetAssetState(TestAssetStateEndpoint):
+    def test_creates_new_key(self, test_client):
+        response = test_client.put(f"{self._base_url}/watermark", 
json={"value": "2026-05-01"})
+        assert response.status_code == 204
+
+        assert test_client.get(f"{self._base_url}/watermark").json()["value"] 
== "2026-05-01"
+
+    def test_overwrites_existing_key(self, test_client):
+        test_client.put(f"{self._base_url}/watermark", json={"value": "v1"})
+        test_client.put(f"{self._base_url}/watermark", json={"value": "v2"})
+
+        assert test_client.get(f"{self._base_url}/watermark").json()["value"] 
== "v2"
+
+    def test_empty_body_returns_422(self, test_client):
+        assert test_client.put(f"{self._base_url}/watermark", 
json={}).status_code == 422
+
+    def test_oversized_value_returns_422(self, test_client):
+        assert test_client.put(f"{self._base_url}/watermark", json={"value": 
"x" * 65536}).status_code == 422
+
+    def test_key_with_slash_is_supported(self, test_client):
+        response = test_client.put(f"{self._base_url}/partition/date", 
json={"value": "2026-05-01"})
+        assert response.status_code == 204
+        assert 
test_client.get(f"{self._base_url}/partition/date").json()["key"] == 
"partition/date"
+
+    def test_unauthorized_returns_401(self, unauthenticated_test_client):
+        assert (
+            unauthenticated_test_client.put(f"{self._base_url}/watermark", 
json={"value": "v"}).status_code
+            == 401
+        )
+
+
+class TestDeleteAssetState(TestAssetStateEndpoint):
+    def test_deletes_key(self, test_client):
+        _create_asset_state(self._session, self.asset.id, "watermark", 
"2026-05-01")
+        self._session.commit()
+
+        assert test_client.delete(f"{self._base_url}/watermark").status_code 
== 204
+        assert test_client.get(f"{self._base_url}/watermark").status_code == 
404
+
+    def test_delete_noop_for_missing_key(self, test_client):
+        assert test_client.delete(f"{self._base_url}/nonexistent").status_code 
== 204
+
+    def test_only_deletes_target_key(self, test_client):
+        _create_asset_state(self._session, self.asset.id, "watermark", "a")
+        _create_asset_state(self._session, self.asset.id, "file_count", "b")
+        self._session.commit()
+
+        test_client.delete(f"{self._base_url}/watermark")
+
+        assert test_client.get(f"{self._base_url}/watermark").status_code == 
404
+        assert test_client.get(f"{self._base_url}/file_count").json()["value"] 
== "b"
+
+    def test_key_with_slash_is_supported(self, test_client):
+        _create_asset_state(self._session, self.asset.id, "partition/date", 
"v")
+        self._session.commit()
+
+        assert 
test_client.delete(f"{self._base_url}/partition/date").status_code == 204
+        assert test_client.get(f"{self._base_url}/partition/date").status_code 
== 404
+
+    def test_unauthorized_returns_401(self, unauthenticated_test_client):
+        assert 
unauthenticated_test_client.delete(f"{self._base_url}/watermark").status_code 
== 401
+
+
+class TestClearAssetState(TestAssetStateEndpoint):
+    def test_clears_all_keys(self, test_client):
+        for k, v in [("watermark", "a"), ("file_count", "b"), ("last_run", 
"c")]:
+            _create_asset_state(self._session, self.asset.id, k, v)
+        self._session.commit()
+
+        assert test_client.delete(self._base_url).status_code == 204
+        assert test_client.get(self._base_url).json()["total_entries"] == 0
+
+    def test_clear_is_noop_when_no_state(self, test_client):
+        assert test_client.delete(self._base_url).status_code == 204
+
+    def test_clear_does_not_affect_other_assets(self, test_client):
+        other_asset = AssetModel(uri="s3://other/asset", name="other_asset", 
group="test")
+        self._session.add(other_asset)
+        self._session.flush()
+        _create_asset_state(self._session, self.asset.id, "watermark", "mine")
+        _create_asset_state(self._session, other_asset.id, "watermark", 
"theirs")
+        self._session.commit()
+
+        test_client.delete(self._base_url)
+
+        other_url = f"/assets/{other_asset.id}/states"
+        assert test_client.get(f"{other_url}/watermark").json()["value"] == 
"theirs"
+
+    def test_clears_slash_keyed_entries(self, test_client):
+        _create_asset_state(self._session, self.asset.id, "partition/date", 
"v")
+        self._session.commit()
+
+        assert test_client.delete(self._base_url).status_code == 204
+        assert test_client.get(self._base_url).json()["total_entries"] == 0
+
+    def test_unauthorized_returns_401(self, unauthenticated_test_client):
+        assert unauthenticated_test_client.delete(self._base_url).status_code 
== 401
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_state.py 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_state.py
new file mode 100644
index 00000000000..c53212fa5e4
--- /dev/null
+++ 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_state.py
@@ -0,0 +1,293 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+from sqlalchemy import select
+
+from airflow._shared.timezones import timezone
+from airflow.models.dagrun import DagRun
+from airflow.models.task_state import TaskStateModel
+from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.utils.types import DagRunType
+
+from tests_common.test_utils.db import clear_db_dag_bundles, clear_db_dags, 
clear_db_runs
+
+pytestmark = pytest.mark.db_test
+
+DAG_ID = "test_dag"
+TASK_ID = "test_task"
+LOGICAL_DATE = timezone.datetime(2026, 1, 1)
+RUN_ID = DagRun.generate_run_id(run_type=DagRunType.MANUAL, 
logical_date=LOGICAL_DATE, run_after=LOGICAL_DATE)
+
+BASE_URL = f"/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{TASK_ID}/states"
+
+
+def _create_dag_run(dag_maker, session):
+    with dag_maker(DAG_ID, schedule=None, start_date=LOGICAL_DATE):
+        EmptyOperator(task_id=TASK_ID)
+    dag_maker.create_dagrun(run_id=RUN_ID, run_type=DagRunType.MANUAL, 
logical_date=LOGICAL_DATE)
+    dag_maker.sync_dagbag_to_db()
+    session.merge(dag_maker.dag_model)
+    session.commit()
+
+
+def _create_task_state(session, key: str, value: str, dag_run: DagRun) -> None:
+    row = TaskStateModel(
+        dag_run_id=dag_run.id,
+        dag_id=DAG_ID,
+        run_id=RUN_ID,
+        task_id=TASK_ID,
+        map_index=-1,
+        key=key,
+        value=value,
+    )
+    session.add(row)
+    session.flush()
+
+
+class TestTaskStateEndpoint:
+    @staticmethod
+    def clear_db():
+        clear_db_dags()
+        clear_db_runs()
+        clear_db_dag_bundles()
+
+    @pytest.fixture(autouse=True)
+    def setup(self, dag_maker, session):
+        self.clear_db()
+        _create_dag_run(dag_maker, session)
+        self.dag_run = session.scalar(select(DagRun).where(DagRun.run_id == 
RUN_ID))
+        self._session = session
+
+    def teardown_method(self):
+        self.clear_db()
+
+
+class TestListTaskState(TestTaskStateEndpoint):
+    def test_returns_empty_list_when_no_state(self, test_client):
+        response = test_client.get(BASE_URL)
+        assert response.status_code == 200
+        assert response.json() == {"task_states": [], "total_entries": 0}
+
+    def test_returns_all_keys(self, test_client):
+        _create_task_state(self._session, "job_id", "spark_001", self.dag_run)
+        _create_task_state(self._session, "checkpoint", "step_3", self.dag_run)
+        self._session.commit()
+
+        response = test_client.get(BASE_URL)
+        assert response.status_code == 200
+        data = response.json()
+        assert data["total_entries"] == 2
+        keys = {item["key"]: item["value"] for item in data["task_states"]}
+        assert keys == {"job_id": "spark_001", "checkpoint": "step_3"}
+
+    def test_returns_state_metadata_fields(self, test_client):
+        _create_task_state(self._session, "job_id", "spark_001", self.dag_run)
+        self._session.commit()
+
+        response = test_client.get(BASE_URL)
+        item = response.json()["task_states"][0]
+        assert "updated_at" in item
+        assert "expires_at" in item
+
+    def test_map_index_isolation(self, test_client):
+        """map_index=-1 (default) doesn't return rows for other map indices."""
+        row = TaskStateModel(
+            dag_run_id=self.dag_run.id,
+            dag_id=DAG_ID,
+            run_id=RUN_ID,
+            task_id=TASK_ID,
+            map_index=0,
+            key="job_id",
+            value="mapped_app",
+        )
+        self._session.add(row)
+        self._session.commit()
+
+        response = test_client.get(BASE_URL)
+        assert response.json()["total_entries"] == 0
+
+    def test_pagination_limit(self, test_client):
+        for k in ("a", "b", "c"):
+            _create_task_state(self._session, k, "v", self.dag_run)
+        self._session.commit()
+
+        response = test_client.get(f"{BASE_URL}?limit=2")
+        data = response.json()
+        assert data["total_entries"] == 3
+        assert len(data["task_states"]) == 2
+
+    def test_pagination_offset(self, test_client):
+        for k in ("a", "b", "c"):
+            _create_task_state(self._session, k, "v", self.dag_run)
+        self._session.commit()
+
+        response = test_client.get(f"{BASE_URL}?limit=2&offset=2")
+        data = response.json()
+        assert data["total_entries"] == 3
+        assert len(data["task_states"]) == 1
+
+    def test_unauthorized_returns_401(self, unauthenticated_test_client):
+        assert unauthenticated_test_client.get(BASE_URL).status_code == 401
+
+
+class TestGetTaskState(TestTaskStateEndpoint):
+    def test_returns_value(self, test_client):
+        _create_task_state(self._session, "job_id", "spark_001", self.dag_run)
+        self._session.commit()
+
+        response = test_client.get(f"{BASE_URL}/job_id")
+        assert response.status_code == 200
+        data = response.json()
+        assert data["key"] == "job_id"
+        assert data["value"] == "spark_001"
+
+    def test_missing_key_returns_404(self, test_client):
+        response = test_client.get(f"{BASE_URL}/nonexistent")
+        assert response.status_code == 404
+
+    def test_key_with_slash_is_supported(self, test_client):
+        """Keys containing slashes must work — route uses {key:path}."""
+        _create_task_state(self._session, "workflow/step_1", "v", self.dag_run)
+        self._session.commit()
+
+        response = test_client.get(f"{BASE_URL}/workflow/step_1")
+        assert response.status_code == 200
+        assert response.json()["key"] == "workflow/step_1"
+
+    def test_unauthorized_returns_401(self, unauthenticated_test_client):
+        assert 
unauthenticated_test_client.get(f"{BASE_URL}/job_id").status_code == 401
+
+
+class TestSetTaskState(TestTaskStateEndpoint):
+    def test_creates_new_key(self, test_client):
+        response = test_client.put(f"{BASE_URL}/job_id", json={"value": 
"spark_001"})
+        assert response.status_code == 204
+
+        get_resp = test_client.get(f"{BASE_URL}/job_id")
+        assert get_resp.json()["value"] == "spark_001"
+
+    def test_overwrites_existing_key(self, test_client):
+        test_client.put(f"{BASE_URL}/job_id", json={"value": "v1"})
+        test_client.put(f"{BASE_URL}/job_id", json={"value": "v2"})
+
+        assert test_client.get(f"{BASE_URL}/job_id").json()["value"] == "v2"
+
+    def test_empty_body_returns_422(self, test_client):
+        assert test_client.put(f"{BASE_URL}/job_id", json={}).status_code == 
422
+
+    def test_oversized_value_returns_422(self, test_client):
+        assert test_client.put(f"{BASE_URL}/job_id", json={"value": "x" * 
65536}).status_code == 422
+
+    def test_set_nonexistent_dag_run_returns_404(self, test_client):
+        """set() raises ValueError when DagRun doesn't exist — should surface 
as 404."""
+        bad_url = 
f"/dags/{DAG_ID}/dagRuns/nonexistent_run/taskInstances/{TASK_ID}/states/job_id"
+        response = test_client.put(bad_url, json={"value": "v"})
+        assert response.status_code == 404
+
+    def test_set_nonexistent_task_id_returns_404(self, test_client):
+        """set() returns 404 when task_id doesn not match any TaskInstance in 
the run."""
+        bad_url = 
f"/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/nonexistent_task/states/job_id"
+        response = test_client.put(bad_url, json={"value": "v"})
+        assert response.status_code == 404
+
+    def test_key_with_slash_is_supported(self, test_client):
+        response = test_client.put(f"{BASE_URL}/workflow/step_1", 
json={"value": "v"})
+        assert response.status_code == 204
+        assert test_client.get(f"{BASE_URL}/workflow/step_1").json()["key"] == 
"workflow/step_1"
+
+    def test_unauthorized_returns_401(self, unauthenticated_test_client):
+        assert unauthenticated_test_client.put(f"{BASE_URL}/job_id", 
json={"value": "v"}).status_code == 401
+
+
+class TestDeleteTaskState(TestTaskStateEndpoint):
+    def test_deletes_key(self, test_client):
+        _create_task_state(self._session, "job_id", "spark_001", self.dag_run)
+        self._session.commit()
+
+        assert test_client.delete(f"{BASE_URL}/job_id").status_code == 204
+        assert test_client.get(f"{BASE_URL}/job_id").status_code == 404
+
+    def test_delete_noop_for_missing_key(self, test_client):
+        assert test_client.delete(f"{BASE_URL}/nonexistent").status_code == 204
+
+    def test_only_deletes_target_key(self, test_client):
+        _create_task_state(self._session, "job_id", "a", self.dag_run)
+        _create_task_state(self._session, "checkpoint", "b", self.dag_run)
+        self._session.commit()
+
+        test_client.delete(f"{BASE_URL}/job_id")
+
+        assert test_client.get(f"{BASE_URL}/job_id").status_code == 404
+        assert test_client.get(f"{BASE_URL}/checkpoint").json()["value"] == "b"
+
+    def test_key_with_slash_is_supported(self, test_client):
+        _create_task_state(self._session, "workflow/step_1", "v", self.dag_run)
+        self._session.commit()
+
+        assert test_client.delete(f"{BASE_URL}/workflow/step_1").status_code 
== 204
+        assert test_client.get(f"{BASE_URL}/workflow/step_1").status_code == 
404
+
+    def test_unauthorized_returns_401(self, unauthenticated_test_client):
+        assert 
unauthenticated_test_client.delete(f"{BASE_URL}/job_id").status_code == 401
+
+
+class TestClearTaskState(TestTaskStateEndpoint):
+    def test_clears_all_keys(self, test_client):
+        for k, v in [("job_id", "a"), ("checkpoint", "b"), ("retry_count", 
"c")]:
+            _create_task_state(self._session, k, v, self.dag_run)
+        self._session.commit()
+
+        assert test_client.delete(BASE_URL).status_code == 204
+        assert test_client.get(BASE_URL).json()["total_entries"] == 0
+
+    def test_all_map_indices_clears_across_mapped_instances(self, test_client):
+        """all_map_indices=true wipes state for every map index of the task."""
+        for map_index in (-1, 0, 1):
+            row = TaskStateModel(
+                dag_run_id=self.dag_run.id,
+                dag_id=DAG_ID,
+                run_id=RUN_ID,
+                task_id=TASK_ID,
+                map_index=map_index,
+                key="job_id",
+                value=f"app_{map_index}",
+            )
+            self._session.add(row)
+        self._session.commit()
+
+        # Default clear only wipes map_index=-1
+        assert test_client.delete(BASE_URL).status_code == 204
+        # map_index=0 and map_index=1 rows still exist
+        assert 
test_client.get(f"{BASE_URL}?map_index=0").json()["total_entries"] == 1
+        assert 
test_client.get(f"{BASE_URL}?map_index=1").json()["total_entries"] == 1
+
+        # all_map_indices=true wipes everything
+        assert 
test_client.delete(f"{BASE_URL}?all_map_indices=true").status_code == 204
+        assert 
test_client.get(f"{BASE_URL}?map_index=0").json()["total_entries"] == 0
+        assert 
test_client.get(f"{BASE_URL}?map_index=1").json()["total_entries"] == 0
+
+    def test_key_with_slash_is_supported(self, test_client):
+        _create_task_state(self._session, "workflow/step_1", "v", self.dag_run)
+        self._session.commit()
+
+        assert test_client.delete(BASE_URL).status_code == 204
+        assert test_client.get(BASE_URL).json()["total_entries"] == 0
+
+    def test_unauthorized_returns_401(self, unauthenticated_test_client):
+        assert unauthenticated_test_client.delete(BASE_URL).status_code == 401
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py 
b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index 276c8699de0..6886bdd3aa7 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -49,6 +49,27 @@ class AssetAliasResponse(BaseModel):
     group: Annotated[str, Field(title="Group")]
 
 
+class AssetStateBody(BaseModel):
+    """
+    Request body for setting an asset state value.
+    """
+
+    model_config = ConfigDict(
+        extra="forbid",
+    )
+    value: Annotated[str, Field(max_length=65535, title="Value")]
+
+
+class AssetStateResponse(BaseModel):
+    """
+    A single asset state key/value pair with metadata.
+    """
+
+    key: Annotated[str, Field(title="Key")]
+    value: Annotated[str, Field(title="Value")]
+    updated_at: Annotated[datetime, Field(title="Updated At")]
+
+
 class AssetWatcherResponse(BaseModel):
     """
     Asset watcher serializer for responses.
@@ -906,6 +927,28 @@ class TaskOutletAssetReference(BaseModel):
     updated_at: Annotated[datetime, Field(title="Updated At")]
 
 
+class TaskStateBody(BaseModel):
+    """
+    Request body for setting a task state value.
+    """
+
+    model_config = ConfigDict(
+        extra="forbid",
+    )
+    value: Annotated[str, Field(max_length=65535, title="Value")]
+
+
+class TaskStateResponse(BaseModel):
+    """
+    A single task state key/value pair with metadata.
+    """
+
+    key: Annotated[str, Field(title="Key")]
+    value: Annotated[str, Field(title="Value")]
+    updated_at: Annotated[datetime, Field(title="Updated At")]
+    expires_at: Annotated[datetime | None, Field(title="Expires At")] = None
+
+
 class TimeDelta(BaseModel):
     """
     TimeDelta can be used to interact with datetime.timedelta objects.
@@ -1136,6 +1179,15 @@ class AssetResponse(BaseModel):
     last_asset_event: LastAssetEventResponse | None = None
 
 
+class AssetStateCollectionResponse(BaseModel):
+    """
+    All asset state entries for an asset.
+    """
+
+    asset_states: Annotated[list[AssetStateResponse], Field(title="Asset 
States")]
+    total_entries: Annotated[int, Field(title="Total Entries")]
+
+
 class BackfillPostBody(BaseModel):
     """
     Object used for create backfill request.
@@ -1854,6 +1906,15 @@ class TaskResponse(BaseModel):
     ]
 
 
+class TaskStateCollectionResponse(BaseModel):
+    """
+    All task state entries for a task instance.
+    """
+
+    task_states: Annotated[list[TaskStateResponse], Field(title="Task States")]
+    total_entries: Annotated[int, Field(title="Total Entries")]
+
+
 class VariableCollectionResponse(BaseModel):
     """
     Variable Collection serializer for responses.

Reply via email to