Copilot commented on code in PR #4685:
URL: https://github.com/apache/texera/pull/4685#discussion_r3177835237
##########
amber/src/main/python/core/runnables/data_processor.py:
##########
@@ -75,77 +75,58 @@ def run(self) -> None:
self.process_tuple()
def process_internal_marker(self, internal_marker: InternalMarker) -> None:
- try:
- executor = self._context.executor_manager.executor
- port_id = self._context.tuple_processing_manager.get_input_port_id()
- with replace_print(
- self._context.worker_id,
- self._context.console_message_manager.print_buf,
- ):
- if isinstance(internal_marker, StartChannel):
- self._set_output_state(executor.produce_state_on_start(port_id))
- elif isinstance(internal_marker, EndChannel):
- self._set_output_state(executor.produce_state_on_finish(port_id))
- self._switch_context()
- self._set_output_tuple(executor.on_finish(port_id))
-
- except Exception as err:
- logger.exception(err)
- exc_info = sys.exc_info()
- self._context.exception_manager.set_exception_info(exc_info)
- self._report_exception(exc_info)
-
- finally:
- self._switch_context()
+ with self._executor_session() as (executor, port_id):
+ if isinstance(internal_marker, StartChannel):
+ self._set_output_state(executor.produce_state_on_start(port_id))
+ elif isinstance(internal_marker, EndChannel):
+ self._set_output_state(executor.produce_state_on_finish(port_id))
+ # Flush the state to MainLoop before producing tuples so the
+ # state and the tuple stream don't share a single switch.
+ self._switch_context()
+ self._set_output_tuple(executor.on_finish(port_id))
def process_state(self, state: State) -> None:
"""
Process an input marker by invoking appropriate state
or tuple generation based on the marker type.
"""
+ with self._executor_session() as (executor, port_id):
+ self._set_output_state(executor.process_state(state, port_id))
+
+ def process_tuple(self) -> None:
+ """
+ Process an input tuple by invoking the executor's tuple processing method.
+ """
+ finished_current = self._context.tuple_processing_manager.finished_current
+ while not finished_current.is_set():
+ with self._executor_session() as (executor, port_id):
+ tuple_ = self._context.tuple_processing_manager.get_input_tuple()
+ self._set_output_tuple(executor.process_tuple(tuple_, port_id))
+
+ @contextmanager
+ def _executor_session(self):
+ """
+ Open one executor invocation: hand back (executor, port_id) under a
+ print-capture session, route any exception into the exception
+ manager, and always switch back to MainLoop on exit. Reporting and
+ the post-resolution yield happen in `_post_switch_context_checks`
+ so they live in one place rather than being duplicated in every
+ process_* method.
+ """
try:
executor = self._context.executor_manager.executor
port_id = self._context.tuple_processing_manager.get_input_port_id()
with replace_print(
self._context.worker_id,
self._context.console_message_manager.print_buf,
):
- self._set_output_state(executor.process_state(state, port_id))
-
+ yield executor, port_id
except Exception as err:
logger.exception(err)
- exc_info = sys.exc_info()
- self._context.exception_manager.set_exception_info(exc_info)
- self._report_exception(exc_info)
-
+ self._context.exception_manager.set_exception_info(sys.exc_info())
Review Comment:
Deferring `_report_exception` until after `_switch_context()` changes the
observable behavior on failures: `MainLoop._post_switch_context_checks()` sends
buffered console messages before it checks `exception_manager` and enters
`EXCEPTION_PAUSE`. With this refactor, the first switch back after an executor
error has no error message queued yet, so the worker pauses without sending the
stack trace to the controller/UI; the error is only buffered after the user
resumes.
##########
amber/src/main/python/core/runnables/test_data_processor.py:
##########
@@ -0,0 +1,102 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import sys
+
+import pytest
+
+from core.architecture.managers import Context
+from core.models.internal_queue import InternalQueue
+from core.runnables.data_processor import DataProcessor
+from proto.org.apache.texera.amber.engine.architecture.rpc import ConsoleMessageType
+
+
+@pytest.fixture
+def context():
+ return Context(worker_id="test-worker", input_queue=InternalQueue())
+
+
+@pytest.fixture
+def data_processor(context, monkeypatch):
+ """
+ DataProcessor with `_switch_context` swapped for a counter so the
+ `post_switch` checks can yield without blocking the test thread and the
+ test can assert exactly how many extra switches happened.
+ """
+ dp = DataProcessor(context)
+ dp.switch_calls = 0
+
+ def fake_switch():
+ dp.switch_calls += 1
+
+ monkeypatch.setattr(dp, "_switch_context", fake_switch)
Review Comment:
These tests replace `_switch_context()` with a counter, so they never
exercise the real DataProcessor/MainLoop handshake. That misses the key
regression here: after an executor exception, `MainLoop` now sees
`exception_manager.has_exception()` before any error console message has been
enqueued. Please add an integration-style test that runs the real context
switch and asserts the stack trace is delivered to the controller before the
worker enters exception pause.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]