This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch revert-4424-xinyuan-fix-state-processing
in repository https://gitbox.apache.org/repos/asf/texera.git
commit 72849a7eab8e83a7c7c8f21453873d6b699e33a7 Author: Xinyuan Lin <[email protected]> AuthorDate: Tue Apr 28 17:19:11 2026 -0700 Revert "fix: add missing context switches for repeated state processing (#4424)" This reverts commit ef66190f22f256ab85ea9bfe0b67f0de960e2d9b. --- .../main/python/core/runnables/data_processor.py | 1 - amber/src/main/python/core/runnables/main_loop.py | 1 - .../main/python/core/runnables/test_main_loop.py | 61 ---------------------- 3 files changed, 63 deletions(-) diff --git a/amber/src/main/python/core/runnables/data_processor.py b/amber/src/main/python/core/runnables/data_processor.py index 815e85a644..4399b1a3a2 100644 --- a/amber/src/main/python/core/runnables/data_processor.py +++ b/amber/src/main/python/core/runnables/data_processor.py @@ -100,7 +100,6 @@ class DataProcessor(Runnable, Stoppable): self._context.worker_id, self._context.console_message_manager.print_buf, ): - self._switch_context() self._set_output_state(executor.process_state(state, port_id)) except Exception as err: diff --git a/amber/src/main/python/core/runnables/main_loop.py b/amber/src/main/python/core/runnables/main_loop.py index 9356542a08..cde2847206 100644 --- a/amber/src/main/python/core/runnables/main_loop.py +++ b/amber/src/main/python/core/runnables/main_loop.py @@ -247,7 +247,6 @@ class MainLoop(StoppableQueueBlockingRunnable): def _process_state(self, state_: State) -> None: self.context.state_processing_manager.current_input_state = state_ - self._switch_context() self.process_input_state() self._check_and_process_control() diff --git a/amber/src/main/python/core/runnables/test_main_loop.py b/amber/src/main/python/core/runnables/test_main_loop.py index e6136b0420..5612e4b41a 100644 --- a/amber/src/main/python/core/runnables/test_main_loop.py +++ b/amber/src/main/python/core/runnables/test_main_loop.py @@ -26,8 +26,6 @@ from threading import Thread from core.models import ( DataFrame, InternalQueue, - State, - StateFrame, Tuple, ) from 
core.models.internal_queue import ( @@ -1038,65 +1036,6 @@ class TestMainLoop: reraise() - @pytest.mark.timeout(2) - def test_process_state_can_emit_multiple_states( - self, - main_loop, - output_queue, - mock_data_output_channel, - monkeypatch, - ): - class DummyExecutor: - @staticmethod - def process_state(state: State, port: int) -> State: - output_state = State() - output_state["value"] = state["value"] + 1 - output_state["port"] = port - return output_state - - main_loop.context.executor_manager.executor = DummyExecutor() - monkeypatch.setattr(main_loop, "_check_and_process_control", lambda: None) - monkeypatch.setattr( - main_loop.context.output_manager, - "emit_state", - lambda state: [(mock_data_output_channel.to_worker_id, StateFrame(state))], - ) - - switch_count = {"value": 0} - - def fake_switch_context(): - switch_count["value"] += 1 - if switch_count["value"] % 3 == 2: - current_input_state = ( - main_loop.context.state_processing_manager.current_input_state - ) - main_loop.context.state_processing_manager.current_output_state = ( - DummyExecutor.process_state(current_input_state, 0) - ) - - monkeypatch.setattr(main_loop, "_switch_context", fake_switch_context) - - first_state = State() - first_state["value"] = 1 - second_state = State() - second_state["value"] = 41 - - main_loop._process_state(first_state) - main_loop._process_state(second_state) - - first_output: DataElement = output_queue.get() - second_output: DataElement = output_queue.get() - - assert first_output.tag == mock_data_output_channel - assert isinstance(first_output.payload, StateFrame) - assert first_output.payload.frame["value"] == 2 - assert first_output.payload.frame["port"] == 0 - - assert second_output.tag == mock_data_output_channel - assert isinstance(second_output.payload, StateFrame) - assert second_output.payload.frame["value"] == 42 - assert second_output.payload.frame["port"] == 0 - @staticmethod def send_pause( command_sequence,
