Copilot commented on code in PR #4424:
URL: https://github.com/apache/texera/pull/4424#discussion_r3113417869
##########
amber/src/main/python/core/runnables/main_loop.py:
##########
@@ -241,6 +241,7 @@ def _process_tuple(self, tuple_: Tuple) -> None:
def _process_state(self, state_: State) -> None:
self.context.state_processing_manager.current_input_state = state_
+ self._switch_context()
self.process_input_state()
Review Comment:
The added `_switch_context()` here introduces an additional synchronization
step in the state-processing handshake. Since this is non-obvious and tightly
coupled to the DataProcessor-side context switching, please add an inline
comment describing which DataProcessor wait/notify it is pairing with (and why
it’s needed for repeated state inputs) to reduce the risk of accidental
removal/regression later.
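To make the pairing concrete, here is a minimal sketch of a strict two-party hand-off. It assumes `_switch_context()` behaves like "give control to the other thread and block until it comes back". `Baton`, `Shared`, `data_processor` and `run_states` are hypothetical names for illustration, not Texera APIs. Each `switch("main")` after publishing an input pairs with the processor's `switch("dp")` after it writes the output, so every repeated state gets exactly one matching result.

```python
import threading
from typing import Dict, List, Optional


class Baton:
    """Hypothetical two-party hand-off: exactly one side runs at a time.

    Models the idea behind a paired `_switch_context()`; it is not
    Texera's actual implementation.
    """

    def __init__(self) -> None:
        self._cond = threading.Condition()
        self._turn = "main"  # the MainLoop side starts with control

    def wait_turn(self, me: str) -> None:
        # Block until control has been handed to `me`.
        with self._cond:
            self._cond.wait_for(lambda: self._turn == me)

    def hand_off(self, me: str) -> None:
        # Give control to the other side without waiting for it back.
        with self._cond:
            self._turn = "dp" if me == "main" else "main"
            self._cond.notify_all()

    def switch(self, me: str) -> None:
        # Hand control over, then block until the other side hands it back.
        self.hand_off(me)
        self.wait_turn(me)


class Shared:
    """Hypothetical stand-in for the state_processing_manager fields."""

    def __init__(self) -> None:
        self.input_state: Optional[Dict[str, int]] = None
        self.output_state: Optional[Dict[str, int]] = None


def data_processor(baton: Baton, shared: Shared) -> None:
    baton.wait_turn("dp")  # wait for the first published input
    while True:
        state = shared.input_state
        if state is None:  # hypothetical shutdown sentinel
            baton.hand_off("dp")
            return
        shared.output_state = {"value": state["value"] + 1}
        baton.switch("dp")  # pairs with the switch in run_states


def run_states(values: List[int]) -> List[Optional[Dict[str, int]]]:
    baton, shared = Baton(), Shared()
    dp = threading.Thread(target=data_processor, args=(baton, shared), daemon=True)
    dp.start()
    outputs = []
    for v in values:
        shared.input_state = {"value": v}  # publish the next input state
        shared.output_state = None
        baton.switch("main")  # without this, the read below would see None
        outputs.append(shared.output_state)
    shared.input_state = None  # ask the processor side to stop
    baton.switch("main")
    dp.join()
    return outputs
```

Calling `run_states([1, 2])` returns `[{"value": 2}, {"value": 3}]`. An inline comment at the real call site could name its partner switch in the same way the comments above do.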
##########
amber/src/main/python/core/runnables/test_main_loop.py:
##########
@@ -1035,6 +1037,65 @@ def test_main_loop_thread_can_process_single_tuple_with_binary(
reraise()
+ @pytest.mark.timeout(2)
+ def test_process_state_can_emit_multiple_states(
+ self,
+ main_loop,
+ output_queue,
+ mock_data_output_channel,
+ monkeypatch,
+ ):
+ class DummyExecutor:
+ @staticmethod
+ def process_state(state: State, port: int) -> State:
+ output_state = State()
+ output_state["value"] = state["value"] + 1
+ output_state["port"] = port
+ return output_state
+
+ main_loop.context.executor_manager.executor = DummyExecutor()
+ monkeypatch.setattr(main_loop, "_check_and_process_control", lambda: None)
+ monkeypatch.setattr(
+ main_loop.context.output_manager,
+ "emit_state",
+ lambda state: [(mock_data_output_channel.to_worker_id, StateFrame(state))],
+ )
+
+ switch_count = {"value": 0}
+
+ def fake_switch_context():
+ switch_count["value"] += 1
+ if switch_count["value"] % 3 == 2:
+ current_input_state = (
+ main_loop.context.state_processing_manager.current_input_state
+ )
+ main_loop.context.state_processing_manager.current_output_state = (
+ DummyExecutor.process_state(current_input_state, 0)
+ )
Review Comment:
This regression test is tightly coupled to the exact number/order of
`_switch_context()` calls (the `% 3 == 2` logic). That makes it brittle to
future refactors that preserve behavior but change handshake steps. Consider
either (a) adding a short comment explaining why 3 switches per state is the
contract being asserted, or (b) rewriting the test to exercise the real
MainLoop/DataProcessor threads by sending two `StateFrame` DataElements through
`input_queue` and asserting two outputs, avoiding reliance on call counts.
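The shape of option (b) might look like the sketch below: push two state frames in, then assert only on what comes out, with timeouts so a broken handshake fails instead of hanging. `FakeStateFrame` and `fake_worker` are hypothetical stand-ins so the sketch runs on its own; a real test would start the actual MainLoop thread and use the existing `input_queue`/`output_queue` fixtures instead.

```python
import queue
import threading
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class FakeStateFrame:
    """Hypothetical stand-in for a StateFrame DataElement."""

    state: Dict[str, int]


def fake_worker(input_queue: queue.Queue, output_queue: queue.Queue) -> None:
    """Hypothetical stand-in for MainLoop + DataProcessor: one output per state."""
    while True:
        frame: Optional[FakeStateFrame] = input_queue.get()
        if frame is None:  # hypothetical shutdown sentinel
            return
        output_queue.put(FakeStateFrame({"value": frame.state["value"] + 1}))


def test_process_state_can_emit_multiple_states() -> None:
    input_queue: queue.Queue = queue.Queue()
    output_queue: queue.Queue = queue.Queue()
    worker = threading.Thread(target=fake_worker, args=(input_queue, output_queue))
    worker.start()

    # Send two state frames through the input queue.
    input_queue.put(FakeStateFrame({"value": 1}))
    input_queue.put(FakeStateFrame({"value": 10}))

    # Assert on observable outputs only; the timeout turns a deadlock into a failure.
    outputs: List[FakeStateFrame] = [output_queue.get(timeout=1) for _ in range(2)]
    assert [o.state["value"] for o in outputs] == [2, 11]

    input_queue.put(None)
    worker.join(timeout=1)
    assert not worker.is_alive()
```

Because nothing here counts `_switch_context()` calls, adding or removing a handshake step would not break such a test as long as each state still produces its output.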
##########
amber/src/main/python/core/runnables/data_processor.py:
##########
@@ -100,6 +100,7 @@ def process_state(self, state: State) -> None:
self._context.worker_id,
self._context.console_message_manager.print_buf,
):
Review Comment:
This new `_switch_context()` adds an extra handshake step before invoking
`executor.process_state(...)`. Since MainLoop now depends on this for correct
synchronization across repeated state inputs, please add a brief comment
explaining the expected switch sequence (i.e., which MainLoop-side
`_switch_context()` this pairs with) so future changes don’t accidentally break
the protocol.
```suggestion
):
# Extra handshake for state inputs: this `_switch_context()`
# pairs with MainLoop's `_switch_context()` after it publishes
# the next input state, so repeated state inputs stay
# synchronized before `executor.process_state(...)` runs.
```
--
This is an automated message from the Apache Git Service.