This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch xinyuan-fix-state-processing
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-fix-state-processing
by this push:
new 42cc355767 fix fmt
42cc355767 is described below
commit 42cc3557672686ba250f86b8054888be51faf34d
Author: Xinyuan Lin <[email protected]>
AuthorDate: Mon Apr 20 12:52:42 2026 -0700
fix fmt
---
.../main/python/core/runnables/test_main_loop.py | 61 ++++++++++++++++++++++
1 file changed, 61 insertions(+)
diff --git a/amber/src/main/python/core/runnables/test_main_loop.py
b/amber/src/main/python/core/runnables/test_main_loop.py
index 5ad0afec9b..d903d6e58d 100644
--- a/amber/src/main/python/core/runnables/test_main_loop.py
+++ b/amber/src/main/python/core/runnables/test_main_loop.py
@@ -25,6 +25,8 @@ from threading import Thread
from core.models import (
DataFrame,
InternalQueue,
+ State,
+ StateFrame,
Tuple,
)
from core.models.internal_queue import (
@@ -1035,6 +1037,65 @@ class TestMainLoop:
reraise()
+ @pytest.mark.timeout(2)
+ def test_process_state_can_emit_multiple_states(
+ self,
+ main_loop,
+ output_queue,
+ mock_data_output_channel,
+ monkeypatch,
+ ):
+ class DummyExecutor:
+ @staticmethod
+ def process_state(state: State, port: int) -> State:
+ output_state = State()
+ output_state["value"] = state["value"] + 1
+ output_state["port"] = port
+ return output_state
+
+ main_loop.context.executor_manager.executor = DummyExecutor()
+ monkeypatch.setattr(main_loop, "_check_and_process_control", lambda:
None)
+ monkeypatch.setattr(
+ main_loop.context.output_manager,
+ "emit_state",
+ lambda state: [(mock_data_output_channel.to_worker_id,
StateFrame(state))],
+ )
+
+ switch_count = {"value": 0}
+
+ def fake_switch_context():
+ switch_count["value"] += 1
+ if switch_count["value"] % 3 == 2:
+ current_input_state = (
+
main_loop.context.state_processing_manager.current_input_state
+ )
+
main_loop.context.state_processing_manager.current_output_state = (
+ DummyExecutor.process_state(current_input_state, 0)
+ )
+
+ monkeypatch.setattr(main_loop, "_switch_context", fake_switch_context)
+
+ first_state = State()
+ first_state["value"] = 1
+ second_state = State()
+ second_state["value"] = 41
+
+ main_loop._process_state(first_state)
+ main_loop._process_state(second_state)
+
+ first_output: DataElement = output_queue.get()
+ second_output: DataElement = output_queue.get()
+
+ assert first_output.tag == mock_data_output_channel
+ assert isinstance(first_output.payload, StateFrame)
+ assert first_output.payload.frame["value"] == 2
+ assert first_output.payload.frame["port"] == 0
+
+ assert second_output.tag == mock_data_output_channel
+ assert isinstance(second_output.payload, StateFrame)
+ assert second_output.payload.frame["value"] == 42
+ assert second_output.payload.frame["port"] == 0
+
@staticmethod
def send_pause(
command_sequence,