amoghrajesh commented on code in PR #63355:
URL: https://github.com/apache/airflow/pull/63355#discussion_r2923241453


##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -1349,7 +1360,17 @@ def _on_term(signum, frame):
         Stats.incr("ti.finish", tags={**stats_tags, "state": state.value})
 
         if msg:
-            SUPERVISOR_COMMS.send(msg=msg)
+            try:
+                SUPERVISOR_COMMS.send(msg=msg)
+            except AirflowRuntimeError as e:
+                is_duplicate, previous_state = 
_is_duplicate_state_update(error=e, requested_state=state)
+                if not is_duplicate:
+                    raise
+                log.warning(
+                    "Task instance state was already updated by API server; 
ignoring duplicate state update",
+                    requested_state=state.value,
+                    previous_state=previous_state,
+                )

Review Comment:
   I don't think we should be handling this in the task runner but in the API 
server instead.
   
   The ti_run endpoint handles something similar: 
https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py#L174-L181,
 which essentially is an idempotency check. I feel having somewhat similar 
logic should be the way to go here.
   
   ie: if ti is already in the state being requested, send a 200 I guess.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to