aglinxinyuan opened a new pull request, #4560:

URL: https://github.com/apache/texera/pull/4560
### What changes were proposed in this PR?

Fixes the one-cycle lag in the Python state-processing handshake described in #4559.

Because of the lag, a Python operator's `produce_state_on_finish(port)` output is silently dropped on EndChannel. `MainLoop.process_input_state` reads `current_output_state` *between* its two `_switch_context()` calls, but `DataProcessor.process_state` only writes that field after the *second* switch returns. MainLoop therefore always captures the previous cycle's value. The finish-state that `process_internal_marker` writes for `EndChannel` lands in `current_output_state` after MainLoop has already captured its local `output_state`, and nothing re-reads the slot afterward, so the finish-state is lost.

The fix collapses the handshake to a single switch. It also removes the per-task `finally: _switch_context()` blocks that forced DataProcessor to park inside the per-task functions:

- **`MainLoop.process_input_state`**: one `_switch_context()` followed by a single read of `current_output_state`. The switch returns once DataProcessor has run the executor and written the output, so the read sees the state produced in *this* cycle.
- **`DataProcessor.process_state` / `process_internal_marker` / `process_tuple`**: drop the per-task `finally: _switch_context()`. Between tasks, DataProcessor now always parks at the run loop's end-of-body switch (line 65). A single MainLoop switch wakes it and drives a full pick-up → executor → output → park-back cycle. DataProcessor then notifies MainLoop from line 65.
- **`DataProcessor.run`**: drop the post-init `_switch_context()`. With the per-task `finally` blocks gone, MainLoop's first switch lands DataProcessor directly in the while loop, where it consumes whatever input MainLoop has just queued. An extra init switch would waste MainLoop's first switch on a no-op handshake and silently drop the first state, tuple, or marker.
- **Run-loop input dispatch**: change from peek-and-consume (`get_internal_marker()` / `get_input_state()` always clear the slot) to peek-then-consume. The loop reads the `current_*` fields without clearing them and calls `get_*` only on the slot whose branch is taken. Inputs that MainLoop populated but that the current iteration is not handling now survive into the next iteration.

Net diff: 2 source files, 12 insertions, 19 deletions.

### Any related issues, documentation, discussions?

Fixes #4559.

### How was this PR tested?

The existing `core/runnables/test_main_loop.py` tests pass unchanged. This PR adds two new tests:

- `test_process_state_can_emit_multiple_states`: stub-level coverage of the single-switch handshake. It stubs `_switch_context` to simulate DataProcessor consuming `current_input_state` and writing `current_output_state` on each switch. It then drives `_process_state` twice and asserts that both states are emitted through `process_input_state`.
- `test_main_loop_thread_can_process_state`: full real-thread coverage. It initializes the worker normally, then swaps in an in-process executor that tags processed states (`process_state`) and emits a finish marker on EndChannel (`produce_state_on_finish`). It sends four states and asserts that each one is emitted in order, which confirms the pipeline has no one-state lag. It then sends an EndChannel ECM and asserts that the finish-marker state is emitted via `_process_end_channel -> process_input_state`. This test times out on plain `main` (the bug from #4559) and passes on this branch.

```
$ python -m pytest core/runnables/test_main_loop.py -v
======================== 7 passed, 5 warnings in 1.78s ========================
```

### Was this PR authored or co-authored using generative AI tooling?

Generated-by: Anthropic Claude Opus 4.7
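The following sketch reproduces the ordering difference between the old two-switch handshake and the new single-switch handshake from *What changes were proposed in this PR?* in isolation. It is not Texera's code. It uses Python generators as an assumed stand-in for `_switch_context()`: each `next(dp)` on the MainLoop side runs DataProcessor until its next `yield`, and that `yield` plays the role of switching back. The helpers `make_slots`, `process`, `run_old`, and `run_new` are hypothetical. Only the slot names `current_input_state` and `current_output_state` come from this PR.

```python
from types import SimpleNamespace
from typing import Generator, Iterable, List, Optional


def make_slots() -> SimpleNamespace:
    # Hypothetical shared slots; the field names mirror the ones in this PR.
    return SimpleNamespace(current_input_state=None, current_output_state=None)


def process(state: str) -> str:
    # Hypothetical stand-in for the executor's state processing.
    return f"processed:{state}"


# Old shape: two switches per cycle; the output is published after the second.
def old_data_processor(slots: SimpleNamespace) -> Generator[None, None, None]:
    while True:
        yield  # parked until MainLoop's first switch
        result = process(slots.current_input_state)
        yield  # per-task `finally: _switch_context()` returns control early
        slots.current_output_state = result  # runs only on MainLoop's 2nd switch


def old_process_input_state(dp, slots, state) -> Optional[str]:
    slots.current_input_state = state
    next(dp)  # first switch
    output = slots.current_output_state  # read between switches: previous cycle
    next(dp)  # second switch: only now is this cycle's output written
    return output


# New shape: one switch per cycle; DataProcessor parks at the end of its loop body.
def new_data_processor(slots: SimpleNamespace) -> Generator[None, None, None]:
    while True:  # no post-init yield: the first switch lands here directly
        slots.current_output_state = process(slots.current_input_state)
        yield  # end-of-body switch back to MainLoop


def new_process_input_state(dp, slots, state) -> Optional[str]:
    slots.current_input_state = state
    next(dp)  # returns after this cycle's output has been written
    return slots.current_output_state


def run_old(states: Iterable[str]) -> List[Optional[str]]:
    slots = make_slots()
    dp = old_data_processor(slots)
    next(dp)  # post-init switch: park DataProcessor at its first yield
    return [old_process_input_state(dp, slots, s) for s in states]


def run_new(states: Iterable[str]) -> List[Optional[str]]:
    slots = make_slots()
    dp = new_data_processor(slots)
    return [new_process_input_state(dp, slots, s) for s in states]


if __name__ == "__main__":
    print(run_old(["a", "b", "c"]))  # [None, 'processed:a', 'processed:b']
    print(run_new(["a", "b", "c"]))  # ['processed:a', 'processed:b', 'processed:c']
```

In the old shape, every returned value lags by one cycle. The last output (`processed:c` here, or the EndChannel finish-state in the real worker) is written to the slot but never returned. The toy also shows why the post-init switch has to go in the new shape. If `new_data_processor` yielded once before its loop, the first `next(dp)` would be spent on that no-op park, and the first state would never be processed.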
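The peek-then-consume change to the run-loop dispatch can be illustrated the same way. This is a hypothetical example. The `Inputs` class, the two dispatch functions, and `drain` are invented for it. For illustration only, the sketch also assumes that the old shape fetched every slot through its clearing `get_*` accessor before choosing a branch.

```python
from dataclasses import dataclass
from typing import Any, Callable, List, Optional, Tuple

Handled = Tuple[str, Any]


@dataclass
class Inputs:
    # Hypothetical input slots that MainLoop fills before switching to DataProcessor.
    current_internal_marker: Optional[Any] = None
    current_input_state: Optional[Any] = None

    def get_internal_marker(self) -> Optional[Any]:
        # Clearing accessor: return the marker and empty the slot.
        marker, self.current_internal_marker = self.current_internal_marker, None
        return marker

    def get_input_state(self) -> Optional[Any]:
        # Clearing accessor: return the state and empty the slot.
        state, self.current_input_state = self.current_input_state, None
        return state


def dispatch_consume_all(inputs: Inputs) -> Handled:
    # Assumed old shape: every slot is consumed before a branch is chosen,
    # so an input whose branch is not taken is cleared and lost.
    marker = inputs.get_internal_marker()
    state = inputs.get_input_state()
    if marker is not None:
        return ("marker", marker)
    if state is not None:
        return ("state", state)
    return ("idle", None)


def dispatch_peek_then_consume(inputs: Inputs) -> Handled:
    # Peek at current_* without clearing; consume only the slot being handled.
    if inputs.current_internal_marker is not None:
        return ("marker", inputs.get_internal_marker())
    if inputs.current_input_state is not None:
        return ("state", inputs.get_input_state())
    return ("idle", None)


def drain(dispatch: Callable[[Inputs], Handled], inputs: Inputs, max_iters: int = 10) -> List[Handled]:
    # Run loop iterations until there is nothing left to handle.
    handled: List[Handled] = []
    for _ in range(max_iters):
        kind, item = dispatch(inputs)
        if kind == "idle":
            break
        handled.append((kind, item))
    return handled


if __name__ == "__main__":
    print(drain(dispatch_consume_all, Inputs("ECM", "state-1")))
    # [('marker', 'ECM')]
    print(drain(dispatch_peek_then_consume, Inputs("ECM", "state-1")))
    # [('marker', 'ECM'), ('state', 'state-1')]
```

In this example, a marker and a state are queued together. The consume-everything loop handles the marker and silently clears the state. The peek-then-consume loop handles both, one per iteration.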
