Copilot commented on code in PR #65029:
URL: https://github.com/apache/airflow/pull/65029#discussion_r3066476433


##########
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py:
##########
@@ -1929,6 +1929,95 @@ def test_ti_heartbeat_update(self, client, session, 
create_task_instance, time_m
         session.refresh(ti)
         assert ti.last_heartbeat_at == time_now.add(minutes=10)
 
+    def test_ti_heartbeat_fast_path_skips_fallback(
+        self, client, session, create_task_instance, monkeypatch, time_machine
+    ):
+        """When the fast-path UPDATE succeeds, the fallback path does not 
run."""
+        time_now = timezone.parse("2024-10-31T12:00:00Z")
+        time_machine.move_to(time_now, tick=False)
+
+        ti = create_task_instance(
+            task_id="test_ti_heartbeat_fast_path_skips_fallback",
+            state=State.RUNNING,
+            hostname="random-hostname",
+            pid=1547,
+            last_heartbeat_at=time_now,
+            session=session,
+        )
+        session.commit()
+
+        new_time = time_now.add(minutes=10)
+        time_machine.move_to(new_time, tick=False)
+
+        original_execute = Session.execute
+        update_count = 0
+
+        def counting_execute(session_obj, statement, *args, **kwargs):
+            nonlocal update_count
+            if getattr(statement, "is_update", False) and statement.table.name 
== TaskInstance.__table__.name:
+                update_count += 1
+            return original_execute(session_obj, statement, *args, **kwargs)
+
+        monkeypatch.setattr(Session, "execute", counting_execute)
+
+        response = client.put(
+            f"/execution/task-instances/{ti.id}/heartbeat",
+            json={"hostname": "random-hostname", "pid": 1547},
+        )
+
+        assert response.status_code == 204
+        # Only the fast-path UPDATE should have executed; no fallback UPDATE
+        assert update_count == 1

Review Comment:
   This test is tightly coupled to internal SQLAlchemy call patterns by 
counting *any* `UPDATE` against the `task_instance` table. It can become 
brittle/flaky if the endpoint later adds another legitimate `TaskInstance` 
update (e.g., ancillary columns) even when the fallback is not exercised. A 
more robust assertion would directly detect that the fallback path didn’t run 
(e.g., by asserting no `SELECT ... FOR UPDATE` was executed, or by matching the 
specific fast-path guarded `UPDATE` shape rather than any `TaskInstance` 
update).



##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -695,8 +695,24 @@ def ti_heartbeat(
     bind_contextvars(ti_id=str(task_instance_id))
     log.debug("Processing heartbeat", hostname=ti_payload.hostname, 
pid=ti_payload.pid)
 
-    # Hot path: since heartbeating a task is a very common operation, we try 
to do minimize the number of queries
-    # and DB round trips as much as possible.
+    # Hot path: in the common case the TI is still running on the same host 
and pid,
+    # so we can update last_heartbeat_at directly without first taking a row 
lock.
+    fast_path_result = session.execute(
+        update(TI)
+        .where(
+            TI.id == task_instance_id,
+            TI.state == TaskInstanceState.RUNNING,
+            TI.hostname == ti_payload.hostname,
+            TI.pid == ti_payload.pid,
+        )
+        .values(last_heartbeat_at=timezone.utcnow())
+        .execution_options(synchronize_session=False)
+    )
+    if fast_path_result.rowcount:

Review Comment:
   `rowcount` is not guaranteed to be a positive integer across all 
DBAPIs/dialects; some can return `-1` (unknown) or `None`. Using truthiness 
here can incorrectly treat an “unknown” rowcount (e.g., `-1`) as success and 
skip the fallback path. Prefer an explicit positive check (e.g., `rowcount is 
not None and rowcount > 0`) to avoid false positives.
   ```suggestion
       if fast_path_result.rowcount is not None and fast_path_result.rowcount > 0:
   ```



##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -695,8 +695,24 @@ def ti_heartbeat(
     bind_contextvars(ti_id=str(task_instance_id))
     log.debug("Processing heartbeat", hostname=ti_payload.hostname, 
pid=ti_payload.pid)
 
-    # Hot path: since heartbeating a task is a very common operation, we try 
to do minimize the number of queries
-    # and DB round trips as much as possible.
+    # Hot path: in the common case the TI is still running on the same host 
and pid,
+    # so we can update last_heartbeat_at directly without first taking a row 
lock.
+    fast_path_result = session.execute(
+        update(TI)
+        .where(
+            TI.id == task_instance_id,
+            TI.state == TaskInstanceState.RUNNING,
+            TI.hostname == ti_payload.hostname,
+            TI.pid == ti_payload.pid,
+        )
+        .values(last_heartbeat_at=timezone.utcnow())
+        .execution_options(synchronize_session=False)
+    )
+    if fast_path_result.rowcount:
+        log.debug("Heartbeat updated via fast path")
+        return
+
+    log.info("Heartbeat fast path missed; falling back to diagnostic checks")

Review Comment:
   This endpoint is likely very high volume; emitting an `INFO` log on every 
fast-path miss can create significant log noise (e.g., for non-running TIs, 
mismatched host/pid, or invalid IDs). Consider lowering this to `DEBUG` (or 
substantially enriching/rate-limiting it) so production logs aren’t flooded 
during transient mismatches or misrouted heartbeats.
   ```suggestion
       log.debug("Heartbeat fast path missed; falling back to diagnostic checks")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to