amoghrajesh commented on code in PR #67319:
URL: https://github.com/apache/airflow/pull/67319#discussion_r3296241735


##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_state.py:
##########
@@ -147,24 +180,51 @@ def set_task_state(
     map_index: Annotated[int, Query(ge=-1)] = -1,
 ) -> None:
     """Set a task state value. Creates or overwrites the key."""
-    ti_exists = session.scalar(
-        select(TI.task_id).where(
-            TI.dag_id == dag_id,
-            TI.run_id == dag_run_id,
-            TI.task_id == task_id,
-            TI.map_index == map_index,
+    _require_ti(dag_id, dag_run_id, task_id, map_index, session)
+    expires_at = _resolve_expires_at(body.expires_at)
+    scope = _get_scope(dag_id, dag_run_id, task_id, map_index)
+    try:
+        get_state_backend().set(scope, key, body.value, expires_at=expires_at, 
session=session)
+    except ValueError as e:
+        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, 
detail=str(e)) from e
+
+
+@task_state_router.patch(
+    "/{key:path}",
+    status_code=status.HTTP_204_NO_CONTENT,
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    dependencies=[Depends(requires_access_dag(method="PUT", 
access_entity=DagAccessEntity.TASK_INSTANCE))],
+)
+def patch_task_state(
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    key: str,
+    body: TaskStatePatchBody,
+    session: SessionDep,
+    map_index: Annotated[int, Query(ge=-1)] = -1,
+) -> None:
+    """Update the value of an existing task state key."""
+    _require_ti(dag_id, dag_run_id, task_id, map_index, session)
+
+    existing = session.execute(
+        select(TaskStateModel.expires_at).where(
+            TaskStateModel.dag_id == dag_id,
+            TaskStateModel.run_id == dag_run_id,
+            TaskStateModel.task_id == task_id,
+            TaskStateModel.map_index == map_index,
+            TaskStateModel.key == key,
         )
-    )
-    if ti_exists is None:
+    ).one_or_none()
+
+    if existing is None:
         raise HTTPException(
             status_code=status.HTTP_404_NOT_FOUND,
-            detail=f"Task instance not found for dag_id={dag_id!r}, 
run_id={dag_run_id!r}, task_id={task_id!r}, map_index={map_index}",
+            detail=f"Task state key {key!r} not found",
         )
+
     scope = _get_scope(dag_id, dag_run_id, task_id, map_index)
-    try:
-        get_state_backend().set(scope, key, body.value, session=session)
-    except ValueError as e:
-        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, 
detail=str(e)) from e
+    get_state_backend().set(scope, key, body.value, 
expires_at=existing.expires_at, session=session)

Review Comment:
   Good point on the potential race. Looking at how the XCom Core API handles 
this — it explicitly bypasses the backend and operates on raw DB values only. 
That raises a broader question: should the task state Core API also bypass 
`get_state_backend()` entirely and use direct SQL like XCom does? I will 
clarify that with the team. 
   
   
   
   Right now PUT/DELETE/CLEAR all go through the backend, so changing PATCH to 
do a direct UPDATE would be inconsistent. I would rather keep PATCH going 
through `get_state_backend().set()` for now, and track the Core API vs backend 
question as a follow-up — either we align everything with the XCom pattern or 
we add a `patch()` method to BaseStateBackend for atomic value-only updates. 
The race is theoretically possible but practically unlikely since task state 
is scoped to a single TI. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to