This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 4f016f6f31 test(pyamber): add unit tests for AsyncRPCClient (#4744)
4f016f6f31 is described below
commit 4f016f6f3191cd6e530faa481e7081c7fd1afa60
Author: Yicong Huang <[email protected]>
AuthorDate: Sun May 3 01:33:15 2026 -0700
test(pyamber): add unit tests for AsyncRPCClient (#4744)
### What changes were proposed in this PR?
Adds pytest coverage for
`amber/src/main/python/core/architecture/rpc/async_rpc_client.py`. The
RPC client had no dedicated spec.
### Any related issues, documentation, discussions?
Closes #4741.
### How was this PR tested?
```
cd amber/src/main/python
ruff check core/architecture/rpc/test_async_rpc_client.py
ruff format --check core/architecture/rpc/test_async_rpc_client.py
python -m pytest core/architecture/rpc/test_async_rpc_client.py
```
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (claude-opus-4-7)
---------
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
Co-authored-by: Xinyuan Lin <[email protected]>
---
.../core/architecture/rpc/test_async_rpc_client.py | 259 +++++++++++++++++++++
1 file changed, 259 insertions(+)
diff --git a/amber/src/main/python/core/architecture/rpc/test_async_rpc_client.py b/amber/src/main/python/core/architecture/rpc/test_async_rpc_client.py
new file mode 100644
index 0000000000..f4b3c63e1e
--- /dev/null
+++ b/amber/src/main/python/core/architecture/rpc/test_async_rpc_client.py
@@ -0,0 +1,259 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import asyncio
+import inspect
+from concurrent.futures import Future
+from types import SimpleNamespace
+from unittest.mock import MagicMock
+
+import pytest
+
+from core.architecture.rpc import async_rpc_client as async_rpc_client_module
+from core.architecture.rpc.async_rpc_client import AsyncRPCClient, async_run
+from proto.org.apache.texera.amber.core import (
+ ActorVirtualIdentity,
+ ChannelIdentity,
+)
+from proto.org.apache.texera.amber.engine.architecture.rpc import (
+ ControllerServiceStub,
+ ControlReturn,
+ ReturnInvocation,
+)
+
+
+def _make_client():
+ """AsyncRPCClient with mock queue and a SimpleNamespace context.
+
+ The constructor only reads `context.worker_id` and calls `output_queue.put`
+ along the send path, so a duck-typed namespace + MagicMock queue is enough.
+ """
+ return AsyncRPCClient(MagicMock(), SimpleNamespace(worker_id="w0"))
+
+
+class TestAsyncRunDecorator:
+ def test_runs_coroutine_via_asyncio_run_when_no_loop(self):
+ @async_run
+ async def f():
+ return 42
+
+ # No running loop here, so the wrapper hits the RuntimeError branch
+ # and dispatches via asyncio.run.
+ assert f() == 42
+
+ def test_returns_awaitable_directly_when_called_inside_running_loop(self):
+ # Inside a running loop, the wrapper just calls the underlying function
+ # and returns the coroutine, leaving the await to the caller.
+ @async_run
+ async def f():
+ return "deep"
+
+ async def driver():
+ result = f() # Must be a coroutine
+ assert asyncio.iscoroutine(result)
+ return await result
+
+ assert asyncio.run(driver()) == "deep"
+
+
+class TestCreateFuture:
+ def test_returns_future_instance(self):
+ client = _make_client()
+ to = ActorVirtualIdentity(name="dest")
+ fut = client._create_future(to)
+ assert isinstance(fut, Future)
+
+ def test_records_promise_at_pre_increment_sequence_and_then_increments(self):
+ client = _make_client()
+ to = ActorVirtualIdentity(name="dest")
+ # _send_sequences starts at 0 (defaultdict(int)). _create_future stores
+ # the promise at the current sequence and only THEN increments — so the
+ # very first promise lives at key (to, 0).
+ fut = client._create_future(to)
+ assert client._unfulfilled_promises[(to, 0)] is fut
+ assert client._send_sequences[to] == 1
+
+ def test_sequence_increments_per_target_independently(self):
+ client = _make_client()
+ a = ActorVirtualIdentity(name="A")
+ b = ActorVirtualIdentity(name="B")
+ client._create_future(a)
+ client._create_future(a)
+ client._create_future(b)
+ assert client._send_sequences[a] == 2
+ assert client._send_sequences[b] == 1
+ assert (a, 0) in client._unfulfilled_promises
+ assert (a, 1) in client._unfulfilled_promises
+ assert (b, 0) in client._unfulfilled_promises
+
+
+class TestFulfillPromise:
+ def _channel(self, name: str) -> ChannelIdentity:
+ # `_fulfill_promise` looks up the dict by `from_.from_worker_id`; build
+ # a ChannelIdentity whose sender slot matches the actor we promised to.
+ return ChannelIdentity(
+ from_worker_id=ActorVirtualIdentity(name=name),
+ to_worker_id=ActorVirtualIdentity(name="self"),
+ is_control=True,
+ )
+
+ def test_resolves_matching_future_and_clears_the_entry(self):
+ client = _make_client()
+ actor = ActorVirtualIdentity(name="A")
+ fut = client._create_future(actor)
+ ret = ControlReturn()
+
+ client._fulfill_promise(self._channel("A"), command_id=0, control_return=ret)
+
+ assert fut.done() and fut.result() is ret
+ assert (actor, 0) not in client._unfulfilled_promises
+
+ def test_silently_logs_when_no_matching_promise_exists(self, monkeypatch):
+ client = _make_client()
+ # Place an unrelated pending promise so we can verify the no-match
+ # branch leaves it alone instead of silently dropping the dict entry.
+ actor_b = ActorVirtualIdentity(name="B")
+ fut_b = client._create_future(actor_b)
+ # Patch the loguru logger used inside async_rpc_client so we can
+ # assert that the no-match branch DID emit a warning. Without this
+ # the implementation could silently drop unknown ControlReturns and
+ # the suite would still pass.
+ warning_calls = []
+ monkeypatch.setattr(
+ async_rpc_client_module.logger,
+ "warning",
+ lambda msg, *a, **kw: warning_calls.append(msg),
+ )
+
+ # No prior _create_future for actor "A" — nothing to match. Method
+ # must not raise.
+ client._fulfill_promise(
+ self._channel("A"), command_id=99, control_return=ControlReturn()
+ )
+
+ assert len(warning_calls) == 1
+ assert "no corresponding ControlCommand found" in warning_calls[0]
+ # Unrelated pending promise is untouched.
+ assert not fut_b.done()
+ assert (actor_b, 0) in client._unfulfilled_promises
+
+ def test_does_not_disturb_unrelated_pending_promises(self):
+ client = _make_client()
+ actor_a = ActorVirtualIdentity(name="A")
+ actor_b = ActorVirtualIdentity(name="B")
+ fut_a = client._create_future(actor_a)
+ fut_b = client._create_future(actor_b)
+
+ client._fulfill_promise(
+ self._channel("A"), command_id=0, control_return=ControlReturn()
+ )
+
+ assert fut_a.done()
+ assert not fut_b.done()
+ assert (actor_b, 0) in client._unfulfilled_promises
+
+
+class TestReceive:
+ def test_delegates_command_id_and_return_value_to_fulfill_promise(self):
+ client = _make_client()
+ actor = ActorVirtualIdentity(name="A")
+ fut = client._create_future(actor)
+ ret = ControlReturn()
+ invocation = ReturnInvocation(command_id=0, return_value=ret)
+ from_ = ChannelIdentity(
+ from_worker_id=actor,
+ to_worker_id=ActorVirtualIdentity(name="self"),
+ is_control=True,
+ )
+
+ client.receive(from_, invocation)
+
+ assert fut.done() and fut.result() is ret
+
+
+class TestProxyStreamBlockers:
+ def test_stream_unary_blocked(self):
+ client = _make_client()
+ proxy = client.get_worker_interface("worker-X")
+ with pytest.raises(NotImplementedError, match="_stream_unary"):
+ proxy._stream_unary()
+
+ def test_unary_stream_blocked(self):
+ client = _make_client()
+ proxy = client.get_worker_interface("worker-X")
+ with pytest.raises(NotImplementedError, match="_unary_stream"):
+ proxy._unary_stream()
+
+ def test_stream_stream_blocked(self):
+ client = _make_client()
+ proxy = client.get_worker_interface("worker-X")
+ with pytest.raises(NotImplementedError, match="_stream_stream"):
+ proxy._stream_stream()
+
+
+class TestControllerStub:
+ def test_controller_stub_returns_configured_stub(self):
+ client = _make_client()
+ stub = client.controller_stub()
+ # Identity check: same instance every call (lazily configured in __init__).
+ assert stub is client._controller_service_stub
+ assert stub is client.controller_stub()
+
+ def test_controller_stub_unary_unary_is_rewired_with_async_context(self):
+ # AsyncRPCClient.__init__ replaces the stub's `_unary_unary` with the
+ # closure produced by `_assign_context`, then `_wrap_all_async_methods`
+ # wraps that (originally async) function with `async_run`. The end
+ # state is therefore: the handler is no longer the bound method from
+ # ControllerServiceStub, but a synchronous async_run wrapper. A
+ # regression that returned an unconfigured stub would pass the identity
+ # check above, but cannot pass this one.
+ client = _make_client()
+ stub = client.controller_stub()
+ baseline = ControllerServiceStub("")
+ assert stub._unary_unary is not baseline._unary_unary
+ # The _assign_context wrapper closes over the AsyncRPCClient self, so
+ # if the rewiring really happened the function we end up with mentions
+ # `_assign_context` somewhere in its qualname (either directly, when
+ # async_run reuses the wrapped name, or via __wrapped__).
+ target = getattr(stub._unary_unary, "__wrapped__", stub._unary_unary)
+ assert "_assign_context" in target.__qualname__
+
+ def test_controller_stub_async_methods_are_wrapped_with_async_run(self):
+ # AsyncRPCClient also runs `_wrap_all_async_methods_with_async_run`,
+ # which replaces every coroutinefunction on the stub with the sync
+ # `async_run` wrapper. So whatever methods were async on a fresh
+ # `ControllerServiceStub` must now be NON-coroutine on the configured
+ # stub. Without this assertion the wrap-all pass could no-op silently.
+ client = _make_client()
+ stub = client.controller_stub()
+ baseline = ControllerServiceStub("")
+ async_method_names = [
+ name
+ for name in dir(baseline)
+ if not name.startswith("_")
+ and inspect.iscoroutinefunction(getattr(baseline, name))
+ ]
+ # Sanity: the upstream stub really does ship with async methods.
+ assert async_method_names, (
+ "ControllerServiceStub no longer has any async methods; this test "
+ "needs to be reconsidered."
+ )
+ for name in async_method_names:
+ assert not inspect.iscoroutinefunction(getattr(stub, name)), (
+ f"{name!r} on the configured stub should have been wrapped by "
+ "async_run but is still a coroutine function."
+ )