This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch xinyuan-state-only
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-state-only by this 
push:
     new 2540c8a8fc test: add multiple state processing regression
2540c8a8fc is described below

commit 2540c8a8fc7c6d14da4b0f9d5eb17d88e587f0ec
Author: Xinyuan Lin <[email protected]>
AuthorDate: Mon Apr 20 13:06:31 2026 -0700

    test: add multiple state processing regression
---
 .../main/python/core/runnables/test_main_loop.py   | 59 ++++++++++++++++++++++
 1 file changed, 59 insertions(+)

diff --git a/amber/src/main/python/core/runnables/test_main_loop.py b/amber/src/main/python/core/runnables/test_main_loop.py
index 5ad0afec9b..3ff0367208 100644
--- a/amber/src/main/python/core/runnables/test_main_loop.py
+++ b/amber/src/main/python/core/runnables/test_main_loop.py
@@ -25,6 +25,8 @@ from threading import Thread
 from core.models import (
     DataFrame,
     InternalQueue,
+    State,
+    StateFrame,
     Tuple,
 )
 from core.models.internal_queue import (
@@ -1077,6 +1079,63 @@ class TestMainLoop:
             ),
         )
 
+    @pytest.mark.timeout(2)
+    def test_process_state_can_emit_multiple_states(
+        self,
+        main_loop,
+        output_queue,
+        mock_data_output_channel,
+        monkeypatch,
+    ):
+        class DummyExecutor:
+            @staticmethod
+            def process_state(state: State, port: int) -> State:
+                return {"value": state["value"] + 1, "port": port}
+
+        main_loop.context.executor_manager.executor = DummyExecutor()
+        monkeypatch.setattr(main_loop, "_check_and_process_control", lambda: None)
+        monkeypatch.setattr(
+            main_loop.context.output_manager,
+            "emit_state",
+            lambda state: [(mock_data_output_channel.to_worker_id, StateFrame(state))],
+        )
+
+        switch_count = {"value": 0}
+
+        def fake_switch_context():
+            switch_count["value"] += 1
+            # xinyuan-state-only still uses the original two-switch state handshake:
+            # the DataProcessor produces output during the first switch of each
+            # process_input_state() call, before MainLoop reads current_output_state.
+            if switch_count["value"] % 2 == 1:
+                current_input_state = (
+                    main_loop.context.state_processing_manager.current_input_state
+                )
+                main_loop.context.state_processing_manager.current_output_state = (
+                    DummyExecutor.process_state(current_input_state, 0)
+                )
+
+        monkeypatch.setattr(main_loop, "_switch_context", fake_switch_context)
+
+        first_state = {"value": 1}
+        second_state = {"value": 41}
+
+        main_loop._process_state(first_state)
+        main_loop._process_state(second_state)
+
+        first_output: DataElement = output_queue.get()
+        second_output: DataElement = output_queue.get()
+
+        assert first_output.tag == mock_data_output_channel
+        assert isinstance(first_output.payload, StateFrame)
+        assert first_output.payload.frame["value"] == 2
+        assert first_output.payload.frame["port"] == 0
+
+        assert second_output.tag == mock_data_output_channel
+        assert isinstance(second_output.payload, StateFrame)
+        assert second_output.payload.frame["value"] == 42
+        assert second_output.payload.frame["port"] == 0
+
     @pytest.mark.timeout(5)
     def test_main_loop_thread_can_align_ecm(
         self,

Reply via email to