This is an automated email from the ASF dual-hosted git repository.
Yicong-Huang pushed a commit to branch release/v1.1.0-incubating
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/release/v1.1.0-incubating by
this push:
new 58c07e50bf fix(amber): defer ProxyServer shutdown until response is
yielded
58c07e50bf is described below
commit 58c07e50bfe7e36112fdab52552671968dab77eb
Author: Yicong Huang <[email protected]>
AuthorDate: Sun May 3 14:39:27 2026 -0700
fix(amber): defer ProxyServer shutdown until response is yielded
graceful_shutdown was being dispatched from a background thread
inside the registered shutdown action's lambda — i.e. before
do_action's `yield Result(py_buffer("Bye bye!"))` ran. On a fast
runner the thread reached super().shutdown() and tore the gRPC
listener down before the "Bye bye!" Result flushed to the client,
which surfaced as
pyarrow._flight.FlightUnavailableError:
Flight returned unavailable error, with message: Broken pipe
flaking core/proxy/test_proxy_client.py::test_client_can_shutdown_server
on the python (3.12 / 3.13) matrix.
Move the spawn to do_action, *after* the yield. The action's body
becomes a no-op lambda; do_action checks for action_name == "shutdown"
post-yield and starts graceful_shutdown on a daemon thread there.
The Result is handed back to gRPC before the listener teardown
begins, so the response always reaches the client. graceful_shutdown
itself goes back to a plain shutdown call (its old "after a delay"
docstring was aspirational — it never actually slept).
Verified by stress-running the test 8x locally; full proxy suite
(11 tests) green.
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
amber/src/main/python/core/proxy/proxy_server.py | 20 +++++++++++++++-----
1 file changed, 15 insertions(+), 5 deletions(-)
diff --git a/amber/src/main/python/core/proxy/proxy_server.py
b/amber/src/main/python/core/proxy/proxy_server.py
index b3bc3afea0..ebcdb46216 100644
--- a/amber/src/main/python/core/proxy/proxy_server.py
+++ b/amber/src/main/python/core/proxy/proxy_server.py
@@ -126,12 +126,13 @@ class ProxyServer(FlightServerBase):
self.register(name="heartbeat", action=ProxyServer.ack()(lambda: None))
# register shutdown, this is the default action for the client to
- # terminate the server.
+ # terminate the server. The actual shutdown is dispatched from
+ # do_action *after* the "Bye bye!" Result is yielded, so the
+ # response has been written to the gRPC stream before the
+ # listener is torn down. The action body itself is a no-op.
self.register(
name="shutdown",
- action=ProxyServer.ack(msg="Bye bye!")(
- lambda: threading.Thread(target=self.graceful_shutdown).start()
- ),
+ action=ProxyServer.ack(msg="Bye bye!")(lambda: None),
description="Shut down this server.",
)
@@ -251,6 +252,16 @@ class ProxyServer(FlightServerBase):
else:
encoded = str(result).encode("utf-8")
yield Result(py_buffer(encoded))
+
+ # Special-case shutdown: tear the listener down only after
+ # the Result above has been handed back to the gRPC stream.
+ # Spawning the shutdown thread before the yield (the old
+ # design) raced with the response flush and surfaced as
+ # `FlightUnavailableError: Broken pipe` on the client.
+ if action_name == "shutdown":
+ threading.Thread(
+ target=self.graceful_shutdown, daemon=True
+ ).start()
else:
raise KeyError("Unknown action {!r}".format(action_name))
@@ -310,7 +321,6 @@ class ProxyServer(FlightServerBase):
# helper methods #
##################
def graceful_shutdown(self):
- """Shut down after a delay."""
logger.debug("Server is shutting down...")
super().shutdown()
logger.debug("Server is shutdown.")