This is an automated email from the ASF dual-hosted git repository.

Yicong-Huang pushed a commit to branch release/v1.1.0-incubating
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/release/v1.1.0-incubating by 
this push:
     new 58c07e50bf fix(amber): defer ProxyServer shutdown until response is 
yielded
58c07e50bf is described below

commit 58c07e50bfe7e36112fdab52552671968dab77eb
Author: Yicong Huang <[email protected]>
AuthorDate: Sun May 3 14:39:27 2026 -0700

    fix(amber): defer ProxyServer shutdown until response is yielded
    
    graceful_shutdown was being dispatched from a background thread
    inside the registered shutdown action's lambda — i.e. before
    do_action's `yield Result(py_buffer("Bye bye!"))` ran. On a fast
    runner the thread reached super().shutdown() and tore the gRPC
    listener down before the "Bye bye!" Result flushed to the client,
    which surfaced as
    
      pyarrow._flight.FlightUnavailableError:
        Flight returned unavailable error, with message: Broken pipe
    
    flaking core/proxy/test_proxy_client.py::test_client_can_shutdown_server
    on the python (3.12 / 3.13) matrix.
    
    Move the spawn to do_action, *after* the yield. The action's body
    becomes a no-op lambda; do_action checks for action_name == "shutdown"
    post-yield and starts graceful_shutdown on a daemon thread there.
    The Result is handed back to gRPC before the listener teardown
    begins, so the response always reaches the client. graceful_shutdown
    itself goes back to a plain shutdown call (its old "after a delay"
    docstring was aspirational — it never actually slept).
    
    Verified by stress-running the test 8x locally; full proxy suite
    (11 tests) green.
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 amber/src/main/python/core/proxy/proxy_server.py | 20 +++++++++++++++-----
 1 file changed, 15 insertions(+), 5 deletions(-)

diff --git a/amber/src/main/python/core/proxy/proxy_server.py 
b/amber/src/main/python/core/proxy/proxy_server.py
index b3bc3afea0..ebcdb46216 100644
--- a/amber/src/main/python/core/proxy/proxy_server.py
+++ b/amber/src/main/python/core/proxy/proxy_server.py
@@ -126,12 +126,13 @@ class ProxyServer(FlightServerBase):
         self.register(name="heartbeat", action=ProxyServer.ack()(lambda: None))
 
         # register shutdown, this is the default action for the client to
-        # terminate the server.
+        # terminate the server. The actual shutdown is dispatched from
+        # do_action *after* the "Bye bye!" Result is yielded, so the
+        # response has been written to the gRPC stream before the
+        # listener is torn down. The action body itself is a no-op.
         self.register(
             name="shutdown",
-            action=ProxyServer.ack(msg="Bye bye!")(
-                lambda: threading.Thread(target=self.graceful_shutdown).start()
-            ),
+            action=ProxyServer.ack(msg="Bye bye!")(lambda: None),
             description="Shut down this server.",
         )
 
@@ -251,6 +252,16 @@ class ProxyServer(FlightServerBase):
             else:
                 encoded = str(result).encode("utf-8")
             yield Result(py_buffer(encoded))
+
+            # Special-case shutdown: tear the listener down only after
+            # the Result above has been handed back to the gRPC stream.
+            # Spawning the shutdown thread before the yield (the old
+            # design) raced with the response flush and surfaced as
+            # `FlightUnavailableError: Broken pipe` on the client.
+            if action_name == "shutdown":
+                threading.Thread(
+                    target=self.graceful_shutdown, daemon=True
+                ).start()
         else:
             raise KeyError("Unknown action {!r}".format(action_name))
 
@@ -310,7 +321,6 @@ class ProxyServer(FlightServerBase):
     # helper methods #
     ##################
     def graceful_shutdown(self):
-        """Shut down after a delay."""
         logger.debug("Server is shutting down...")
         super().shutdown()
         logger.debug("Server is shutdown.")

Reply via email to