This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch xinyuan-fix-state-processing
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-fix-state-processing 
by this push:
     new 42cc355767 fix fmt
42cc355767 is described below

commit 42cc3557672686ba250f86b8054888be51faf34d
Author: Xinyuan Lin <[email protected]>
AuthorDate: Mon Apr 20 12:52:42 2026 -0700

    fix fmt
---
 .../main/python/core/runnables/test_main_loop.py   | 61 ++++++++++++++++++++++
 1 file changed, 61 insertions(+)

diff --git a/amber/src/main/python/core/runnables/test_main_loop.py 
b/amber/src/main/python/core/runnables/test_main_loop.py
index 5ad0afec9b..d903d6e58d 100644
--- a/amber/src/main/python/core/runnables/test_main_loop.py
+++ b/amber/src/main/python/core/runnables/test_main_loop.py
@@ -25,6 +25,8 @@ from threading import Thread
 from core.models import (
     DataFrame,
     InternalQueue,
+    State,
+    StateFrame,
     Tuple,
 )
 from core.models.internal_queue import (
@@ -1035,6 +1037,65 @@ class TestMainLoop:
 
         reraise()
 
+    @pytest.mark.timeout(2)
+    def test_process_state_can_emit_multiple_states(
+        self,
+        main_loop,
+        output_queue,
+        mock_data_output_channel,
+        monkeypatch,
+    ):
+        class DummyExecutor:
+            @staticmethod
+            def process_state(state: State, port: int) -> State:
+                output_state = State()
+                output_state["value"] = state["value"] + 1
+                output_state["port"] = port
+                return output_state
+
+        main_loop.context.executor_manager.executor = DummyExecutor()
+        monkeypatch.setattr(main_loop, "_check_and_process_control", lambda: 
None)
+        monkeypatch.setattr(
+            main_loop.context.output_manager,
+            "emit_state",
+            lambda state: [(mock_data_output_channel.to_worker_id, 
StateFrame(state))],
+        )
+
+        switch_count = {"value": 0}
+
+        def fake_switch_context():
+            switch_count["value"] += 1
+            if switch_count["value"] % 3 == 2:
+                current_input_state = (
+                    
main_loop.context.state_processing_manager.current_input_state
+                )
+                
main_loop.context.state_processing_manager.current_output_state = (
+                    DummyExecutor.process_state(current_input_state, 0)
+                )
+
+        monkeypatch.setattr(main_loop, "_switch_context", fake_switch_context)
+
+        first_state = State()
+        first_state["value"] = 1
+        second_state = State()
+        second_state["value"] = 41
+
+        main_loop._process_state(first_state)
+        main_loop._process_state(second_state)
+
+        first_output: DataElement = output_queue.get()
+        second_output: DataElement = output_queue.get()
+
+        assert first_output.tag == mock_data_output_channel
+        assert isinstance(first_output.payload, StateFrame)
+        assert first_output.payload.frame["value"] == 2
+        assert first_output.payload.frame["port"] == 0
+
+        assert second_output.tag == mock_data_output_channel
+        assert isinstance(second_output.payload, StateFrame)
+        assert second_output.payload.frame["value"] == 42
+        assert second_output.payload.frame["port"] == 0
+
     @staticmethod
     def send_pause(
         command_sequence,

Reply via email to