This is an automated email from the ASF dual-hosted git repository.
Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new a6ae296e10 test(amber-python): per-channel sync in test_main_loop_thread_can_align_ecm (#4526)
a6ae296e10 is described below
commit a6ae296e10afed49ee32ebdd75ba0aad78ff4529
Author: Yicong Huang <[email protected]>
AuthorDate: Sat Apr 25 23:51:33 2026 -0700
test(amber-python): per-channel sync in test_main_loop_thread_can_align_ecm (#4526)
### What changes were proposed in this PR?
Fixes the recurring CI flake in
`core/runnables/test_main_loop.py::TestMainLoop::test_main_loop_thread_can_align_ecm`
and tightens the assertion to verify the actual production priority
guarantee.
**Why it flaked**: the test put a data tuple and an alignment-completing
ECM into `input_queue` back-to-back, then assumed `output_queue.get()`
would yield the `DataElement` first followed by the NoOperation reply.
`output_queue` is a `LinkedBlockingMultiQueue` whose sub-queues are
keyed by channel — control channels at priority 1, data channels at
priority 2 (`internal_queue.py:80`). MainLoop produces in this order:
data → NoOperation reply, but whether the test pops them in that order
depends on whether MainLoop has finished both productions by the time
the test calls `.get()`:
- Fast machine: the first `.get()` runs while only the data item is queued → data first → ✅
- Slow CI: both items are already queued → the priority queue returns the control reply first → ❌
The production semantics are intentional and correct:
- Control to coordinator outranks data on egress (cross-channel priority
— priority 1 vs 2 sub-queues).
- Within a channel sub-queue, FIFO is preserved (so an ECM forwarded on
a data channel stays behind the data tuples on that same channel).
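
The two rules above, and the way they combine into the flake, can be reproduced with a toy model. This is only an illustrative sketch, not Texera's `LinkedBlockingMultiQueue`: `ToyMultiQueue`, its methods, and `pop_order` are hypothetical names, and the model assumes that a lower priority number is served first and that each sub-queue is FIFO.

```python
from collections import deque


class ToyMultiQueue:
    """Hypothetical stand-in for a priority multi-queue (not the real LBMQ)."""

    def __init__(self):
        # key -> (priority, FIFO deque); sub-queues are created lazily on first put
        self._subs = {}

    def put(self, key, priority, item):
        if key not in self._subs:
            self._subs[key] = (priority, deque())
        self._subs[key][1].append(item)

    def size(self, key):
        # A sub-queue that was never written to counts as empty.
        return len(self._subs[key][1]) if key in self._subs else 0

    def get(self):
        # Serve the non-empty sub-queue with the smallest priority number.
        ready = [(prio, key) for key, (prio, q) in self._subs.items() if q]
        if not ready:
            raise LookupError("queue is empty")
        _, key = min(ready)
        return self._subs[key][1].popleft()


def pop_order(get_before_reply):
    """Producer emits data, then the control reply; the consumer pops both."""
    q = ToyMultiQueue()
    popped = []
    q.put("data", 2, "DataElement")
    if get_before_reply:
        # "Fast machine": the consumer pops while only the data item is queued.
        popped.append(q.get())
    q.put("control", 1, "NoOperation reply")
    while q.size("data") or q.size("control"):
        popped.append(q.get())
    return popped


early = pop_order(get_before_reply=True)   # data first
late = pop_order(get_before_reply=False)   # control reply first

# Within one sub-queue, insertion order is preserved.
q = ToyMultiQueue()
q.put("data", 2, "tuple")
q.put("data", 2, "ECM")
fifo = [q.get(), q.get()]
```

In this model the same producer order yields two different pop orders depending only on when the consumer calls `get()`, which is the timing dependence the old test relied on.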
**Fix**: the test now expresses the priority semantics directly:
1. Wait until both expected channels have their item in `output_queue`'s
sub-queues — the data channel to the downstream worker, and the control
channel back to "sender". Sub-queues are added lazily on first put, so
the wait safely treats a missing key as size zero.
2. With both items queued, assert the priority pop order:
`output_queue.get()` first returns the `DCMElement` (NoOperation reply,
control sub-queue priority 1), then the `DataElement` (data sub-queue
priority 2).
3. The wait is bounded by a 5s timeout, so a regression that drops one of the
channels fails with a descriptive message instead of hanging.
If a future change flips priorities or routes the NoOperation reply to a
different channel, the first `get()` now fails fast with "expected
control reply first".
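
The bounded wait from steps 1 and 3 can be sketched in isolation. This is a minimal illustration, not the test's code: `wait_until`, its parameters, and the `sizes` dictionary are hypothetical, and the sketch uses `time.monotonic()` for the deadline where the test itself uses `time.time()`.

```python
import threading
import time


def wait_until(predicate, describe, timeout=5.0, interval=0.001):
    """Poll `predicate` until it holds; raise AssertionError after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while not predicate():
        if time.monotonic() > deadline:
            # `describe` is evaluated only on failure, so it reports the final state.
            raise AssertionError(f"timed out waiting; {describe()}")
        time.sleep(interval)


# Hypothetical per-channel counters standing in for sub-queue sizes.
sizes = {"data": 0, "control": 0}


def produce():
    sizes["data"] += 1
    time.sleep(0.01)  # the second output arrives slightly later
    sizes["control"] += 1


producer = threading.Thread(target=produce)
producer.start()
wait_until(
    lambda: sizes["data"] > 0 and sizes["control"] > 0,
    lambda: f"data={sizes['data']}, control={sizes['control']}",
)
producer.join()
both_ready = dict(sizes)

# A condition that never holds fails with a descriptive message instead of hanging.
try:
    wait_until(lambda: False, lambda: "data=1, control=0", timeout=0.05)
    timeout_message = None
except AssertionError as exc:
    timeout_message = str(exc)
```

Polling both conditions before the first pop removes the dependence on scheduling, and the failure message names which channel was still empty.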
No production code change.
### Any related issues, documentation, discussions?
Closes #4524. The flake has been hitting unrelated PRs (#4512, #4520).
### How was this PR tested?
Ran 30 consecutive iterations locally — 30 PASS, 0 FAIL.
`ruff format --check .` and `ruff check .` clean.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Opus 4.7)
---
.../main/python/core/runnables/test_main_loop.py | 59 +++++++++++++++++++---
1 file changed, 51 insertions(+), 8 deletions(-)
diff --git a/amber/src/main/python/core/runnables/test_main_loop.py b/amber/src/main/python/core/runnables/test_main_loop.py
index 5ad0afec9b..5612e4b41a 100644
--- a/amber/src/main/python/core/runnables/test_main_loop.py
+++ b/amber/src/main/python/core/runnables/test_main_loop.py
@@ -20,6 +20,7 @@ import pandas
import pickle
import pyarrow
import pytest
+import time
from threading import Thread
from core.models import (
@@ -1172,19 +1173,61 @@ class TestMainLoop:
         input_queue.put(ECMElement(tag=mock_control_input_channel, payload=test_ecm))
         input_queue.put(mock_binary_data_element)
         input_queue.put(ECMElement(tag=mock_data_input_channel, payload=test_ecm))
-        output_data_element: DataElement = output_queue.get()
+
+        # The two outputs land on different channel sub-queues:
+        # - DataElement on the data channel to the downstream worker
+        # - DCMElement (NoOperation reply) on the control channel back to "sender"
+        # output_queue is a priority multi-queue. With both items present,
+        # the control sub-queue (priority 1) outranks the data sub-queue
+        # (priority 2), so the control reply must come out first. Wait for
+        # both channels to have their item before popping, so the priority
+        # guarantee is what we're actually testing — see #4524.
+        control_reply_channel = ChannelIdentity(
+            ActorVirtualIdentity("dummy_worker_id"),
+            ActorVirtualIdentity("sender"),
+            is_control=True,
+        )
+
+        def channel_size(channel: ChannelIdentity) -> int:
+            # Sub-queues are added lazily on first put, so the channel may not
+            # exist in the LBMQ yet. Treat that as size zero.
+            if channel not in output_queue._queue.sub_queues:
+                return 0
+            return output_queue._queue.size(channel)
+
+        deadline = time.time() + 5.0
+        while channel_size(mock_data_output_channel) == 0 or (
+            channel_size(control_reply_channel) == 0
+        ):
+            if time.time() > deadline:
+                raise AssertionError(
+                    f"timed out waiting for outputs on both channels; "
+                    f"data={channel_size(mock_data_output_channel)}, "
+                    f"control={channel_size(control_reply_channel)}"
+                )
+            time.sleep(0.001)
+
+        # Priority pulls control before data when both are queued.
+        output_control_element = output_queue.get()
+        assert isinstance(output_control_element, DCMElement), (
+            f"expected control reply first (priority), got {type(output_control_element).__name__}"
+        )
+        assert output_control_element.tag == control_reply_channel
+        assert output_control_element.payload.return_invocation.command_id == 98
+        assert (
+            output_control_element.payload.return_invocation.return_value
+            == ControlReturn(empty_return=EmptyReturn())
+        )
+
+        output_data_element = output_queue.get()
+        assert isinstance(output_data_element, DataElement), (
+            f"expected data element second, got {type(output_data_element).__name__}"
+        )
         assert output_data_element.tag == mock_data_output_channel
         assert isinstance(output_data_element.payload, DataFrame)
         data_frame: DataFrame = output_data_element.payload
-
         assert len(data_frame.frame) == 1
         assert data_frame.frame.to_pylist()[0][
             "test-1"
         ] == b"pickle " + pickle.dumps(mock_binary_tuple["test-1"])
-        output_control_element: DCMElement = output_queue.get()
-        assert output_control_element.payload.return_invocation.command_id == 98
-        assert (
-            output_control_element.payload.return_invocation.return_value
-            == ControlReturn(empty_return=EmptyReturn())
-        )
         reraise()