Copilot commented on code in PR #64023:
URL: https://github.com/apache/airflow/pull/64023#discussion_r3066477355
##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -1262,22 +1283,44 @@ def _handle_request(self, msg: ToSupervisor, log:
FilteringBoundLogger, req_id:
self._terminal_state = msg.state
self._task_end_time_monotonic = time.monotonic()
self._rendered_map_index = msg.rendered_map_index
- self.client.task_instances.succeed(
- id=self.id,
- when=msg.end_date,
- task_outlets=msg.task_outlets,
- outlet_events=msg.outlet_events,
- rendered_map_index=self._rendered_map_index,
- )
+ try:
+ self.client.task_instances.succeed(
+ id=self.id,
+ when=msg.end_date,
+ task_outlets=msg.task_outlets,
+ outlet_events=msg.outlet_events,
+ rendered_map_index=self._rendered_map_index,
+ )
+ except ServerResponseError as e:
+ if e.response.status_code == HTTPStatus.CONFLICT and
_is_already_in_target_state(
+ e, msg.state
+ ):
+ log.info(
+ "TI already in success state, treating as idempotent
success",
+ ti_id=self.id,
+ )
+ else:
+ raise
elif isinstance(msg, RetryTask):
self._terminal_state = msg.state
self._task_end_time_monotonic = time.monotonic()
self._rendered_map_index = msg.rendered_map_index
- self.client.task_instances.retry(
- id=self.id,
- end_date=msg.end_date,
- rendered_map_index=self._rendered_map_index,
- )
+ try:
+ self.client.task_instances.retry(
+ id=self.id,
+ end_date=msg.end_date,
+ rendered_map_index=self._rendered_map_index,
+ )
+ except ServerResponseError as e:
+ if e.response.status_code == HTTPStatus.CONFLICT and
_is_already_in_target_state(
+ e, msg.state
+ ):
+ log.info(
+ "TI already in retry state, treating as idempotent
success",
+ ti_id=self.id,
+ )
Review Comment:
The log message in the `RetryTask` conflict handler says "treating as
idempotent success", which is misleading for a retry transition. Update the
message to describe an idempotent retry state update (e.g., "idempotent retry"
or "idempotent state already applied") so operators can interpret the logs
accurately.
##########
task-sdk/tests/task_sdk/execution_time/test_supervisor.py:
##########
@@ -2702,6 +2703,211 @@ def test_handle_requests_api_server_error(self,
watched_subprocess, mocker):
"detail": error.response.json(),
}
+ def test_succeed_task_idempotent_on_409_already_success(self,
watched_subprocess, mocker):
+ """Test that a 409 with previous_state=success is treated as
idempotent success for SucceedTask."""
+ watched_subprocess, read_socket = watched_subprocess
+
+ error = ServerResponseError(
+ message="Conflict",
+ request=httpx.Request("PATCH", "http://test"),
+ response=httpx.Response(
+ 409,
+ json={
+ "detail": {
+ "reason": "invalid_state",
+ "message": "TI was not in the running state so it
cannot be updated",
+ "previous_state": "success",
+ }
+ },
+ ),
+ )
+ watched_subprocess.client.task_instances.succeed =
mocker.Mock(side_effect=error)
+
+ generator = watched_subprocess.handle_requests(log=mocker.Mock())
+ next(generator)
+
+ msg = SucceedTask(end_date=timezone.parse("2024-10-31T12:00:00Z"))
+ req_frame = _RequestFrame(id=randint(1, 2**32 - 1),
body=msg.model_dump())
+ generator.send(req_frame)
+
+ # Read response — should be an ack (no error), not an error response
+ read_socket.settimeout(0.1)
+ frame_len = int.from_bytes(read_socket.recv(4), "big")
+ response_bytes = read_socket.recv(frame_len)
+ frame = msgspec.msgpack.Decoder(_ResponseFrame).decode(response_bytes)
Review Comment:
These new tests assume `socket.recv(n)` returns exactly `n` bytes, but
`recv` can legally return fewer bytes than requested. This can make the tests
flaky under different OS/CI timing. Consider a small helper that loops until
exactly 4 bytes (the length prefix) and then exactly `frame_len` bytes have
been read. Also, a 0.1 s socket timeout is tight for CI; raising it (or using
a retry loop) would further reduce intermittent failures.
##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -448,6 +461,13 @@ def _handle_fail_fast_for_dag(ti: TI, dag_id: str,
session: SessionDep, dag_bag:
_stop_remaining_tasks(task_instance=ti,
task_teardown_map=task_teardown_map, session=session)
+def _get_requested_state(ti_patch_payload: TIStateUpdate) -> TaskInstanceState
| None:
+ """Extract the requested terminal state from a TI state update payload."""
Review Comment:
The docstring says "requested terminal state", but the implementation also
accepts `TIRetryStatePayload`, which is not a terminal state. Either change
the docstring to say "requested state" (or similar), or narrow the accepted
payload types to terminal-only ones so the documentation stays accurate.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]