Copilot commented on code in PR #64743:
URL: https://github.com/apache/airflow/pull/64743#discussion_r3066493952


##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -693,6 +704,41 @@ def _cleanup_open_sockets(self):
 
         self.selector.close()
         self.stdin.close()
+        self._request_thread_pool.shutdown(wait=False)
+
+    def _drain_pending_requests(self):
+        """Send responses for any offloaded requests that have completed."""
+        remaining: deque[tuple[Future, int]] = deque()
+        while self._pending_requests:
+            future, req_id = self._pending_requests.popleft()
+            if not future.done():
+                remaining.append((future, req_id))
+                continue
+            exc = future.exception()
+            if exc is not None:
+                if isinstance(exc, ServerResponseError):
+                    error_details = exc.response.json() if exc.response else 
None
+                    log.error(
+                        "API server error",
+                        status_code=exc.response.status_code,
+                        detail=error_details,
+                        message=str(exc),
+                    )
+                    self.send_msg(
+                        msg=None,
+                        error=ErrorResponse(
+                            error=ErrorType.API_SERVER_ERROR,
+                            detail={
+                                "status_code": exc.response.status_code,
+                                "message": str(exc),
+                                "detail": error_details,
+                            },
+                        ),
+                        request_id=req_id,
+                    )
+            else:
+                self.send_msg(msg=None, request_id=req_id)

Review Comment:
   Two concrete issues in the exception path: (1) when 
`ServerResponseError.response` is `None`, the code dereferences 
`exc.response.status_code` (lines 723/732), which will raise and prevent any 
response being sent back to the task; (2) for exceptions that are *not* 
`ServerResponseError`, no response is sent at all, which can leave the task 
process waiting indefinitely for a reply. Fix by handling `response is None` 
safely (e.g., omit status_code or set it to `None`) and by adding a fallback 
that sends an appropriate `ErrorResponse` for any non-`ServerResponseError` 
exception.



##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -693,6 +704,41 @@ def _cleanup_open_sockets(self):
 
         self.selector.close()
         self.stdin.close()
+        self._request_thread_pool.shutdown(wait=False)
+
+    def _drain_pending_requests(self):
+        """Send responses for any offloaded requests that have completed."""
+        remaining: deque[tuple[Future, int]] = deque()
+        while self._pending_requests:
+            future, req_id = self._pending_requests.popleft()
+            if not future.done():
+                remaining.append((future, req_id))
+                continue
+            exc = future.exception()
+            if exc is not None:
+                if isinstance(exc, ServerResponseError):
+                    error_details = exc.response.json() if exc.response else 
None

Review Comment:
   `exc.response.json()` can raise (e.g., non-JSON error body / decode error). 
If that happens inside `_drain_pending_requests()`, it can bubble out and 
disrupt the supervisor loop, defeating the goal of keeping heartbeats flowing. 
Wrap JSON parsing in a try/except and fall back to `exc.response.text` (or 
`None`) for `detail` if parsing fails.



##########
task-sdk/tests/task_sdk/execution_time/test_supervisor.py:
##########
@@ -2618,6 +2618,12 @@ def test_handle_requests(
         req_frame = _RequestFrame(id=randint(1, 2**32 - 1), 
body=message.model_dump())
         generator.send(req_frame)
 
+        # SetXCom is offloaded to a thread to avoid blocking the event loop.
+        # We need to wait for the future and drain it before reading the 
response.
+        if isinstance(message, SetXCom):
+            watched_subprocess._request_thread_pool.shutdown(wait=True)
+            watched_subprocess._drain_pending_requests()

Review Comment:
   Calling `shutdown(wait=True)` inside the request loop makes the executor 
unusable for any subsequent `SetXCom` submissions in the same test run, which 
can cause order-dependent failures if the generator sends multiple `SetXCom` 
messages. Prefer waiting on the currently pending futures (similar to the 
in-process path’s `concurrent.futures.wait`) and then calling 
`_drain_pending_requests()`, without shutting down the executor in the middle 
of the test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to