This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch xinyuan-state-only
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-state-only by this
push:
new 2540c8a8fc test: add multiple state processing regression
2540c8a8fc is described below
commit 2540c8a8fc7c6d14da4b0f9d5eb17d88e587f0ec
Author: Xinyuan Lin <[email protected]>
AuthorDate: Mon Apr 20 13:06:31 2026 -0700
test: add multiple state processing regression
---
.../main/python/core/runnables/test_main_loop.py | 59 ++++++++++++++++++++++
1 file changed, 59 insertions(+)
diff --git a/amber/src/main/python/core/runnables/test_main_loop.py b/amber/src/main/python/core/runnables/test_main_loop.py
index 5ad0afec9b..3ff0367208 100644
--- a/amber/src/main/python/core/runnables/test_main_loop.py
+++ b/amber/src/main/python/core/runnables/test_main_loop.py
@@ -25,6 +25,8 @@ from threading import Thread
from core.models import (
DataFrame,
InternalQueue,
+ State,
+ StateFrame,
Tuple,
)
from core.models.internal_queue import (
@@ -1077,6 +1079,63 @@ class TestMainLoop:
),
)
+ @pytest.mark.timeout(2)
+ def test_process_state_can_emit_multiple_states(
+ self,
+ main_loop,
+ output_queue,
+ mock_data_output_channel,
+ monkeypatch,
+ ):
+ class DummyExecutor:
+ @staticmethod
+ def process_state(state: State, port: int) -> State:
+ return {"value": state["value"] + 1, "port": port}
+
+ main_loop.context.executor_manager.executor = DummyExecutor()
+ monkeypatch.setattr(main_loop, "_check_and_process_control", lambda: None)
+ monkeypatch.setattr(
+ main_loop.context.output_manager,
+ "emit_state",
+ lambda state: [(mock_data_output_channel.to_worker_id, StateFrame(state))],
+ )
+
+ switch_count = {"value": 0}
+
+ def fake_switch_context():
+ switch_count["value"] += 1
+ # xinyuan-state-only still uses the original two-switch state handshake:
+ # the DataProcessor produces output during the first switch of each
+ # process_input_state() call, before MainLoop reads current_output_state.
+ if switch_count["value"] % 2 == 1:
+ current_input_state = (
+ main_loop.context.state_processing_manager.current_input_state
+ )
+ main_loop.context.state_processing_manager.current_output_state = (
+ DummyExecutor.process_state(current_input_state, 0)
+ )
+
+ monkeypatch.setattr(main_loop, "_switch_context", fake_switch_context)
+
+ first_state = {"value": 1}
+ second_state = {"value": 41}
+
+ main_loop._process_state(first_state)
+ main_loop._process_state(second_state)
+
+ first_output: DataElement = output_queue.get()
+ second_output: DataElement = output_queue.get()
+
+ assert first_output.tag == mock_data_output_channel
+ assert isinstance(first_output.payload, StateFrame)
+ assert first_output.payload.frame["value"] == 2
+ assert first_output.payload.frame["port"] == 0
+
+ assert second_output.tag == mock_data_output_channel
+ assert isinstance(second_output.payload, StateFrame)
+ assert second_output.payload.frame["value"] == 42
+ assert second_output.payload.frame["port"] == 0
+
@pytest.mark.timeout(5)
def test_main_loop_thread_can_align_ecm(
self,