Copilot commented on code in PR #64743:
URL: https://github.com/apache/airflow/pull/64743#discussion_r3066493952
##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -693,6 +704,41 @@ def _cleanup_open_sockets(self):
self.selector.close()
self.stdin.close()
+ self._request_thread_pool.shutdown(wait=False)
+
+ def _drain_pending_requests(self):
+ """Send responses for any offloaded requests that have completed."""
+ remaining: deque[tuple[Future, int]] = deque()
+ while self._pending_requests:
+ future, req_id = self._pending_requests.popleft()
+ if not future.done():
+ remaining.append((future, req_id))
+ continue
+ exc = future.exception()
+ if exc is not None:
+ if isinstance(exc, ServerResponseError):
+ error_details = exc.response.json() if exc.response else None
+ log.error(
+ "API server error",
+ status_code=exc.response.status_code,
+ detail=error_details,
+ message=str(exc),
+ )
+ self.send_msg(
+ msg=None,
+ error=ErrorResponse(
+ error=ErrorType.API_SERVER_ERROR,
+ detail={
+ "status_code": exc.response.status_code,
+ "message": str(exc),
+ "detail": error_details,
+ },
+ ),
+ request_id=req_id,
+ )
+ else:
+ self.send_msg(msg=None, request_id=req_id)
Review Comment:
Two concrete issues in the exception path: (1) when
`ServerResponseError.response` is `None`, the code dereferences
`exc.response.status_code` (lines 723/732), which will raise and prevent any
response being sent back to the task; (2) for exceptions that are *not*
`ServerResponseError`, no response is sent at all, which can leave the task
process waiting indefinitely for a reply. Fix by handling `response is None`
safely (e.g., omit status_code or set it to `None`) and by adding a fallback
that sends an appropriate `ErrorResponse` for any non-`ServerResponseError`
exception.
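A minimal, self-contained sketch of the suggested handling. The `ServerResponseError` stub and the `error_detail_for` helper here are illustrative only (not the real supervisor code); the point is that both flagged cases, a missing `response` and a non-`ServerResponseError` exception, still produce a payload to send back:

```python
from __future__ import annotations


class ServerResponseError(Exception):
    """Stand-in for the real error class; `response` may be None."""

    def __init__(self, message: str, response=None):
        super().__init__(message)
        self.response = response


def error_detail_for(exc: BaseException) -> dict:
    """Build an error payload that is safe in both cases the review flags.

    (1) ServerResponseError with response=None: don't dereference
        exc.response.status_code; report status_code as None instead.
    (2) Any other exception type: still return a payload, so the task
        process always receives a reply and is never left blocked.
    """
    if isinstance(exc, ServerResponseError) and exc.response is not None:
        return {
            "status_code": exc.response.status_code,
            "message": str(exc),
        }
    # Fallback: no response attached, or a non-ServerResponseError exception.
    return {"status_code": None, "message": str(exc)}
```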
##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -693,6 +704,41 @@ def _cleanup_open_sockets(self):
self.selector.close()
self.stdin.close()
+ self._request_thread_pool.shutdown(wait=False)
+
+ def _drain_pending_requests(self):
+ """Send responses for any offloaded requests that have completed."""
+ remaining: deque[tuple[Future, int]] = deque()
+ while self._pending_requests:
+ future, req_id = self._pending_requests.popleft()
+ if not future.done():
+ remaining.append((future, req_id))
+ continue
+ exc = future.exception()
+ if exc is not None:
+ if isinstance(exc, ServerResponseError):
+ error_details = exc.response.json() if exc.response else None
Review Comment:
`exc.response.json()` can raise (e.g., non-JSON error body / decode error).
If that happens inside `_drain_pending_requests()`, it can bubble out and
disrupt the supervisor loop, defeating the goal of keeping heartbeats flowing.
Wrap JSON parsing in a try/except and fall back to `exc.response.text` (or
`None`) for `detail` if parsing fails.
##########
task-sdk/tests/task_sdk/execution_time/test_supervisor.py:
##########
@@ -2618,6 +2618,12 @@ def test_handle_requests(
req_frame = _RequestFrame(id=randint(1, 2**32 - 1), body=message.model_dump())
generator.send(req_frame)
+ # SetXCom is offloaded to a thread to avoid blocking the event loop.
+ # We need to wait for the future and drain it before reading the response.
+ if isinstance(message, SetXCom):
+ watched_subprocess._request_thread_pool.shutdown(wait=True)
+ watched_subprocess._drain_pending_requests()
Review Comment:
Calling `shutdown(wait=True)` inside the request loop makes the executor
unusable for any subsequent `SetXCom` submissions in the same test run, which
can cause order-dependent failures if the generator sends multiple `SetXCom`
messages. Prefer waiting on the currently pending futures (similar to the
in-process path’s `concurrent.futures.wait`) and then calling
`_drain_pending_requests()`, without shutting down the executor in the middle
of the test.
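A sketch of the suggested pattern using `concurrent.futures.wait`. The `drain_without_shutdown` helper is hypothetical (standing in for the test's wait-then-drain step); the key property is that the pool remains usable for later submissions, unlike after `shutdown(wait=True)`:

```python
from concurrent.futures import Future, ThreadPoolExecutor, wait


def drain_without_shutdown(futures: list[Future], timeout: float = 5.0) -> list:
    """Wait for the given futures to finish and return their results.

    Waits only on the currently pending futures rather than shutting the
    executor down, so subsequent SetXCom submissions in the same test run
    can still be offloaded to the pool.
    """
    done, not_done = wait(futures, timeout=timeout)
    assert not not_done, "pending futures did not complete within the timeout"
    return [f.result() for f in done]
```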
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]