This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch xinyuan-loop-feb
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-loop-feb by this push:
     new 62d285443c update
62d285443c is described below

commit 62d285443ccde08478fe426a7636f1c356d38165
Author: Xinyuan Lin <[email protected]>
AuthorDate: Wed Apr 29 16:19:23 2026 -0700

    update
---
 .../main/python/core/runnables/data_processor.py   |  26 +--
 amber/src/main/python/core/runnables/main_loop.py  |   8 +-
 .../main/python/core/runnables/test_main_loop.py   | 175 +++++++++++++++++++--
 3 files changed, 177 insertions(+), 32 deletions(-)

diff --git a/amber/src/main/python/core/runnables/data_processor.py b/amber/src/main/python/core/runnables/data_processor.py
index 815e85a644..776aa35b87 100644
--- a/amber/src/main/python/core/runnables/data_processor.py
+++ b/amber/src/main/python/core/runnables/data_processor.py
@@ -49,16 +49,14 @@ class DataProcessor(Runnable, Stoppable):
         with self._context.tuple_processing_manager.context_switch_condition:
             self._context.tuple_processing_manager.context_switch_condition.wait()
         self._running.set()
-        self._switch_context()
         while self._running.is_set():
-            marker = self._context.tuple_processing_manager.get_internal_marker()
-            state = self._context.state_processing_manager.get_input_state()
-            tuple_ = self._context.tuple_processing_manager.current_input_tuple
-            if marker is not None:
-                self.process_internal_marker(marker)
-            elif state is not None:
-                self.process_state(state)
-            elif tuple_ is not None:
+            tpm = self._context.tuple_processing_manager
+            spm = self._context.state_processing_manager
+            if tpm.current_internal_marker is not None:
+                self.process_internal_marker(tpm.get_internal_marker())
+            elif spm.current_input_state is not None:
+                self.process_state(spm.get_input_state())
+            elif tpm.current_input_tuple is not None:
                 self.process_tuple()
             else:
                 raise RuntimeError("No marker or tuple to process.")
@@ -85,9 +83,6 @@ class DataProcessor(Runnable, Stoppable):
             self._context.exception_manager.set_exception_info(exc_info)
             self._report_exception(exc_info)
 
-        finally:
-            self._switch_context()
-
     def process_state(self, state: State) -> None:
         """
         Process an input marker by invoking appropriate state
@@ -100,7 +95,6 @@ class DataProcessor(Runnable, Stoppable):
                 self._context.worker_id,
                 self._context.console_message_manager.print_buf,
             ):
-                self._switch_context()
                 self._set_output_state(executor.process_state(state, port_id))
 
         except Exception as err:
@@ -109,9 +103,6 @@ class DataProcessor(Runnable, Stoppable):
             self._context.exception_manager.set_exception_info(exc_info)
             self._report_exception(exc_info)
 
-        finally:
-            self._switch_context()
-
     def process_tuple(self) -> None:
         """
         Process an input tuple by invoking the executor's tuple processing method.
@@ -134,9 +125,6 @@ class DataProcessor(Runnable, Stoppable):
                 self._context.exception_manager.set_exception_info(exc_info)
                 self._report_exception(exc_info)
 
