This is an automated email from the ASF dual-hosted git repository.

Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 43f276e897 refactor(pyamber): collapse DataProcessor per-call boilerplate (#4685)
43f276e897 is described below

commit 43f276e89721a97f6bc28f037fb3b73d15dc1544
Author: Yicong Huang <[email protected]>
AuthorDate: Tue May 5 07:38:57 2026 -0700

    refactor(pyamber): collapse DataProcessor per-call boilerplate (#4685)
    
    ### What changes were proposed in this PR?
    
    Cleanup-only refactor of `core/runnables/data_processor.py`: collapse
    the repeated `try / except / finally` + `replace_print` boilerplate from
    `process_internal_marker`, `process_state`, and `process_tuple` into one
    `_executor_session` `@contextmanager` that yields `(executor, port_id)`.
    No behavior change.
    
    ### Any related issues, documentation, discussions?
    
    Closes #4684.
    
    ### How was this PR tested?
    
    Unit tests added in the new `test_data_processor.py` and in the
    existing `test_main_loop.py`.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Opus 4.7 (Claude Code)
    
    ---------
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../main/python/core/runnables/data_processor.py   | 103 ++++------
 .../python/core/runnables/test_data_processor.py   | 207 +++++++++++++++++++++
 .../main/python/core/runnables/test_main_loop.py   |  57 ++++++
 3 files changed, 302 insertions(+), 65 deletions(-)

diff --git a/amber/src/main/python/core/runnables/data_processor.py b/amber/src/main/python/core/runnables/data_processor.py
index 276a1669f5..089a162228 100644
--- a/amber/src/main/python/core/runnables/data_processor.py
+++ b/amber/src/main/python/core/runnables/data_processor.py
@@ -18,6 +18,7 @@
 import os
 import sys
 import traceback
+from contextlib import contextmanager
 from loguru import logger
 from threading import Event
 from typing import Iterator, Optional
@@ -49,19 +50,18 @@ class DataProcessor(Runnable, Stoppable):
         with self._context.tuple_processing_manager.context_switch_condition:
             self._context.tuple_processing_manager.context_switch_condition.wait()
         self._running.set()
-        self._pre_loop_checks()
+        self._check_and_process_debug_command()
         while self._running.is_set():
             tpm = self._context.tuple_processing_manager
             spm = self._context.state_processing_manager
             has_marker = tpm.current_internal_marker is not None
             has_state = spm.current_input_state is not None
             has_tuple = tpm.current_input_tuple is not None
-            queued = has_marker + has_state + has_tuple
             # MainLoop is single-threaded and sets at most one of
             # current_internal_marker / current_input_state /
             # current_input_tuple per cycle before switching to here, so
             # exactly one slot must be populated on every iteration.
-            if queued != 1:
+            if has_marker + has_state + has_tuple != 1:
                 raise RuntimeError(
                     "DataProcessor expected exactly one queued input per "
                     f"iteration, got marker={has_marker}, state={has_state}, "
@@ -75,34 +75,45 @@ class DataProcessor(Runnable, Stoppable):
                 self.process_tuple()
 
     def process_internal_marker(self, internal_marker: InternalMarker) -> None:
-        try:
-            executor = self._context.executor_manager.executor
-            port_id = self._context.tuple_processing_manager.get_input_port_id()
-            with replace_print(
-                self._context.worker_id,
-                self._context.console_message_manager.print_buf,
-            ):
-                if isinstance(internal_marker, StartChannel):
-                    self._set_output_state(executor.produce_state_on_start(port_id))
-                elif isinstance(internal_marker, EndChannel):
-                    self._set_output_state(executor.produce_state_on_finish(port_id))
-                    self._switch_context()
-                    self._set_output_tuple(executor.on_finish(port_id))
-
-        except Exception as err:
-            logger.exception(err)
-            exc_info = sys.exc_info()
-            self._context.exception_manager.set_exception_info(exc_info)
-            self._report_exception(exc_info)
-
-        finally:
-            self._switch_context()
+        with self._executor_session() as (executor, port_id):
+            if isinstance(internal_marker, StartChannel):
+                self._set_output_state(executor.produce_state_on_start(port_id))
+            elif isinstance(internal_marker, EndChannel):
+                self._set_output_state(executor.produce_state_on_finish(port_id))
+                # Flush the state to MainLoop before producing tuples so the
+                # state and the tuple stream don't share a single switch.
+                self._switch_context()
+                self._set_output_tuple(executor.on_finish(port_id))
 
     def process_state(self, state: State) -> None:
         """
         Process an input marker by invoking appropriate state
         or tuple generation based on the marker type.
         """
+        with self._executor_session() as (executor, port_id):
+            self._set_output_state(executor.process_state(state, port_id))
+
+    def process_tuple(self) -> None:
+        """
+        Process an input tuple by invoking the executor's tuple processing method.
+        """
+        finished_current = self._context.tuple_processing_manager.finished_current
+        while not finished_current.is_set():
+            with self._executor_session() as (executor, port_id):
+                tuple_ = self._context.tuple_processing_manager.get_input_tuple()
+                self._set_output_tuple(executor.process_tuple(tuple_, port_id))
+
+    @contextmanager
+    def _executor_session(self):
+        """
+        Open one executor invocation: hand back (executor, port_id) under a
+        print-capture session, route any exception to the exception manager
+        and queue the stack trace as a console message, and always switch
+        back to MainLoop on exit. Reporting must happen *before* the
+        switch: MainLoop's post-switch hook flushes console messages and
+        then enters EXCEPTION_PAUSE, so anything queued after the switch
+        would arrive at the controller only after the worker resumes.
+        """
         try:
             executor = self._context.executor_manager.executor
             port_id = self._context.tuple_processing_manager.get_input_port_id()
@@ -110,42 +121,15 @@ class DataProcessor(Runnable, Stoppable):
                 self._context.worker_id,
                 self._context.console_message_manager.print_buf,
             ):
-                self._set_output_state(executor.process_state(state, port_id))
-
+                yield executor, port_id
         except Exception as err:
             logger.exception(err)
             exc_info = sys.exc_info()
             self._context.exception_manager.set_exception_info(exc_info)
             self._report_exception(exc_info)
-
         finally:
             self._switch_context()
 
-    def process_tuple(self) -> None:
-        """
-        Process an input tuple by invoking the executor's tuple processing method.
-        """
-        finished_current = self._context.tuple_processing_manager.finished_current
-        while not finished_current.is_set():
-            try:
-                executor = self._context.executor_manager.executor
-                port_id = self._context.tuple_processing_manager.get_input_port_id()
-                tuple_ = self._context.tuple_processing_manager.get_input_tuple()
-                with replace_print(
-                    self._context.worker_id,
-                    self._context.console_message_manager.print_buf,
-                ):
-                    self._set_output_tuple(executor.process_tuple(tuple_, port_id))
-
-            except Exception as err:
-                logger.exception(err)
-                exc_info = sys.exc_info()
-                self._context.exception_manager.set_exception_info(exc_info)
-                self._report_exception(exc_info)
-
-            finally:
-                self._switch_context()
-
     def _set_output_tuple(self, output_iterator: Iterator[Optional[TupleLike]]) -> None:
         """
         Set the output tuple after processing by the executor.
@@ -179,7 +163,7 @@ class DataProcessor(Runnable, Stoppable):
         with self._context.tuple_processing_manager.context_switch_condition:
             self._context.tuple_processing_manager.context_switch_condition.notify()
             self._context.tuple_processing_manager.context_switch_condition.wait()
-        self._post_switch_context_checks()
+        self._check_and_process_debug_command()
 
     def _check_and_process_debug_command(self) -> None:
         """
@@ -191,17 +175,6 @@ class DataProcessor(Runnable, Stoppable):
             # This line has no side effects on the current debugger state.
             self._context.debug_manager.debugger.set_trace()
 
-    def _post_switch_context_checks(self):
-        self._check_and_process_debug_command()
-
-    def _pre_loop_checks(self) -> None:
-        # Runs once after init and before the first task so that a debug
-        # command queued during worker setup fires before any
-        # tuple / state / marker is processed. Only the debug-command
-        # check is needed here -- no task has run yet, so there is no
-        # exception to surface.
-        self._check_and_process_debug_command()
-
     def _report_exception(self, exc_info: ExceptionInfo):
         tb = traceback.extract_tb(exc_info[2])
         filename, line_number, func_name, text = tb[-1]
diff --git a/amber/src/main/python/core/runnables/test_data_processor.py b/amber/src/main/python/core/runnables/test_data_processor.py
new file mode 100644
index 0000000000..61cc5bf7e9
--- /dev/null
+++ b/amber/src/main/python/core/runnables/test_data_processor.py
@@ -0,0 +1,207 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from core.architecture.managers import Context
+from core.models import State
+from core.models.internal_queue import InternalQueue
+from core.models.internal_marker import EndChannel, StartChannel
+from core.runnables.data_processor import DataProcessor
+from proto.org.apache.texera.amber.engine.architecture.rpc import ConsoleMessageType
+
+
+@pytest.fixture
+def context():
+    return Context(worker_id="test-worker", input_queue=InternalQueue())
+
+
+@pytest.fixture
+def data_processor(context, monkeypatch):
+    """
+    DataProcessor with `_switch_context` swapped for a counter so each test
+    can drive the synchronous parts of the per-call boilerplate without
+    blocking on the cross-thread handshake.
+    """
+    dp = DataProcessor(context)
+    dp.switch_calls = 0
+
+    def fake_switch():
+        dp.switch_calls += 1
+
+    monkeypatch.setattr(dp, "_switch_context", fake_switch)
+    return dp
+
+
+class _StubExecutor:
+    """
+    Records what `process_internal_marker` invokes on it so the test can
+    assert the StartChannel / EndChannel branches of `data_processor`
+    without standing up a real Operator.
+    """
+
+    def __init__(self):
+        self.calls = []
+
+    def produce_state_on_start(self, port_id):
+        self.calls.append(("produce_state_on_start", port_id))
+        return {"phase": "start"}
+
+    def produce_state_on_finish(self, port_id):
+        self.calls.append(("produce_state_on_finish", port_id))
+        return {"phase": "finish"}
+
+    def on_finish(self, port_id):
+        self.calls.append(("on_finish", port_id))
+        return iter([])
+
+
+class TestProcessInternalMarker:
+    @pytest.mark.timeout(2)
+    def test_start_channel_invokes_produce_state_on_start(
+        self, context, data_processor
+    ):
+        executor = _StubExecutor()
+        context.executor_manager.executor = executor
+
+        data_processor.process_internal_marker(StartChannel())
+
+        # StartChannel routes to produce_state_on_start with the current
+        # input port id (0 when no upstream is set), and the returned dict
+        # is wrapped into a State on the output slot.
+        assert executor.calls == [("produce_state_on_start", 0)]
+        out = context.state_processing_manager.current_output_state
+        assert isinstance(out, State)
+        assert out["phase"] == "start"
+        # `_executor_session` always switches once on exit.
+        assert data_processor.switch_calls == 1
+
+    @pytest.mark.timeout(2)
+    def test_end_channel_flushes_state_then_drains_on_finish(
+        self, context, data_processor
+    ):
+        executor = _StubExecutor()
+        context.executor_manager.executor = executor
+
+        data_processor.process_internal_marker(EndChannel())
+
+        # EndChannel must call produce_state_on_finish first, switch
+        # context to flush that state separately from the on_finish
+        # tuple stream, then drain on_finish. The session itself adds
+        # its own trailing switch on exit.
+        assert executor.calls == [
+            ("produce_state_on_finish", 0),
+            ("on_finish", 0),
+        ]
+        # 1 switch from the explicit flush + 1 from `_executor_session`
+        # exit. `_set_output_tuple` exits early on an empty iterator and
+        # does not switch.
+        assert data_processor.switch_calls == 2
+
+
+class TestExecutorSession:
+    @pytest.mark.timeout(2)
+    def test_exception_inside_session_is_reported_before_the_switch(
+        self, context, data_processor
+    ):
+        # Order matters: MainLoop's _check_exception flushes pending
+        # console messages and then immediately enters EXCEPTION_PAUSE,
+        # so the stack trace must already be in the buffer at the moment
+        # _executor_session calls _switch_context. Capture the buffer
+        # state from inside the fake switch to pin that ordering.
+        seen_at_switch = []
+
+        def capturing_switch():
+            seen_at_switch.extend(
+                context.console_message_manager.get_messages(force_flush=True)
+            )
+            data_processor.switch_calls += 1
+
+        data_processor._switch_context = capturing_switch
+
+        with data_processor._executor_session() as session:
+            assert session is not None
+            raise RuntimeError("boom-from-executor")
+
+        # Exception was routed into the manager so MainLoop's
+        # _check_exception can see it.
+        assert context.exception_manager.has_exception()
+        exc_info = context.exception_manager.get_exc_info()
+        assert exc_info[0] is RuntimeError
+        assert "boom-from-executor" in str(exc_info[1])
+        # And the stack-trace console message was queued *before* the
+        # finally-clause switch — without this, the worker would pause
+        # before ever sending the error to the controller.
+        assert len(seen_at_switch) == 1
+        msg = seen_at_switch[0]
+        assert msg.worker_id == "test-worker"
+        assert msg.msg_type == ConsoleMessageType.ERROR
+        assert "RuntimeError: boom-from-executor" in msg.title
+        # Exit always switches back to MainLoop, even on the failure path.
+        assert data_processor.switch_calls == 1
+
+    @pytest.mark.timeout(2)
+    def test_clean_session_does_not_record_an_exception(self, context, data_processor):
+        with data_processor._executor_session():
+            pass
+
+        assert not context.exception_manager.has_exception()
+        assert (
+            list(context.console_message_manager.get_messages(force_flush=True)) == []
+        )
+        # Even on the success path, the finally clause yields control
+        # back to MainLoop exactly once.
+        assert data_processor.switch_calls == 1
+
+
+class TestRunInvariant:
+    """
+    `run()` enforces that exactly one of marker / state / tuple is queued per
+    iteration. The invariant raises a RuntimeError otherwise — that branch
+    is otherwise unreachable in the integration tests, so cover it directly.
+    """
+
+    @staticmethod
+    def _drive_run_synchronously(context, monkeypatch) -> DataProcessor:
+        # `run()` opens with a condition.wait() so MainLoop can hand off
+        # control. Stub that out so the test thread can call run() directly
+        # and reach the invariant check on the very first iteration without
+        # any cross-thread coordination.
+        cond = context.tuple_processing_manager.context_switch_condition
+        monkeypatch.setattr(cond, "wait", lambda *a, **kw: None)
+        return DataProcessor(context)
+
+    @pytest.mark.timeout(2)
+    def test_zero_queued_inputs_raises_invariant_error(self, context, monkeypatch):
+        dp = self._drive_run_synchronously(context, monkeypatch)
+        # Nothing is set on tpm/spm — has_marker + has_state + has_tuple == 0.
+        with pytest.raises(RuntimeError) as excinfo:
+            dp.run()
+        assert "expected exactly one queued input" in str(excinfo.value)
+        assert "marker=False, state=False, tuple=False" in str(excinfo.value)
+
+    @pytest.mark.timeout(2)
+    def test_two_queued_inputs_raises_invariant_error(self, context, monkeypatch):
+        dp = self._drive_run_synchronously(context, monkeypatch)
+        # Populate two slots — has_marker + has_tuple == 2.
+        context.tuple_processing_manager.current_internal_marker = StartChannel()
+        context.tuple_processing_manager.current_input_tuple = ("payload",)
+        with pytest.raises(RuntimeError) as excinfo:
+            dp.run()
+        assert "expected exactly one queued input" in str(excinfo.value)
+        assert "marker=True" in str(excinfo.value)
+        assert "tuple=True" in str(excinfo.value)
diff --git a/amber/src/main/python/core/runnables/test_main_loop.py b/amber/src/main/python/core/runnables/test_main_loop.py
index c9daa633f5..400a7f2a90 100644
--- a/amber/src/main/python/core/runnables/test_main_loop.py
+++ b/amber/src/main/python/core/runnables/test_main_loop.py
@@ -20,6 +20,7 @@ import pandas
 import pickle
 import pyarrow
 import pytest
+import sys
 import time
 from threading import Thread
 
@@ -48,6 +49,8 @@ from proto.org.apache.texera.amber.core import (
     OpExecInitInfo,
     EmbeddedControlMessageIdentity,
 )
+from core.architecture.managers.pause_manager import PauseType
+from core.util.console_message.timestamp import current_time_in_local_timezone
 from proto.org.apache.texera.amber.engine.architecture.rpc import (
     ControlRequest,
     AssignPortRequest,
@@ -65,6 +68,8 @@ from proto.org.apache.texera.amber.engine.architecture.rpc import (
     WorkerStateResponse,
     EmbeddedControlMessageType,
     EmbeddedControlMessage,
+    ConsoleMessage,
+    ConsoleMessageType,
 )
 from proto.org.apache.texera.amber.engine.architecture.sendsemantics import (
     OneToOnePartitioning,
@@ -1563,3 +1568,55 @@ class TestMainLoop:
             assert output_state["processed_marker"] == "executed"
 
         reraise()
+
+    @pytest.mark.timeout(2)
+    def test_console_message_rpc_fires_before_exception_pause(
+        self, main_loop, monkeypatch
+    ):
+        # Pin the controller-facing contract: when DataProcessor raises
+        # during an executor call, the stack-trace ConsoleMessage must
+        # reach the controller *before* the worker enters EXCEPTION_PAUSE
+        # — otherwise the UI sees a paused worker with no error to show
+        # until the user resumes. The DataProcessor side queues the
+        # message before the switch (covered by
+        # test_data_processor.TestExecutorSession); this test pins the
+        # MainLoop side: post-switch hook flushes RPCs first, pauses last.
+        events = []
+
+        monkeypatch.setattr(
+            main_loop,
+            "_send_console_message",
+            lambda msg: events.append(("rpc", msg)),
+        )
+        monkeypatch.setattr(
+            main_loop.context.pause_manager,
+            "pause",
+            lambda pause_type, change_state=True: events.append(("pause", pause_type)),
+        )
+
+        try:
+            raise RuntimeError("boom-from-executor")
+        except RuntimeError:
+            exc_info = sys.exc_info()
+        main_loop.context.exception_manager.set_exception_info(exc_info)
+        main_loop.context.console_message_manager.put_message(
+            ConsoleMessage(
+                worker_id="dummy_worker_id",
+                timestamp=current_time_in_local_timezone(),
+                msg_type=ConsoleMessageType.ERROR,
+                source="test:_capture_exc_info:0",
+                title="RuntimeError: boom-from-executor",
+                message="RuntimeError: boom-from-executor",
+            )
+        )
+
+        main_loop._post_switch_context_checks()
+
+        kinds = [e[0] for e in events]
+        assert kinds == ["rpc", "pause"], (
+            "console message must reach controller before pause; "
+            f"observed order: {kinds}"
+        )
+        assert events[0][1].msg_type == ConsoleMessageType.ERROR
+        assert "boom-from-executor" in events[0][1].title
+        assert events[1][1] is PauseType.EXCEPTION_PAUSE

Reply via email to