This is an automated email from the ASF dual-hosted git repository.

Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new a6ae296e10 test(amber-python): per-channel sync in test_main_loop_thread_can_align_ecm (#4526)
a6ae296e10 is described below

commit a6ae296e10afed49ee32ebdd75ba0aad78ff4529
Author: Yicong Huang
AuthorDate: Sat Apr 25 23:51:33 2026 -0700

    test(amber-python): per-channel sync in test_main_loop_thread_can_align_ecm (#4526)
    
    ### What changes were proposed in this PR?
    
    Fixes the recurring CI flake in `core/runnables/test_main_loop.py::TestMainLoop::test_main_loop_thread_can_align_ecm`
    and tightens the assertion to verify the actual production priority
    guarantee.
    
    **Why it flaked**: the test put a data tuple and an alignment-completing
    ECM into `input_queue` back-to-back, then assumed `output_queue.get()`
    would yield the `DataElement` first followed by the NoOperation reply.
    `output_queue` is a `LinkedBlockingMultiQueue` whose sub-queues are
    keyed by channel — control channels at priority 1, data channels at
    priority 2 (`internal_queue.py:80`). MainLoop produces in this order:
    data → NoOperation reply, but whether the test pops them in that order
    depends on whether MainLoop has finished both productions by the time
    the test calls `.get()`:
    
    - Fast machine: only data is in the queue → data first → ✅
    - Slow CI: both items queued → priority returns the control reply first → ❌
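The race can be reproduced deterministically with a toy model. This sketch is not the real `LinkedBlockingMultiQueue`; the class `ToyPriorityMultiQueue`, its `get_nowait` method, and the channel names are hypothetical. It assumes only what is described above: one FIFO sub-queue per channel, created lazily on first put, with the lowest priority number winning when several sub-queues are non-empty.

```python
from collections import deque


class ToyPriorityMultiQueue:
    """Hypothetical stand-in for a priority multi-queue keyed by channel.

    Lower priority number wins across channels; each channel's sub-queue is FIFO.
    Sub-queues are created lazily on the first put to a channel.
    """

    def __init__(self, priorities):
        self._priorities = priorities  # channel -> priority (1 = control, 2 = data)
        self._subqueues = {}

    def put(self, channel, item):
        self._subqueues.setdefault(channel, deque()).append(item)

    def get_nowait(self):
        ready = [channel for channel, q in self._subqueues.items() if q]
        if not ready:
            raise LookupError("all sub-queues are empty")
        best = min(ready, key=lambda channel: self._priorities[channel])
        return self._subqueues[best].popleft()


PRIORITIES = {"control->sender": 1, "data->downstream": 2}

# Fast machine: the test pops while only the data tuple has been queued.
fast = ToyPriorityMultiQueue(PRIORITIES)
fast.put("data->downstream", "DataElement")
fast_first = fast.get_nowait()

# Slow CI: MainLoop queued both outputs before the test got to pop.
slow = ToyPriorityMultiQueue(PRIORITIES)
slow.put("data->downstream", "DataElement")
slow.put("control->sender", "DCMElement(NoOperation reply)")
slow_first = slow.get_nowait()

print(fast_first)  # DataElement
print(slow_first)  # DCMElement(NoOperation reply)
```

The old "data first" assertion holds only in the first scenario, so the test outcome depended on thread scheduling rather than on the behavior under test.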
    
    The production semantics are intentional and correct:
    - Control to coordinator outranks data on egress (cross-channel priority: priority 1 vs 2 sub-queues).
    - Within a channel sub-queue, FIFO is preserved (so an ECM forwarded on
    a data channel stays behind the data tuples on that same channel).
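Taken together, the two rules mean that once every item is queued, the drain order is a stable sort of the production order by priority. A minimal sketch, assuming one channel per priority level; the `drain_order` helper and channel names are hypothetical, and this is not how the real queue is implemented:

```python
def drain_order(produced):
    """Return items in the order a consumer sees them if it starts draining
    only after everything in `produced` has been queued.

    `produced` is a list of (channel, priority, item) in production order.
    Assumes lower priority numbers win across channels, FIFO within a channel,
    and one channel per priority level (so no cross-channel tie-breaking rule).
    """
    channels_by_priority = {}
    for channel, priority, _ in produced:
        channels_by_priority.setdefault(priority, set()).add(channel)
    if any(len(channels) > 1 for channels in channels_by_priority.values()):
        raise ValueError("sketch assumes one channel per priority level")
    # sorted() is stable: items on the same channel keep their production order.
    return [item for _, _, item in sorted(produced, key=lambda entry: entry[1])]


# Hypothetical production order on egress.
produced = [
    ("data->downstream", 2, "data tuple"),
    ("control->sender", 1, "NoOperation reply"),
    ("data->downstream", 2, "ECM forwarded on the data channel"),
]
print(drain_order(produced))
# ['NoOperation reply', 'data tuple', 'ECM forwarded on the data channel']
```

The control reply jumps ahead of everything on the data channel, while the ECM never overtakes the data tuple produced before it on that same channel.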
    
    **Fix**: the test now expresses the priority semantics directly:
    
    1. Wait until both expected channels have their item in `output_queue`'s
    sub-queues — the data channel to the downstream worker, and the control
    channel back to "sender". Sub-queues are added lazily on first put, so
    the wait safely treats a missing key as size zero.
    2. With both items queued, assert the priority pop order:
    `output_queue.get()` first returns the `DCMElement` (NoOperation reply,
    control sub-queue priority 1), then the `DataElement` (data sub-queue
    priority 2).
    3. The wait is bounded by a 5s timeout, so a regression that drops one of
    the channels fails with a descriptive message instead of hanging.
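The bounded wait in steps 1 and 3 can be sketched as a generic polling helper. This is an illustration, not the test's code: `wait_for_all`, `fake_main_loop`, and the plain dict standing in for the queue's per-channel sizes are all hypothetical. Missing channels read as size zero, mirroring the lazily created sub-queues.

```python
import threading
import time


def wait_for_all(sizes, channels, timeout=5.0, poll_interval=0.001):
    """Poll until every channel in `channels` has at least one queued item.

    `sizes` maps channel -> current sub-queue length. A channel that was never
    put to (its sub-queue does not exist yet) reads as size zero.
    Raises AssertionError listing the observed sizes once `timeout` elapses.
    """
    deadline = time.monotonic() + timeout
    while True:
        observed = {channel: sizes.get(channel, 0) for channel in channels}
        if all(size > 0 for size in observed.values()):
            return observed
        if time.monotonic() > deadline:
            raise AssertionError(
                f"timed out waiting for outputs on all channels; {observed}"
            )
        time.sleep(poll_interval)


def fake_main_loop(sizes):
    # Hypothetical producer: the data output appears first, the control reply later.
    time.sleep(0.01)
    sizes["data->downstream"] = 1
    time.sleep(0.01)
    sizes["control->sender"] = 1


sizes = {}
producer = threading.Thread(target=fake_main_loop, args=(sizes,))
producer.start()
observed = wait_for_all(sizes, ["data->downstream", "control->sender"])
producer.join()
print(observed)  # {'data->downstream': 1, 'control->sender': 1}

# A regression that never produces the control reply fails instead of hanging.
timeout_message = None
try:
    wait_for_all(
        {"data->downstream": 1},
        ["data->downstream", "control->sender"],
        timeout=0.05,
    )
except AssertionError as err:
    timeout_message = str(err)
```

The sketch uses `time.monotonic()` for the deadline so a wall-clock adjustment cannot shorten or stretch the timeout; the 1 ms poll interval matches the test's `time.sleep(0.001)`.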
    
    If a future change flips priorities or routes the NoOperation reply to a
    different channel, the first `get()` now fails fast with "expected
    control reply first".
    
    No production code change.
    
    ### Any related issues, documentation, discussions?
    
    Closes #4524. The flake has also been failing CI on unrelated PRs (#4512, #4520).
    
    ### How was this PR tested?
    
    Ran 30 consecutive iterations locally — 30 PASS, 0 FAIL.
    `ruff format --check .` and `ruff check .` clean.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Opus 4.7)
---
 .../main/python/core/runnables/test_main_loop.py   | 59 +++++++++++++++++++---
 1 file changed, 51 insertions(+), 8 deletions(-)

diff --git a/amber/src/main/python/core/runnables/test_main_loop.py b/amber/src/main/python/core/runnables/test_main_loop.py
index 5ad0afec9b..5612e4b41a 100644
--- a/amber/src/main/python/core/runnables/test_main_loop.py
+++ b/amber/src/main/python/core/runnables/test_main_loop.py
@@ -20,6 +20,7 @@ import pandas
 import pickle
 import pyarrow
 import pytest
+import time
 from threading import Thread
 
 from core.models import (
@@ -1172,19 +1173,61 @@ class TestMainLoop:
         input_queue.put(ECMElement(tag=mock_control_input_channel, payload=test_ecm))
         input_queue.put(mock_binary_data_element)
         input_queue.put(ECMElement(tag=mock_data_input_channel, payload=test_ecm))
-        output_data_element: DataElement = output_queue.get()
+
+        # The two outputs land on different channel sub-queues:
+        #   - DataElement on the data channel to the downstream worker
+        #   - DCMElement (NoOperation reply) on the control channel back to "sender"
+        # output_queue is a priority multi-queue. With both items present,
+        # the control sub-queue (priority 1) outranks the data sub-queue
+        # (priority 2), so the control reply must come out first. Wait for
+        # both channels to have their item before popping, so the priority
+        # guarantee is what we're actually testing — see #4524.
+        control_reply_channel = ChannelIdentity(
+            ActorVirtualIdentity("dummy_worker_id"),
+            ActorVirtualIdentity("sender"),
+            is_control=True,
+        )
+
+        def channel_size(channel: ChannelIdentity) -> int:
+            # Sub-queues are added lazily on first put, so the channel may not
+            # exist in the LBMQ yet. Treat that as size zero.
+            if channel not in output_queue._queue.sub_queues:
+                return 0
+            return output_queue._queue.size(channel)
+
+        deadline = time.time() + 5.0
+        while channel_size(mock_data_output_channel) == 0 or (
+            channel_size(control_reply_channel) == 0
+        ):
+            if time.time() > deadline:
+                raise AssertionError(
+                    f"timed out waiting for outputs on both channels; "
+                    f"data={channel_size(mock_data_output_channel)}, "
+                    f"control={channel_size(control_reply_channel)}"
+                )
+            time.sleep(0.001)
+
+        # Priority pulls control before data when both are queued.
+        output_control_element = output_queue.get()
+        assert isinstance(output_control_element, DCMElement), (
+            f"expected control reply first (priority), got {type(output_control_element).__name__}"
+        )
+        assert output_control_element.tag == control_reply_channel
+        assert output_control_element.payload.return_invocation.command_id == 98
+        assert (
+            output_control_element.payload.return_invocation.return_value
+            == ControlReturn(empty_return=EmptyReturn())
+        )
+
+        output_data_element = output_queue.get()
+        assert isinstance(output_data_element, DataElement), (
+            f"expected data element second, got {type(output_data_element).__name__}"
+        )
         assert output_data_element.tag == mock_data_output_channel
         assert isinstance(output_data_element.payload, DataFrame)
         data_frame: DataFrame = output_data_element.payload
-
         assert len(data_frame.frame) == 1
         assert data_frame.frame.to_pylist()[0][
             "test-1"
         ] == b"pickle    " + pickle.dumps(mock_binary_tuple["test-1"])
-        output_control_element: DCMElement = output_queue.get()
-        assert output_control_element.payload.return_invocation.command_id == 98
-        assert (
-            output_control_element.payload.return_invocation.return_value
-            == ControlReturn(empty_return=EmptyReturn())
-        )
         reraise()