-            finally:
-                self._switch_context()
-
     def _set_output_tuple(self, output_iterator: Iterator[Optional[TupleLike]]) -> None:
         """
         Set the output tuple after processing by the executor.
diff --git a/amber/src/main/python/core/runnables/main_loop.py b/amber/src/main/python/core/runnables/main_loop.py
index d42aedc7e4..8454808b05 100644
--- a/amber/src/main/python/core/runnables/main_loop.py
+++ b/amber/src/main/python/core/runnables/main_loop.py
@@ -225,9 +225,13 @@ class MainLoop(StoppableQueueBlockingRunnable):
                 )
 
     def process_input_state(self) -> None:
+        # Single switch handshake: DataProcessor parks at the end-of-body
+        # switch of DataProcessor.run() between tasks, so one switch from
+        # MainLoop drives a full pick-up -> executor -> output -> park-back
+        # cycle. By the time the switch returns, current_output_state holds
+        # the freshly produced output.
         self._switch_context()
         output_state = self.context.state_processing_manager.get_output_state()
-        self._switch_context()
         if output_state is not None:
             if isinstance(self.context.executor_manager.executor, LoopEndOperator):
                 self.context.output_manager.reset_output_storage()
@@ -287,7 +291,6 @@ class MainLoop(StoppableQueueBlockingRunnable):
 
     def _process_state(self, state_: State) -> None:
         self.context.state_processing_manager.current_input_state = state_
-        self._switch_context()
         self.process_input_state()
         self._check_and_process_control()
 
@@ -376,7 +379,6 @@ class MainLoop(StoppableQueueBlockingRunnable):
 
             if ecm.ecm_type != EmbeddedControlMessageType.NO_ALIGNMENT:
                 self.context.pause_manager.resume(PauseType.ECM_PAUSE)
-            self._switch_context()
             if self.context.tuple_processing_manager.current_internal_marker:
                 {
                     StartChannel: self._process_start_channel,
diff --git a/amber/src/main/python/core/runnables/test_main_loop.py b/amber/src/main/python/core/runnables/test_main_loop.py
index 62065e1b8c..d99049a9a3 100644
--- a/amber/src/main/python/core/runnables/test_main_loop.py
+++ b/amber/src/main/python/core/runnables/test_main_loop.py
@@ -361,6 +361,16 @@ class TestMainLoop:
         )
         return DCMElement(tag=mock_control_input_channel, payload=payload)
 
+    @pytest.fixture
+    def mock_state_data_elements(self, mock_data_input_channel):
+        return [
+            DataElement(
+                tag=mock_data_input_channel,
+                payload=StateFrame(frame={"value": value}),
+            )
+            for value in (1, 2, 3, 4)
+        ]
+
     @pytest.fixture
     def mock_initialize_batch_count_executor(
         self,
@@ -1101,17 +1111,15 @@ class TestMainLoop:
             lambda state: [(mock_data_output_channel.to_worker_id, StateFrame(state))],
         )
 
-        switch_count = {"value": 0}
-
         def fake_switch_context():
-            switch_count["value"] += 1
-            # xinyuan-state-only still uses the original two-switch state handshake:
-            # the DataProcessor produces output during the first switch of each
-            # process_input_state() call, before MainLoop reads current_output_state.
-            if switch_count["value"] % 2 == 1:
-                current_input_state = (
-                    main_loop.context.state_processing_manager.current_input_state
-                )
+            # process_input_state now uses a single switch per call, mirroring
+            # the per-iteration switch in process_tuple_with_udf. Each switch
+            # simulates DataProc consuming the queued input state and writing
+            # current_output_state.
+            current_input_state = (
+                main_loop.context.state_processing_manager.current_input_state
+            )
+            if current_input_state is not None:
                 main_loop.context.state_processing_manager.current_output_state = (
                     DummyExecutor.process_state(current_input_state, 0)
                 )
@@ -1290,3 +1298,150 @@ class TestMainLoop:
             "test-1"
         ] == b"pickle    " + pickle.dumps(mock_binary_tuple["test-1"])
         reraise()
+
+    @pytest.mark.timeout(5)
+    def test_main_loop_thread_can_process_state(
+        self,
+        mock_data_output_channel,
+        mock_control_output_channel,
+        input_queue,
+        output_queue,
+        main_loop,
+        main_loop_thread,
+        mock_assign_input_port,
+        mock_assign_output_port,
+        mock_add_input_channel,
+        mock_add_partitioning,
+        mock_initialize_executor,
+        mock_data_element,
+        mock_state_data_elements,
+        mock_end_of_upstream,
+        command_sequence,
+        reraise,
+    ):
+        # End-to-end coverage of the state-processing path through the real
+        # MainLoop + DataProcessor threads.
+        #
+        # The cooperative-threading handshake works like this:
+        #   - DataProcessor.run() peeks current_internal_marker /
+        #     current_input_state / current_input_tuple every iteration and
+        #     consumes only the slot whose branch it takes -- unhandled
+        #     inputs survive into the next iteration.
+        #   - process_state runs the executor inside replace_print() and
+        #     writes the result to current_output_state; DataProcessor then
+        #     parks at the run loop's end-of-body _switch_context().
+        #   - MainLoop.process_input_state() switches once and reads
+        #     current_output_state after that switch returns, so the read
+        #     sees the value DataProcessor has just written.
+        #
+        # The expected behavior is therefore: each state produces its own
+        # output in its own cycle (no lag), and an EndChannel ECM after
+        # the last state produces an additional output via
+        # produce_state_on_finish.
+        main_loop_thread.start()
+
+        for setup_msg in [
+            mock_assign_input_port,
+            mock_assign_output_port,
+            mock_add_input_channel,
+            mock_add_partitioning,
+            mock_initialize_executor,
+        ]:
+            input_queue.put(setup_msg)
+            assert output_queue.get() == DCMElement(
+                tag=mock_control_output_channel,
+                payload=DirectControlMessagePayloadV2(
+                    return_invocation=ReturnInvocation(
+                        command_id=command_sequence,
+                        return_value=ControlReturn(empty_return=EmptyReturn()),
+                    )
+                ),
+            )
+
+        # Replace the EchoOperator that mock_initialize_executor loaded with
+        # an in-process executor that tags processed states and emits a
+        # finish marker on EndChannel. Going through the InitializeExecutor
+        # RPC above sets up the rest of the worker state (output schema,
+        # partitioning bookkeeping); swapping the executor instance here
+        # lets the test observe whether process_state actually runs without
+        # depending on Python's cross-test module caching for the loaded
+        # operator class.
+        class StateProcessingExecutor:
+            @staticmethod
+            def process_tuple(tuple_, port):
+                yield tuple_
+
+            @staticmethod
+            def process_state(state, port):
+                return {**state, "processed_marker": "executed", "port": port}
+
+            @staticmethod
+            def produce_state_on_finish(port):
+                return {"finish_marker": "produce_state_on_finish_ran"}
+
+            @staticmethod
+            def on_finish(port):
+                yield
+
+            @staticmethod
+            def close():
+                pass
+
+        main_loop.context.executor_manager.executor = StateProcessingExecutor()
+
+        # Send four states directly -- no warm-up tuple needed. With the
+        # init switch in DataProc.run() removed, MainLoop's first switch
+        # lands DataProc directly in the while-loop where it processes
+        # the queued state, so even the first state cycle works.
+        for state_element in mock_state_data_elements:
+            input_queue.put(state_element)
+
+        for expected_value in (1, 2, 3, 4):
+            output_data_element: DataElement = output_queue.get()
+            assert output_data_element.tag == mock_data_output_channel
+            assert isinstance(output_data_element.payload, StateFrame), (
+                f"expected StateFrame for value={expected_value}, got "
+                f"{type(output_data_element.payload).__name__}"
+            )
+            output_state = output_data_element.payload.frame
+            assert output_state["value"] == expected_value, (
+                f"state outputs arrived out of order: expected value="
+                f"{expected_value}, got value={output_state['value']}"
+            )
+            assert output_state["processed_marker"] == "executed"
+            assert output_state["port"] == 0
+
+        # Send EndChannel to drive _process_end_channel. The executor's
+        # produce_state_on_finish writes a finish-marker state into
+        # current_output_state inside DataProc's process_internal_marker;
+        # MainLoop's process_input_state then emits it.
+        input_queue.put(mock_end_of_upstream)
+
+        # Drain the control reply messages so the next data
+        # output_queue.get() returns the post-EndChannel data emission.
+        output_queue.disable_data(InternalQueue.DisableType.DISABLE_BY_PAUSE)
+        for _ in range(3):
+            control_reply = output_queue.get()
+            assert isinstance(control_reply, DCMElement), (
+                f"expected DCMElement during EndChannel teardown, got "
+                f"{type(control_reply).__name__}"
+            )
+        output_queue.enable_data(InternalQueue.DisableType.DISABLE_BY_PAUSE)
+
+        end_channel_state_output: DataElement = output_queue.get()
+        assert end_channel_state_output.tag == mock_data_output_channel
+        assert isinstance(end_channel_state_output.payload, StateFrame), (
+            f"expected StateFrame for the EndChannel-driven emission, got "
+            f"{type(end_channel_state_output.payload).__name__}"
+        )
+        end_channel_state = end_channel_state_output.payload.frame
+        assert "finish_marker" in end_channel_state, (
+            f"EndChannel emission should be the finish-marker state from "
+            f"produce_state_on_finish, got {end_channel_state!r}"
+        )
+        assert (
+            end_channel_state["finish_marker"]
+            == "produce_state_on_finish_ran"
+        )
+
+        reraise()

Reply via email to