amoghrajesh commented on code in PR #61631:
URL: https://github.com/apache/airflow/pull/61631#discussion_r2815146503


##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -587,6 +591,24 @@ def ti_heartbeat(
             "Retrieved current task state", state=previous_state, 
current_hostname=hostname, current_pid=pid
         )
     except NoResultFound:
+        # Check if the TI exists in the Task Instance History table.
+        # If it does, it was likely cleared while running, so return 410 Gone
+        # instead of 404 Not Found to give the client a more specific signal.
+        tih_exists = session.scalar(
+            
select(func.count(TIH.task_instance_id)).where(TIH.task_instance_id == 
ti_id_str)
+        )
+        if tih_exists:
+            log.error(
+                "Task Instance not found in TI table but exists in Task 
Instance History",

Review Comment:
   ```suggestion
                   "TaskInstance was previously cleared and archived in 
history, heartbeat skipped",
   ```



##########
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py:
##########
@@ -1591,6 +1591,41 @@ def test_ti_heartbeat_non_existent_task(self, client, 
session, create_task_insta
             "message": "Task Instance not found",
         }
 
+    def test_ti_heartbeat_cleared_task_returns_410(self, client, session, 
create_task_instance):
+        """Test that a 410 error is returned when a TI was cleared and moved 
to TIH."""
+        ti = create_task_instance(
+            task_id="test_ti_heartbeat_cleared",
+            state=State.RUNNING,
+            hostname="random-hostname",
+            pid=1547,
+            session=session,
+        )
+        session.commit()
+        old_ti_id = ti.id
+
+        # Simulate task being cleared: this archives the current try to TIH
+        # and assigns a new UUID to the TI, mirroring 
prepare_db_for_next_try().
+        ti.prepare_db_for_next_try(session)
+        session.commit()
+
+        # Pre-condition: old UUID is gone from TI table but exists in TIH

Review Comment:
   ```suggestion
   ```



##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -587,6 +591,24 @@ def ti_heartbeat(
             "Retrieved current task state", state=previous_state, 
current_hostname=hostname, current_pid=pid
         )
     except NoResultFound:
+        # Check if the TI exists in the Task Instance History table.
+        # If it does, it was likely cleared while running, so return 410 Gone
+        # instead of 404 Not Found to give the client a more specific signal.
+        tih_exists = session.scalar(
+            
select(func.count(TIH.task_instance_id)).where(TIH.task_instance_id == 
ti_id_str)
+        )
+        if tih_exists:
+            log.error(
+                "Task Instance not found in TI table but exists in Task 
Instance History",
+                ti_id=ti_id_str,
+            )
+            raise HTTPException(
+                status_code=status.HTTP_410_GONE,
+                detail={
+                    "reason": "not_found",
+                    "message": "Task Instance not found, it may have been 
moved to the Task Instance History table",

Review Comment:
   ```suggestion
                       "message": "Task Instance not found",
   ```
   
   We know it didn't move to TIH from the check above 😄 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to