This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new ae68fc992a fix(revert): "fix: add missing context switches for repeated state processing" (#4552)
ae68fc992a is described below

commit ae68fc992a77a1ec8003c205a4e49396756f3cec
Author: Xinyuan Lin <[email protected]>
AuthorDate: Tue Apr 28 18:11:06 2026 -0700

    fix(revert): "fix: add missing context switches for repeated state processing" (#4552)
    
    Reverts apache/texera#4424 since it might change the lifecycle of the
    Python engine.
    
    This might be related to issue #4545.
    
    We don't know whether #4424 really causes the issue, but we are
    reverting it for testing purposes.
---
 .../main/python/core/runnables/data_processor.py   |  1 -
 amber/src/main/python/core/runnables/main_loop.py  |  1 -
 .../main/python/core/runnables/test_main_loop.py   | 61 ----------------------
 3 files changed, 63 deletions(-)

diff --git a/amber/src/main/python/core/runnables/data_processor.py 
b/amber/src/main/python/core/runnables/data_processor.py
index 815e85a644..4399b1a3a2 100644
--- a/amber/src/main/python/core/runnables/data_processor.py
+++ b/amber/src/main/python/core/runnables/data_processor.py
@@ -100,7 +100,6 @@ class DataProcessor(Runnable, Stoppable):
                 self._context.worker_id,
                 self._context.console_message_manager.print_buf,
             ):
-                self._switch_context()
                 self._set_output_state(executor.process_state(state, port_id))
 
         except Exception as err:
diff --git a/amber/src/main/python/core/runnables/main_loop.py 
b/amber/src/main/python/core/runnables/main_loop.py
index 9356542a08..cde2847206 100644
--- a/amber/src/main/python/core/runnables/main_loop.py
+++ b/amber/src/main/python/core/runnables/main_loop.py
@@ -247,7 +247,6 @@ class MainLoop(StoppableQueueBlockingRunnable):
 
     def _process_state(self, state_: State) -> None:
         self.context.state_processing_manager.current_input_state = state_
-        self._switch_context()
         self.process_input_state()
         self._check_and_process_control()
 
diff --git a/amber/src/main/python/core/runnables/test_main_loop.py 
b/amber/src/main/python/core/runnables/test_main_loop.py
index e6136b0420..5612e4b41a 100644
--- a/amber/src/main/python/core/runnables/test_main_loop.py
+++ b/amber/src/main/python/core/runnables/test_main_loop.py
@@ -26,8 +26,6 @@ from threading import Thread
 from core.models import (
     DataFrame,
     InternalQueue,
-    State,
-    StateFrame,
     Tuple,
 )
 from core.models.internal_queue import (
@@ -1038,65 +1036,6 @@ class TestMainLoop:
 
         reraise()
 
-    @pytest.mark.timeout(2)
-    def test_process_state_can_emit_multiple_states(
-        self,
-        main_loop,
-        output_queue,
-        mock_data_output_channel,
-        monkeypatch,
-    ):
-        class DummyExecutor:
-            @staticmethod
-            def process_state(state: State, port: int) -> State:
-                output_state = State()
-                output_state["value"] = state["value"] + 1
-                output_state["port"] = port
-                return output_state
-
-        main_loop.context.executor_manager.executor = DummyExecutor()
-        monkeypatch.setattr(main_loop, "_check_and_process_control", lambda: 
None)
-        monkeypatch.setattr(
-            main_loop.context.output_manager,
-            "emit_state",
-            lambda state: [(mock_data_output_channel.to_worker_id, 
StateFrame(state))],
-        )
-
-        switch_count = {"value": 0}
-
-        def fake_switch_context():
-            switch_count["value"] += 1
-            if switch_count["value"] % 3 == 2:
-                current_input_state = (
-                    
main_loop.context.state_processing_manager.current_input_state
-                )
-                
main_loop.context.state_processing_manager.current_output_state = (
-                    DummyExecutor.process_state(current_input_state, 0)
-                )
-
-        monkeypatch.setattr(main_loop, "_switch_context", fake_switch_context)
-
-        first_state = State()
-        first_state["value"] = 1
-        second_state = State()
-        second_state["value"] = 41
-
-        main_loop._process_state(first_state)
-        main_loop._process_state(second_state)
-
-        first_output: DataElement = output_queue.get()
-        second_output: DataElement = output_queue.get()
-
-        assert first_output.tag == mock_data_output_channel
-        assert isinstance(first_output.payload, StateFrame)
-        assert first_output.payload.frame["value"] == 2
-        assert first_output.payload.frame["port"] == 0
-
-        assert second_output.tag == mock_data_output_channel
-        assert isinstance(second_output.payload, StateFrame)
-        assert second_output.payload.frame["value"] == 42
-        assert second_output.payload.frame["port"] == 0
-
     @staticmethod
     def send_pause(
         command_sequence,

Reply via email to