This is an automated email from the ASF dual-hosted git repository.
kaxil pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 86a75201759 Fix task runner failure on duplicate TI success update
conflict (#63355)
86a75201759 is described below
commit 86a75201759bb0d3d893ae51cbd0020d3253de11
Author: Siddharthan P S <[email protected]>
AuthorDate: Tue Jun 16 15:38:52 2026 -0400
Fix task runner failure on duplicate TI success update conflict (#63355)
Fixes #63183
---
.../execution_api/routes/task_instances.py | 28 +++++++----
.../versions/head/test_task_instances.py | 56 ++++++++++++++++++++++
2 files changed, 74 insertions(+), 10 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
index 3deccf35be6..ae71e41cf8e 100644
---
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
+++
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -336,16 +336,12 @@ def ti_run(
@ti_id_router.patch(
"/{task_instance_id}/state",
status_code=status.HTTP_204_NO_CONTENT,
- responses=create_openapi_http_exception_doc(
- [
- (status.HTTP_404_NOT_FOUND, "Task Instance not found"),
- (
- status.HTTP_409_CONFLICT,
- "The TI is already in the requested state",
- ),
- (HTTP_422_UNPROCESSABLE_CONTENT, "Invalid payload for the state
transition"),
- ]
- ),
+ responses={
+ status.HTTP_200_OK: {"description": "The TI was already in the
requested state"},
+ status.HTTP_404_NOT_FOUND: {"description": "Task Instance not found"},
+ status.HTTP_409_CONFLICT: {"description": "The TI is not in a valid
state for this transition"},
+ HTTP_422_UNPROCESSABLE_CONTENT: {"description": "Invalid payload for
the state transition"},
+ },
)
def ti_update_state(
task_instance_id: UUID,
@@ -410,6 +406,18 @@ def ti_update_state(
},
)
+ # TIStateUpdate can include terminal and intermediate states. This
idempotency check handles
+ # duplicate updates when the requested state is already persisted (for
example SUCCESS ->
+ # SUCCESS or DEFERRED -> DEFERRED), including duplicates that would not
pass the RUNNING
+ # transition check below.
+ if ti_patch_payload.state.value == previous_state:
+ log.info(
+ "Duplicate state update request received; state already set",
+ requested_state=ti_patch_payload.state.value,
+ previous_state=previous_state,
+ )
+ return Response(status_code=status.HTTP_200_OK)
+
if previous_state != TaskInstanceState.RUNNING:
log.warning(
"Cannot update Task Instance in invalid state",
diff --git
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
index f990b7008ab..395837c0e61 100644
---
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
+++
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
@@ -2011,6 +2011,62 @@ class TestTIUpdateState:
session.refresh(ti)
assert ti.state == State.SUCCESS
+ def test_ti_update_state_same_state_is_idempotent(self, client, session,
create_task_instance):
+ """Test that setting a TI to its current state is treated as an
idempotent no-op."""
+ ti = create_task_instance(
+ task_id="test_ti_update_state_same_state_is_idempotent",
+ state=State.SUCCESS,
+ session=session,
+ start_date=DEFAULT_START_DATE,
+ )
+ ti.end_date = DEFAULT_END_DATE
+ session.commit()
+
+ response = client.patch(
+ f"/execution/task-instances/{ti.id}/state",
+ json={
+ "state": "success",
+ "end_date": timezone.parse("2024-10-31T13:00:00Z").isoformat(),
+ },
+ )
+ assert response.status_code == 200
+ assert response.content == b""
+
+ session.refresh(ti)
+ assert ti.state == State.SUCCESS
+ assert ti.end_date == DEFAULT_END_DATE
+
+ def test_ti_update_state_terminal_state_mismatch_returns_conflict(
+ self, client, session, create_task_instance
+ ):
+ """A completed TI cannot be updated to a different state."""
+ ti = create_task_instance(
+
task_id="test_ti_update_state_terminal_state_mismatch_returns_conflict",
+ state=State.SUCCESS,
+ session=session,
+ start_date=DEFAULT_START_DATE,
+ )
+ ti.end_date = DEFAULT_END_DATE
+ session.commit()
+
+ response = client.patch(
+ f"/execution/task-instances/{ti.id}/state",
+ json={
+ "state": "failed",
+ "end_date": timezone.parse("2024-10-31T13:00:00Z").isoformat(),
+ },
+ )
+ assert response.status_code == 409
+ assert response.json()["detail"] == {
+ "reason": "invalid_state",
+ "message": "TI was not in the running state so it cannot be
updated",
+ "previous_state": State.SUCCESS,
+ }
+
+ session.refresh(ti)
+ assert ti.state == State.SUCCESS
+ assert ti.end_date == DEFAULT_END_DATE
+
def test_ti_update_state_to_failed_without_fail_fast(self, client,
session, dag_maker):
"""Test that SerializedDAG is NOT loaded when fail_fast=False
(default)."""
with dag_maker(dag_id="test_dag_no_fail_fast", serialized=True):