This is an automated email from the ASF dual-hosted git repository.

vatsrahul1001 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1e5d79945ad Don't crash supervisor IPC loop on transient network errors (#66572)
1e5d79945ad is described below

commit 1e5d79945ad5df1fca8f6d06c8f2cde8124a981a
Author: Jarek Potiuk <[email protected]>
AuthorDate: Tue May 19 13:21:18 2026 +0200

    Don't crash supervisor IPC loop on transient network errors (#66572)
    
    * Don't crash supervisor IPC loop on transient network errors
    
    handle_requests in the supervisor only caught ServerResponseError. Any
    non-HTTP exception (httpx.ConnectError, httpx.TimeoutException, socket
    timeouts, etc.) would propagate, terminate the generator, and
    permanently break the supervisor-to-task IPC channel. The task
    subprocess would then get EOFError on every subsequent send, and the
    worker would be stuck waiting for replies that never come.
    
    Add a catch-all except Exception after the ServerResponseError handler
    that logs the unhandled exception with type info, sends a best-effort
    ErrorResponse(API_SERVER_ERROR, ...) back to the task so the failure
    surfaces in task logs (wrapped in suppress(Exception) because if we
    can't reach the task subprocess via stdin we shouldn't double-fault),
    and lets the request loop continue to the next request.
    
    Test added: a fake httpx.ConnectError on the first call produces an
    ErrorResponse, the generator stays alive, and a second request is
    processed normally (the loop is not dead).
    
    Reported by the L3 ASVS sweep at apache/tooling-agents#24 (FINDING-005).
    
    * Address review comments: shorten comment and use exc_info
    
    - Shorten the catch-all comment per amoghrajesh's suggestion.
    - Use exc_info=e in log.exception instead of a separate exception_type
      field, per jason810496's suggestion (the exception type is redundant
      because the exception itself is logged with its full type info and
      traceback).
---
 .../src/airflow/sdk/execution_time/supervisor.py   | 22 ++++++++++++
 .../task_sdk/execution_time/test_supervisor.py     | 40 ++++++++++++++++++++++
 2 files changed, 62 insertions(+)

diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 27dcbb44346..7c9ecbeab9e 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -793,6 +793,28 @@ class WatchedSubprocess:
                     ),
                     request_id=request.id,
                 )
+            except Exception as e:
+                # Generic exception handling so a transient network error 
(httpx.ConnectError /
+                # httpx.TimeoutException) or any other exception
+                # doesn't crash this generator and crash the IPC communication 
between supervisor and task.
+                log.exception(
+                    "Unhandled exception while handling task request",
+                    request_id=request.id,
+                    exc_info=e,
+                )
+                with suppress(Exception):
+                    self.send_msg(
+                        msg=None,
+                        error=ErrorResponse(
+                            error=ErrorType.API_SERVER_ERROR,
+                            detail={
+                                "status_code": None,
+                                "message": str(e),
+                                "exception_type": type(e).__name__,
+                            },
+                        ),
+                        request_id=request.id,
+                    )
             finally:
                 if token is not None:
                     otel_context.detach(token)
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 881f5ddaad1..0b3cd64a21e 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -3039,6 +3039,46 @@ class TestHandleRequest:
             "detail": error.response.json(),
         }
 
+    def test_handle_requests_network_exception_does_not_crash_loop(self, 
watched_subprocess, mocker):
+        """A transient network error must not crash the IPC generator.
+
+        Without the catch-all in handle_requests, an httpx.ConnectError would
+        propagate, the generator would terminate, the task subprocess would
+        get EOFError on every subsequent send, and the worker would be stuck.
+        Verify that the error is reported back to the task as an
+        API_SERVER_ERROR ErrorResponse and that the loop stays alive for the
+        next request.
+        """
+        watched_subprocess, read_socket = watched_subprocess
+
+        # First request raises a network exception, second succeeds.
+        first_call = httpx.ConnectError("connection refused")
+        watched_subprocess.client.task_instances.succeed = 
mocker.Mock(side_effect=[first_call, None])
+
+        generator = watched_subprocess.handle_requests(log=mocker.Mock())
+        next(generator)
+
+        # First request — should produce an ErrorResponse, not crash the 
generator.
+        msg1 = SucceedTask(end_date=timezone.parse("2024-10-31T12:00:00Z"))
+        req1 = _RequestFrame(id=randint(1, 2**32 - 1), body=msg1.model_dump())
+        generator.send(req1)
+
+        read_socket.settimeout(0.5)
+        frame_len = int.from_bytes(read_socket.recv(4), "big")
+        bytes_ = read_socket.recv(frame_len)
+        frame = msgspec.msgpack.Decoder(_ResponseFrame).decode(bytes_)
+
+        assert frame.id == req1.id
+        assert frame.error is not None
+        assert frame.error["error"] == "API_SERVER_ERROR"
+        assert frame.error["detail"]["exception_type"] == "ConnectError"
+
+        # Second request — generator must still be alive and process it 
normally.
+        msg2 = SucceedTask(end_date=timezone.parse("2024-10-31T12:01:00Z"))
+        req2 = _RequestFrame(id=randint(1, 2**32 - 1), body=msg2.model_dump())
+        # Should not raise StopIteration (which would mean the loop crashed).
+        generator.send(req2)
+
 
 class TestSetSupervisorComms:
     class DummyComms:

Reply via email to