This is an automated email from the ASF dual-hosted git repository.
aicam pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new e4ec881208 fix: order shutdown after Result yield in
ProxyServer.do_action (#5076)
e4ec881208 is described below
commit e4ec881208162adfe9cd7226fef3677a9d676a6f
Author: Matthew B. <[email protected]>
AuthorDate: Fri May 15 16:33:48 2026 -0700
fix: order shutdown after Result yield in ProxyServer.do_action (#5076)
### What changes were proposed in this PR?
On Python 3.13, the
`core/proxy/test_proxy_client.py::test_client_can_shutdown_server` test
intermittently failed with `pyarrow._flight.FlightUnavailableError:
Broken pipe` because the registered `shutdown` action spawned a thread
that called `super().shutdown()` while pyarrow was still flushing the
`"Bye bye!"` reply. The handler is now a no-op, and
`ProxyServer.do_action` launches the shutdown thread only after `yield
Result(...)`, so the reply is handed to gRPC before the listener closes.
This is a causal fix, not a timing buffer, and works on every supported
Python version.
### Any related issues, documentation, or discussions?
Closes: #4650
### How was this PR tested?
Ran `pytest src/test/python/core/proxy/` on Python 3.13 (11/11 passing)
and repeated the previously flaky `test_client_can_shutdown_server` 10
consecutive times with no failures; `ruff check` and `ruff format
--check` over `amber/src/{main, test}/python` both pass, matching CI.
### Was this PR authored or co-authored using generative AI tooling?
Co-authored with Claude Opus 4.7 in compliance with ASF
---
amber/src/main/python/core/proxy/proxy_server.py | 10 ++++++----
amber/src/test/python/core/proxy/test_proxy_server.py | 19 +++++++++++++++++++
2 files changed, 25 insertions(+), 4 deletions(-)
diff --git a/amber/src/main/python/core/proxy/proxy_server.py
b/amber/src/main/python/core/proxy/proxy_server.py
index b3bc3afea0..0dee156bbf 100644
--- a/amber/src/main/python/core/proxy/proxy_server.py
+++ b/amber/src/main/python/core/proxy/proxy_server.py
@@ -126,12 +126,10 @@ class ProxyServer(FlightServerBase):
self.register(name="heartbeat", action=ProxyServer.ack()(lambda: None))
# register shutdown, this is the default action for the client to
- # terminate the server.
+ # terminate the server after the "Bye bye!" Result has been yielded.
self.register(
name="shutdown",
- action=ProxyServer.ack(msg="Bye bye!")(
- lambda: threading.Thread(target=self.graceful_shutdown).start()
- ),
+ action=ProxyServer.ack(msg="Bye bye!")(lambda: None),
description="Shut down this server.",
)
@@ -251,6 +249,10 @@ class ProxyServer(FlightServerBase):
else:
encoded = str(result).encode("utf-8")
yield Result(py_buffer(encoded))
+
+ # For "shutdown", tear the server down only after the Result has
been yielded
+ if action_name == "shutdown":
+ threading.Thread(target=self.graceful_shutdown).start()
else:
raise KeyError("Unknown action {!r}".format(action_name))
diff --git a/amber/src/test/python/core/proxy/test_proxy_server.py
b/amber/src/test/python/core/proxy/test_proxy_server.py
index 22aec0cc69..3055ce0c64 100644
--- a/amber/src/test/python/core/proxy/test_proxy_server.py
+++ b/amber/src/test/python/core/proxy/test_proxy_server.py
@@ -15,6 +15,9 @@
# specific language governing permissions and limitations
# under the License.
+import threading
+from unittest.mock import patch
+
import pytest
from pyarrow.flight import Action
@@ -66,3 +69,19 @@ class TestProxyServer:
assert next(
server.do_action(None, Action(name, b""))
).body.to_pybytes() == str(result).encode("utf-8")
+
+ def test_shutdown_action_yields_reply_before_starting_shutdown(self,
server):
+ shutdown_started = threading.Event()
+ with patch.object(
+ server, "graceful_shutdown", side_effect=shutdown_started.set
+ ) as mock_shutdown:
+ results = server.do_action(None, Action("shutdown", b""))
+
+ first = next(results)
+ assert first.body.to_pybytes() == b"Bye bye!"
+ assert not mock_shutdown.called
+
+ with pytest.raises(StopIteration):
+ next(results)
+ assert shutdown_started.wait(timeout=5)
+ mock_shutdown.assert_called_once()