Copilot commented on code in PR #4424:
URL: https://github.com/apache/texera/pull/4424#discussion_r3113417869


##########
amber/src/main/python/core/runnables/main_loop.py:
##########
@@ -241,6 +241,7 @@ def _process_tuple(self, tuple_: Tuple) -> None:
 
     def _process_state(self, state_: State) -> None:
         self.context.state_processing_manager.current_input_state = state_
+        self._switch_context()
         self.process_input_state()

Review Comment:
   The added `_switch_context()` here introduces an additional synchronization 
step in the state-processing handshake. Since this is non-obvious and tightly 
coupled to the DataProcessor-side context switching, please add an inline 
comment describing which DataProcessor wait/notify it is pairing with (and why 
it’s needed for repeated state inputs) to reduce the risk of accidental 
removal/regression later.



##########
amber/src/main/python/core/runnables/test_main_loop.py:
##########
@@ -1035,6 +1037,65 @@ def test_main_loop_thread_can_process_single_tuple_with_binary(
 
         reraise()
 
+    @pytest.mark.timeout(2)
+    def test_process_state_can_emit_multiple_states(
+        self,
+        main_loop,
+        output_queue,
+        mock_data_output_channel,
+        monkeypatch,
+    ):
+        class DummyExecutor:
+            @staticmethod
+            def process_state(state: State, port: int) -> State:
+                output_state = State()
+                output_state["value"] = state["value"] + 1
+                output_state["port"] = port
+                return output_state
+
+        main_loop.context.executor_manager.executor = DummyExecutor()
+        monkeypatch.setattr(main_loop, "_check_and_process_control", lambda: None)
+        monkeypatch.setattr(
+            main_loop.context.output_manager,
+            "emit_state",
+            lambda state: [(mock_data_output_channel.to_worker_id, StateFrame(state))],
+        )
+
+        switch_count = {"value": 0}
+
+        def fake_switch_context():
+            switch_count["value"] += 1
+            if switch_count["value"] % 3 == 2:
+                current_input_state = (
+                    main_loop.context.state_processing_manager.current_input_state
+                )
+                main_loop.context.state_processing_manager.current_output_state = (
+                    DummyExecutor.process_state(current_input_state, 0)
+                )

Review Comment:
   This regression test is tightly coupled to the exact number/order of 
`_switch_context()` calls (the `% 3 == 2` logic). That makes it brittle to 
future refactors that preserve behavior but change handshake steps. Consider 
either (a) adding a short comment explaining why 3 switches per state is the 
contract being asserted, or (b) rewriting the test to exercise the real 
MainLoop/DataProcessor threads by sending two `StateFrame` DataElements through 
`input_queue` and asserting two outputs, avoiding reliance on call counts.
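
   To illustrate the shape of option (b), here is a minimal, self-contained 
sketch of a behavior-level test. It is a toy model, not Texera code: the 
`processor`, `main_loop`, and `run_states` functions and the queue-based 
handoff are hypothetical stand-ins for the real MainLoop/DataProcessor 
handshake. The point is only that the assertion targets the emitted outputs, so 
it would keep passing if the number of internal handshake steps changed.

```python
import queue
import threading

_STOP = object()  # sentinel that shuts both toy threads down


def processor(handoff, result):
    """Toy data-processor thread: add 1 to each state's value."""
    while True:
        state = handoff.get()
        if state is _STOP:
            return
        result.put({"value": state["value"] + 1})


def main_loop(input_queue, output_queue, handoff, result):
    """Toy main-loop thread: hand each input state over, forward the output."""
    while True:
        state = input_queue.get()
        handoff.put(state)
        if state is _STOP:
            return
        output_queue.put(result.get())


def run_states(states, timeout=2.0):
    """Push states through both threads and collect one output per input."""
    input_queue, output_queue = queue.Queue(), queue.Queue()
    handoff, result = queue.Queue(maxsize=1), queue.Queue(maxsize=1)
    threads = [
        threading.Thread(target=processor, args=(handoff, result), daemon=True),
        threading.Thread(
            target=main_loop,
            args=(input_queue, output_queue, handoff, result),
            daemon=True,
        ),
    ]
    for thread in threads:
        thread.start()
    for state in states:
        input_queue.put(state)
    # A stalled handshake surfaces here as queue.Empty instead of a hang.
    outputs = [output_queue.get(timeout=timeout) for _ in states]
    input_queue.put(_STOP)
    for thread in threads:
        thread.join(timeout=timeout)
    return outputs


def test_two_states_produce_two_outputs():
    # Assert on what comes out, not on how many handoffs happened inside.
    outputs = run_states([{"value": 1}, {"value": 10}])
    assert outputs == [{"value": 2}, {"value": 11}]
```

   In the real test, the equivalent would presumably be to enqueue two state 
inputs on `input_queue` and read two results from `output_queue`, with the 
existing `@pytest.mark.timeout(2)` guarding against a stalled handshake.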



##########
amber/src/main/python/core/runnables/data_processor.py:
##########
@@ -100,6 +100,7 @@ def process_state(self, state: State) -> None:
                 self._context.worker_id,
                 self._context.console_message_manager.print_buf,
             ):

Review Comment:
   This new `_switch_context()` adds an extra handshake step before invoking 
`executor.process_state(...)`. Since MainLoop now depends on this for correct 
synchronization across repeated state inputs, please add a brief comment 
explaining the expected switch sequence (i.e., which MainLoop-side 
`_switch_context()` this pairs with) so future changes don’t accidentally break 
the protocol.
   ```suggestion
               ):
                   # Extra handshake for state inputs: this `_switch_context()`
                   # pairs with MainLoop's `_switch_context()` after it publishes
                   # the next input state, so repeated state inputs stay
                   # synchronized before `executor.process_state(...)` runs.
   ```


