Copilot commented on code in PR #64023:
URL: https://github.com/apache/airflow/pull/64023#discussion_r3025336040
##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -448,6 +461,13 @@ def _handle_fail_fast_for_dag(ti: TI, dag_id: str,
session: SessionDep, dag_bag:
_stop_remaining_tasks(task_instance=ti,
task_teardown_map=task_teardown_map, session=session)
+def _get_requested_state(ti_patch_payload: TIStateUpdate) -> TaskInstanceState
| None:
+ """Extract the requested terminal state from a TI state update payload."""
Review Comment:
`_get_requested_state()` claims to extract a "terminal" state, but it also
accepts `TIRetryStatePayload` (up_for_retry), which is not a terminal state.
Consider updating the docstring (or renaming the helper) to reflect that it
extracts the requested *target* state for idempotency checks, not strictly
terminal states.
```suggestion
"""Extract the requested target state from a TI state update payload for
idempotency checks."""
```
##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -363,6 +363,19 @@ def ti_update_state(
)
if previous_state != TaskInstanceState.RUNNING:
+ # Check for idempotent duplicate: if the TI is already in the exact
same
+ # terminal state being requested, treat it as a successful no-op.
+ # This handles network-retry scenarios where the first request
succeeded
+ # but the response was lost, causing a retry that sees the TI already
+ # in the target state.
+ requested_state = _get_requested_state(ti_patch_payload)
+ if requested_state is not None and previous_state == requested_state:
+ log.info(
+ "TI is already in the requested terminal state, treating as
idempotent success",
Review Comment:
The new idempotency path is described as handling a "terminal" state, but
this branch also handles non-terminal states that are sent directly, such as
up_for_retry. Consider rewording these comments (and any related docs) to
refer to the "requested target state" rather than "terminal state" to avoid
confusing future maintainers.
```suggestion
# requested target state, treat it as a successful no-op.
# This handles network-retry scenarios where the first request
succeeded
# but the response was lost, causing a retry that sees the TI already
# in that requested state.
requested_state = _get_requested_state(ti_patch_payload)
if requested_state is not None and previous_state == requested_state:
log.info(
"TI is already in the requested target state, treating as
idempotent success",
```
##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -1081,12 +1091,23 @@ def update_task_state_if_needed(self):
# For states like `deferred`, `up_for_reschedule`, the process will
exit with 0, but the state will be updated
# by the subprocess in the `handle_requests` method.
if self.final_state not in STATES_SENT_DIRECTLY:
- self.client.task_instances.finish(
- id=self.id,
- state=self.final_state,
- when=datetime.now(tz=timezone.utc),
- rendered_map_index=self._rendered_map_index,
- )
+ try:
+ self.client.task_instances.finish(
+ id=self.id,
+ state=self.final_state,
+ when=datetime.now(tz=timezone.utc),
+ rendered_map_index=self._rendered_map_index,
+ )
+ except ServerResponseError as e:
+ if e.response.status_code == HTTPStatus.CONFLICT:
+ log.warning(
+ "Failed to update TI state after process exit due to
conflict",
+ ti_id=self.id,
+ final_state=self.final_state,
+ detail=getattr(e, "detail", str(e)),
Review Comment:
On conflict, this log field uses `getattr(e, "detail", str(e))`, but
`ServerResponseError` details usually live on `e.response` (e.g., its JSON
body). Logging `e.response.json()` or `e.response.text` (guarded against parse
errors) would make this warning much more actionable when diagnosing why the
finish update conflicted.
```suggestion
detail = getattr(e, "detail", str(e))
response_body = None
try:
if e.response is not None:
try:
response_body = e.response.json()
except Exception:
response_body = e.response.text
except Exception:
response_body = None
log.warning(
"Failed to update TI state after process exit due to
conflict",
ti_id=self.id,
final_state=self.final_state,
detail=detail,
response_body=response_body,
```
##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -1262,22 +1283,44 @@ def _handle_request(self, msg: ToSupervisor, log:
FilteringBoundLogger, req_id:
self._terminal_state = msg.state
self._task_end_time_monotonic = time.monotonic()
self._rendered_map_index = msg.rendered_map_index
- self.client.task_instances.succeed(
- id=self.id,
- when=msg.end_date,
- task_outlets=msg.task_outlets,
- outlet_events=msg.outlet_events,
- rendered_map_index=self._rendered_map_index,
- )
+ try:
+ self.client.task_instances.succeed(
+ id=self.id,
+ when=msg.end_date,
+ task_outlets=msg.task_outlets,
+ outlet_events=msg.outlet_events,
+ rendered_map_index=self._rendered_map_index,
+ )
+ except ServerResponseError as e:
+ if e.response.status_code == HTTPStatus.CONFLICT and
_is_already_in_target_state(
+ e, msg.state
+ ):
+ log.info(
+ "TI already in success state, treating as idempotent
success",
+ ti_id=self.id,
+ )
Review Comment:
This info log says "treating as idempotent success". Since this branch
specifically swallows a 409 Conflict, wording such as "idempotent no-op" or
"already in target state" (optionally including `previous_state`/`target_state`)
would be clearer for operators reading logs during incident triage.
##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -1262,22 +1283,44 @@ def _handle_request(self, msg: ToSupervisor, log:
FilteringBoundLogger, req_id:
self._terminal_state = msg.state
self._task_end_time_monotonic = time.monotonic()
self._rendered_map_index = msg.rendered_map_index
- self.client.task_instances.succeed(
- id=self.id,
- when=msg.end_date,
- task_outlets=msg.task_outlets,
- outlet_events=msg.outlet_events,
- rendered_map_index=self._rendered_map_index,
- )
+ try:
+ self.client.task_instances.succeed(
+ id=self.id,
+ when=msg.end_date,
+ task_outlets=msg.task_outlets,
+ outlet_events=msg.outlet_events,
+ rendered_map_index=self._rendered_map_index,
+ )
+ except ServerResponseError as e:
+ if e.response.status_code == HTTPStatus.CONFLICT and
_is_already_in_target_state(
+ e, msg.state
+ ):
+ log.info(
+ "TI already in success state, treating as idempotent
success",
+ ti_id=self.id,
+ )
+ else:
+ raise
elif isinstance(msg, RetryTask):
self._terminal_state = msg.state
self._task_end_time_monotonic = time.monotonic()
self._rendered_map_index = msg.rendered_map_index
- self.client.task_instances.retry(
- id=self.id,
- end_date=msg.end_date,
- rendered_map_index=self._rendered_map_index,
- )
+ try:
+ self.client.task_instances.retry(
+ id=self.id,
+ end_date=msg.end_date,
+ rendered_map_index=self._rendered_map_index,
+ )
+ except ServerResponseError as e:
+ if e.response.status_code == HTTPStatus.CONFLICT and
_is_already_in_target_state(
+ e, msg.state
+ ):
+ log.info(
+ "TI already in retry state, treating as idempotent
success",
Review Comment:
This log message says "treating as idempotent success" even though the
transition here is to UP_FOR_RETRY, not SUCCESS. Consider neutral phrasing
such as "already in retry state, treating as idempotent no-op" to avoid
implying that the task succeeded.
```suggestion
"TI already in retry state, treating as idempotent
no-op",
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]