This is an automated email from the ASF dual-hosted git repository. vatsrahul1001 pushed a commit to branch backport-1e5d799-v3-2-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 5519b592534fed177f62f3fba522c1967bf5b3cb
Author: Jarek Potiuk <[email protected]>
AuthorDate: Tue May 19 13:21:18 2026 +0200

    Don't crash supervisor IPC loop on transient network errors (#66572)

    * Don't crash supervisor IPC loop on transient network errors

    handle_requests in the supervisor only caught ServerResponseError. Any
    non-HTTP exception (httpx.ConnectError, httpx.TimeoutException, socket
    timeouts, etc.) would propagate, terminate the generator, and permanently
    break the supervisor-to-task IPC channel. The task subprocess would then
    get EOFError on every subsequent send, and the worker would be stuck
    waiting for replies that never come.

    Add a catch-all except Exception after the ServerResponseError handler
    that logs the unhandled exception with type info, sends a best-effort
    ErrorResponse(API_SERVER_ERROR, ...) back to the task so the failure
    surfaces in task logs (wrapped in suppress(Exception) because if we can't
    reach the task subprocess via stdin we shouldn't double-fault), and lets
    the request loop continue to the next request.

    Test added: a fake httpx.ConnectError on the first call produces an
    ErrorResponse, the generator stays alive, and a second request is
    processed normally (the loop is not dead).

    Reported by the L3 ASVS sweep at apache/tooling-agents#24 (FINDING-005).

    * Address review comments: shorten comment and use exc_info

    - Shorten the catch-all comment per amoghrajesh's suggestion.
    - Use exc_info=e in log.exception instead of exception_type field per
      jason810496's suggestion (exception type is redundant since the
      exception itself is logged with full type info and traceback).

(cherry picked from commit 1e5d79945ad5df1fca8f6d06c8f2cde8124a981a) --- .../src/airflow/sdk/execution_time/supervisor.py | 22 ++++++++++++ .../task_sdk/execution_time/test_supervisor.py | 40 ++++++++++++++++++++++ 2 files changed, 62 insertions(+) diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index 3f53a68016a..cd68dc85255 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -760,6 +760,28 @@ class WatchedSubprocess: ), request_id=request.id, ) + except Exception as e: + # Generic exception handling so a transient network error (httpx.ConnectError / + # httpx.TimeoutException) or any other exception + # doesn't crash this generator and crash the IPC communication between supervisor and task. + log.exception( + "Unhandled exception while handling task request", + request_id=request.id, + exc_info=e, + ) + with suppress(Exception): + self.send_msg( + msg=None, + error=ErrorResponse( + error=ErrorType.API_SERVER_ERROR, + detail={ + "status_code": None, + "message": str(e), + "exception_type": type(e).__name__, + }, + ), + request_id=request.id, + ) def _handle_request(self, msg, log: FilteringBoundLogger, req_id: int) -> None: raise NotImplementedError() diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py index 196679708e2..f61b257b71b 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py +++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py @@ -2753,6 +2753,46 @@ class TestHandleRequest: "detail": error.response.json(), } + def test_handle_requests_network_exception_does_not_crash_loop(self, watched_subprocess, mocker): + """A transient network error must not crash the IPC generator. 
+ + Without the catch-all in handle_requests, an httpx.ConnectError would + propagate, the generator would terminate, the task subprocess would + get EOFError on every subsequent send, and the worker would be stuck. + Verify that the error is reported back to the task as an + API_SERVER_ERROR ErrorResponse and that the loop stays alive for the + next request. + """ + watched_subprocess, read_socket = watched_subprocess + + # First request raises a network exception, second succeeds. + first_call = httpx.ConnectError("connection refused") + watched_subprocess.client.task_instances.succeed = mocker.Mock(side_effect=[first_call, None]) + + generator = watched_subprocess.handle_requests(log=mocker.Mock()) + next(generator) + + # First request — should produce an ErrorResponse, not crash the generator. + msg1 = SucceedTask(end_date=timezone.parse("2024-10-31T12:00:00Z")) + req1 = _RequestFrame(id=randint(1, 2**32 - 1), body=msg1.model_dump()) + generator.send(req1) + + read_socket.settimeout(0.5) + frame_len = int.from_bytes(read_socket.recv(4), "big") + bytes_ = read_socket.recv(frame_len) + frame = msgspec.msgpack.Decoder(_ResponseFrame).decode(bytes_) + + assert frame.id == req1.id + assert frame.error is not None + assert frame.error["error"] == "API_SERVER_ERROR" + assert frame.error["detail"]["exception_type"] == "ConnectError" + + # Second request — generator must still be alive and process it normally. + msg2 = SucceedTask(end_date=timezone.parse("2024-10-31T12:01:00Z")) + req2 = _RequestFrame(id=randint(1, 2**32 - 1), body=msg2.model_dump()) + # Should not raise StopIteration (which would mean the loop crashed). + generator.send(req2) + class TestSetSupervisorComms: class DummyComms:
