This is an automated email from the ASF dual-hosted git repository.

kaxil pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 86a75201759 Fix task runner failure on duplicate TI success update conflict (#63355)
86a75201759 is described below

commit 86a75201759bb0d3d893ae51cbd0020d3253de11
Author: Siddharthan P S <[email protected]>
AuthorDate: Tue Jun 16 15:38:52 2026 -0400

    Fix task runner failure on duplicate TI success update conflict (#63355)
    
    Fixes #63183
---
 .../execution_api/routes/task_instances.py         | 28 +++++++----
 .../versions/head/test_task_instances.py           | 56 ++++++++++++++++++++++
 2 files changed, 74 insertions(+), 10 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
index 3deccf35be6..ae71e41cf8e 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -336,16 +336,12 @@ def ti_run(
 @ti_id_router.patch(
     "/{task_instance_id}/state",
     status_code=status.HTTP_204_NO_CONTENT,
-    responses=create_openapi_http_exception_doc(
-        [
-            (status.HTTP_404_NOT_FOUND, "Task Instance not found"),
-            (
-                status.HTTP_409_CONFLICT,
-                "The TI is already in the requested state",
-            ),
-            (HTTP_422_UNPROCESSABLE_CONTENT, "Invalid payload for the state transition"),
-        ]
-    ),
+    responses={
+        status.HTTP_200_OK: {"description": "The TI was already in the requested state"},
+        status.HTTP_404_NOT_FOUND: {"description": "Task Instance not found"},
+        status.HTTP_409_CONFLICT: {"description": "The TI is not in a valid state for this transition"},
+        HTTP_422_UNPROCESSABLE_CONTENT: {"description": "Invalid payload for the state transition"},
+    },
 )
 def ti_update_state(
     task_instance_id: UUID,
@@ -410,6 +406,18 @@ def ti_update_state(
             },
         )
 
+    # TIStateUpdate can include terminal and intermediate states. This idempotency check handles
+    # duplicate updates when the requested state is already persisted (for example SUCCESS ->
+    # SUCCESS or DEFERRED -> DEFERRED), including duplicates that would not pass the RUNNING
+    # transition check below.
+    if ti_patch_payload.state.value == previous_state:
+        log.info(
+            "Duplicate state update request received; state already set",
+            requested_state=ti_patch_payload.state.value,
+            previous_state=previous_state,
+        )
+        return Response(status_code=status.HTTP_200_OK)
+
     if previous_state != TaskInstanceState.RUNNING:
         log.warning(
             "Cannot update Task Instance in invalid state",
diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
index f990b7008ab..395837c0e61 100644
--- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
+++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
@@ -2011,6 +2011,62 @@ class TestTIUpdateState:
         session.refresh(ti)
         assert ti.state == State.SUCCESS
 
+    def test_ti_update_state_same_state_is_idempotent(self, client, session, create_task_instance):
+        """Test that setting a TI to its current state is treated as an idempotent no-op."""
+        ti = create_task_instance(
+            task_id="test_ti_update_state_same_state_is_idempotent",
+            state=State.SUCCESS,
+            session=session,
+            start_date=DEFAULT_START_DATE,
+        )
+        ti.end_date = DEFAULT_END_DATE
+        session.commit()
+
+        response = client.patch(
+            f"/execution/task-instances/{ti.id}/state",
+            json={
+                "state": "success",
+                "end_date": timezone.parse("2024-10-31T13:00:00Z").isoformat(),
+            },
+        )
+        assert response.status_code == 200
+        assert response.content == b""
+
+        session.refresh(ti)
+        assert ti.state == State.SUCCESS
+        assert ti.end_date == DEFAULT_END_DATE
+
+    def test_ti_update_state_terminal_state_mismatch_returns_conflict(
+        self, client, session, create_task_instance
+    ):
+        """A completed TI cannot be updated to a different state."""
+        ti = create_task_instance(
+            task_id="test_ti_update_state_terminal_state_mismatch_returns_conflict",
+            state=State.SUCCESS,
+            session=session,
+            start_date=DEFAULT_START_DATE,
+        )
+        ti.end_date = DEFAULT_END_DATE
+        session.commit()
+
+        response = client.patch(
+            f"/execution/task-instances/{ti.id}/state",
+            json={
+                "state": "failed",
+                "end_date": timezone.parse("2024-10-31T13:00:00Z").isoformat(),
+            },
+        )
+        assert response.status_code == 409
+        assert response.json()["detail"] == {
+            "reason": "invalid_state",
+            "message": "TI was not in the running state so it cannot be updated",
+            "previous_state": State.SUCCESS,
+        }
+
+        session.refresh(ti)
+        assert ti.state == State.SUCCESS
+        assert ti.end_date == DEFAULT_END_DATE
+
     def test_ti_update_state_to_failed_without_fail_fast(self, client, session, dag_maker):
         """Test that SerializedDAG is NOT loaded when fail_fast=False (default)."""
         with dag_maker(dag_id="test_dag_no_fail_fast", serialized=True):

Reply via email to