aglinxinyuan commented on code in PR #4560:
URL: https://github.com/apache/texera/pull/4560#discussion_r3164918761


##########
amber/src/main/python/core/runnables/data_processor.py:
##########
@@ -133,9 +125,6 @@ def process_tuple(self) -> None:
                 self._context.exception_manager.set_exception_info(exc_info)
                 self._report_exception(exc_info)
 
-            finally:
-                self._switch_context()
-

Review Comment:
   Good question. The change to `process_tuple` is needed because the handshake 
state is shared across task types; it is not specific to tuple processing.
   
   The single-switch state handshake assumes `DataProcessor` is parked at the 
run-loop's end-of-body switch (line 65) between tasks. If `process_tuple` keeps 
its per-task `finally: self._switch_context()`, `DataProcessor` instead parks 
inside that `finally` after a tuple. When a state arrives next:
   
   1. MainLoop's single switch wakes `DataProcessor` from the `finally`.
   2. `DataProcessor` exits `process_tuple` and hits the run-loop's switch at 
line 65, which notifies MainLoop back.
   3. MainLoop returns and reads `current_output_state` while it is still 
empty, because the executor hasn't run for the new state yet.
   
   The first state after a tuple is silently dropped.
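   
   The sequence above can be reproduced in a toy model. This is only a sketch, 
not the Texera code: it replaces the two threads with a Python generator, where 
every `yield` stands in for parking in `_switch_context()` and every `next()` 
stands in for MainLoop's single switch. The names `data_processor`, `run`, 
`shared`, and `per_task_switch` are hypothetical, and the model assumes one 
pending task slot that a later input overwrites.

```python
def data_processor(shared, per_task_switch):
    """Toy DataProcessor: each `yield` models parking in _switch_context()."""
    while True:
        # Take the pending task (if any) out of the shared slot.
        task, shared["task"] = shared["task"], None
        if task is not None:
            kind, value = task
            if kind == "state":
                # The executor ran for this state and produced an output.
                shared["output"] = value
            elif per_task_switch:
                # Old behaviour: park inside process_tuple's `finally`.
                yield
        # Park at the run-loop's end-of-body switch.
        yield


def run(inputs, per_task_switch):
    """Toy MainLoop: one switch per input, then read the state output."""
    shared = {"task": None, "output": None}
    dp = data_processor(shared, per_task_switch)
    next(dp)  # DataProcessor starts parked at the run-loop switch
    emitted = []
    for kind, value in inputs:
        shared["task"] = (kind, value)
        shared["output"] = None
        next(dp)  # the single switch: wake DataProcessor until it parks again
        if kind == "state" and shared["output"] is not None:
            emitted.append(shared["output"])
    return emitted


INPUTS = [("tuple", "t"), ("state", 1), ("state", 2), ("state", 3), ("state", 4)]
print(run(INPUTS, per_task_switch=False))  # [1, 2, 3, 4]
print(run(INPUTS, per_task_switch=True))   # [2, 3, 4]
```

   With the extra park after a tuple, the switch for the first state is spent 
leaving the `finally`, so MainLoop reads an empty output and the first emitted 
value is 2 instead of 1.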
   
   This isn't covered by `test_main_loop_thread_can_process_messages` because 
that test sends `tuple → EndChannel`, and EndChannel is a marker, not a state — 
`_process_end_channel` drives extra switches via `process_input_tuple → 
process_tuple_with_udf` that recover the timing.
   
   Pushed `fd946b36bb` adding 
`test_main_loop_thread_can_process_state_after_tuple`, which sends a tuple 
followed by four states and asserts all four emit in order. With this PR's full 
redesign, all 8 tests pass. With `process_tuple`'s `finally: 
self._switch_context()` restored (and everything else unchanged), the new test 
fails with `state outputs after a tuple arrived out of order: expected value=1, 
got value=2` — the first state after the tuple is dropped.
   
   So the three `finally` removals (`process_state`, `process_internal_marker`, 
`process_tuple`) form a single contract: `DataProcessor` always parks at line 
65 between tasks, regardless of which task type ran. Removing the `finally` 
from only the two state-related methods would leave the handshake inconsistent 
across mixed `tuple → state` input sequences.


