aglinxinyuan opened a new pull request, #4560:
URL: https://github.com/apache/texera/pull/4560

   ### What changes were proposed in this PR?
   
   Fixes the one-cycle lag in the Python state-processing handshake described 
in #4559. With the lag, a Python operator's `produce_state_on_finish(port)` 
output is silently dropped on EndChannel: `MainLoop.process_input_state` reads 
`current_output_state` *between* its two `_switch_context()` calls, but 
`DataProcessor.process_state` only writes that field after the *second* switch 
returns, so MainLoop always captures the previous cycle's value. The 
finish-state written by `process_internal_marker` for `EndChannel` lands in 
`current_output_state` after MainLoop has already captured the local 
`output_state`, and nothing re-reads the slot afterwards, so the finish-state 
is lost.
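
The lag described above can be reproduced in a toy model. This is a sketch under stated assumptions, not Texera's code: a generator stands in for the DataProcessor thread, `next(...)` stands in for `_switch_context()`, and the names `lagging_processor` and `lagging_process_input_state` are hypothetical.

```python
# Toy model only. A generator plays the DataProcessor thread and next()
# plays _switch_context(). All names below are hypothetical.


def lagging_processor(ctx):
    while True:
        state = ctx["input"]
        ctx["input"] = None
        result = f"processed {state}"
        yield                     # per-task switch: parks inside the task
        ctx["output"] = result    # written only after the second switch
        yield                     # end-of-body switch


def lagging_process_input_state(processor, ctx, state):
    ctx["input"] = state
    next(processor)               # first switch
    output_state = ctx["output"]  # read between the two switches
    next(processor)               # second switch; the write happens here
    return output_state


ctx = {"input": None, "output": None}
processor = lagging_processor(ctx)
for state in ["s1", "s2", "finish"]:
    print(state, "->", lagging_process_input_state(processor, ctx, state))
print("left in slot:", ctx["output"])
```

In this model every call returns the previous cycle's value (`None`, then `processed s1`, then `processed s2`), and `processed finish` stays in the slot with no later read to pick it up.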
   
   The fix collapses the handshake to a single switch and removes the per-task 
`finally: _switch_context()` blocks that forced DataProcessor to park inside 
the per-task functions:
   
   - **`MainLoop.process_input_state`**: one `_switch_context()` followed by a 
single read of `current_output_state`. The switch returns once DataProcessor 
has run the executor and written the output, so the read sees the freshly 
produced state for *this* cycle.
   
   - **`DataProcessor.process_state` / `process_internal_marker` / 
`process_tuple`**: drop the per-task `finally: _switch_context()`. 
DataProcessor now always parks at the run-loop's end-of-body switch (line 65) 
between tasks. One MainLoop switch wakes it, drives a full pick-up → executor → 
output → park-back cycle, and notifies MainLoop back from line 65.
   
   - **`DataProcessor.run`**: drop the post-init `_switch_context()`. With the 
per-task finallys gone, MainLoop's first switch lands DataProcessor directly in 
the while-loop where it consumes whatever input MainLoop has just queued; an 
extra init switch would burn the first MainLoop switch on a no-op handshake and 
silently drop the first state/tuple/marker.
   
   - **Run-loop input dispatch**: change from consume-on-read 
(`get_internal_marker()` / `get_input_state()` always clear the slot) to 
peek-then-consume: read `current_*` without clearing, and call `get_*` only on 
the slot whose branch is taken. Inputs that MainLoop populated but that this 
iteration does not handle survive into the next iteration.
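
The single-switch handshake and the peek-then-consume dispatch listed above can be sketched together in a toy model. Assumptions: a generator stands in for the DataProcessor thread, `next(...)` stands in for `_switch_context()`, and `ToyContext` is a hypothetical class that only borrows the slot and getter names mentioned in this description. The marker-before-state branch order is also an assumption.

```python
# Toy model only; ToyContext and the functions below are hypothetical
# stand-ins that reuse the slot names from the PR description.


class ToyContext:
    def __init__(self):
        self.current_internal_marker = None
        self.current_input_state = None
        self.current_output_state = None

    def get_internal_marker(self):
        # Consuming read: returns the marker and clears the slot.
        marker, self.current_internal_marker = self.current_internal_marker, None
        return marker

    def get_input_state(self):
        # Consuming read: returns the state and clears the slot.
        state, self.current_input_state = self.current_input_state, None
        return state


def data_processor(ctx):
    while True:
        # Peek at the slots; consume only the one whose branch is taken.
        if ctx.current_internal_marker is not None:
            marker = ctx.get_internal_marker()
            ctx.current_output_state = f"finish-state for {marker}"
        elif ctx.current_input_state is not None:
            state = ctx.get_input_state()
            ctx.current_output_state = f"processed {state}"
        yield  # the only park point: end of the run-loop body


def process_input_state(processor, ctx):
    next(processor)  # one switch drives a full pick-up -> output -> park cycle
    output_state, ctx.current_output_state = ctx.current_output_state, None
    return output_state


ctx = ToyContext()
processor = data_processor(ctx)
ctx.current_input_state = "s1"
print(process_input_state(processor, ctx))
```

Because the processor parks only at the end of the loop body, the value read after the switch belongs to the current cycle, and an input that was not handled in one iteration is still in its slot for the next.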
   
   Net diff: 2 source files, 12 insertions, 19 deletions.
   
   ### Any related issues, documentation, discussions?
   
   Fixes #4559.
   
   ### How was this PR tested?
   
   Existing `core/runnables/test_main_loop.py` tests pass unchanged.
   
   Two new tests added on top:
   
   - `test_process_state_can_emit_multiple_states` — stub-level coverage of the 
single-switch handshake. Stubs `_switch_context` to simulate DataProcessor 
consuming `current_input_state` and writing `current_output_state` on each 
switch, then drives `_process_state` twice and asserts both states emit through 
`process_input_state`.
   
   - `test_main_loop_thread_can_process_state` — full real-thread coverage. 
Initializes the worker normally, then swaps in an in-process executor that tags 
processed states (`process_state`) and emits a finish marker on EndChannel 
(`produce_state_on_finish`). Sends four states and asserts each emits in order, 
confirming the pipeline has no one-state lag. Then sends an EndChannel ECM and 
asserts the finish-marker state is emitted via `_process_end_channel -> 
process_input_state`. This second test times out on plain `main` (the bug from 
#4559) and passes on this branch.
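
The stubbing approach used by the first test can be illustrated roughly as follows. This is a sketch, not the test added in this PR: `ToyMainLoop` is a hypothetical stand-in for the real `MainLoop`, and the test name is made up. Only `unittest.mock.patch.object` and `types.SimpleNamespace` are real library calls.

```python
from types import SimpleNamespace
from unittest import mock


class ToyMainLoop:
    """Hypothetical stand-in for MainLoop, reduced to the handshake."""

    def __init__(self):
        self.context = SimpleNamespace(
            current_input_state=None, current_output_state=None
        )
        self.emitted = []

    def _switch_context(self):
        raise RuntimeError("needs a real DataProcessor thread")

    def process_input_state(self):
        self._switch_context()  # single switch
        output_state = self.context.current_output_state  # single read
        if output_state is not None:
            self.emitted.append(output_state)

    def _process_state(self, state):
        self.context.current_input_state = state
        self.process_input_state()


def test_single_switch_emits_each_state():
    main_loop = ToyMainLoop()

    def fake_switch():
        # Simulates DataProcessor: consume the input, write the output.
        ctx = main_loop.context
        ctx.current_output_state = {"seen": ctx.current_input_state}
        ctx.current_input_state = None

    with mock.patch.object(main_loop, "_switch_context", side_effect=fake_switch):
        main_loop._process_state("first")
        main_loop._process_state("second")

    assert main_loop.emitted == [{"seen": "first"}, {"seen": "second"}]


test_single_switch_emits_each_state()
```

Stubbing the switch keeps the test single-threaded and deterministic; the real-thread test remains necessary to catch ordering bugs between the two threads.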
   
   ```
   $ python -m pytest core/runnables/test_main_loop.py -v
   ======================== 7 passed, 5 warnings in 1.78s ========================
   ```
   
   ### Was this PR authored or co-authored using generative AI tooling?
   
   Generated-by: Anthropic Claude Opus 4.7


