This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 9474b9de2b fix(amber): emit Python operator state outputs reliably (#4560)
9474b9de2b is described below
commit 9474b9de2bd2acea85ca67577b30bcf39c3a3bf0
Author: Xinyuan Lin <[email protected]>
AuthorDate: Thu Apr 30 16:23:07 2026 -0700
fix(amber): emit Python operator state outputs reliably (#4560)
### What changes were proposed in this PR?
Restores reliable state-output emission for Python operators after the
#4552 revert. After this PR, both per-input-state outputs
(`Operator.process_state(...)`) and the end-of-input-port output
(`Operator.produce_state_on_finish(...)`) reach downstream channels.
`MainLoop.process_input_state` previously made two `_switch_context()`
calls with the read of `current_output_state` in between. The executor
only writes that field during the *second* switch, so `MainLoop` always
captured the previous cycle's value, and the finish state set on
`EndChannel` reached `current_output_state` only after `MainLoop` had
returned, so it was never read. This PR reduces the handshake to a
single switch followed by the read, drops the duplicate post-init and
end-of-body switches in `DataProcessor.run`, and makes the run loop's
input dispatch peek-then-consume, so that `current_internal_marker`
keeps the atomic single-consume semantics whose absence was the root
cause of #4545.
<details>
<summary>History — third attempt at this fix</summary>
- #4421 reported that a Python operator could process its first state
input but not its second.
- PR #4424 added three `_switch_context()` calls to keep `MainLoop` and
`DataProcessor` in sync, closed #4421, but changed
`current_internal_marker` lifetime and broke the source-propagation case
in `ReconfigurationSpec` (#4545).
- PR #4547 tried to restore atomic marker consumption on top of #4424
and re-enabled the source-propagation case in `ReconfigurationSpec`. CI
continued to fail.
- PR #4552 reverted #4424 outright as a stop-gap. State-processing is
back to its pre-#4424 broken state — see #4559.
</details>
### Any related issues, documentation, discussions?
Fixes #4559. Follow-up to #4421 / #4424 / #4545 / #4547 / #4552.
### How was this PR tested?
Existing `core/runnables/test_main_loop.py` tests pass unchanged. Added
three new tests:
- `test_process_state_can_emit_multiple_states` — stub-level coverage of
the #4421 "second state not processed" scenario.
- `test_main_loop_thread_can_process_state` — full real-thread coverage
of state DataElements and `produce_state_on_finish` on `EndChannel`.
Times out on plain `main` (#4559); passes on this branch.
- `test_main_loop_thread_can_process_state_after_tuple` — coverage for
the mixed `tuple → state` input sequence.
`ReconfigurationSpec`'s source-propagation case (re-enabled in #4547)
should be re-run on this branch to confirm the new handshake does not
re-introduce #4545.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Anthropic Claude Opus 4.7
---
.../main/python/core/runnables/data_processor.py | 42 ++-
amber/src/main/python/core/runnables/main_loop.py | 1 -
.../main/python/core/runnables/test_main_loop.py | 290 +++++++++++++++++++++
3 files changed, 320 insertions(+), 13 deletions(-)
diff --git a/amber/src/main/python/core/runnables/data_processor.py b/amber/src/main/python/core/runnables/data_processor.py
index 4399b1a3a2..35d2a75d1d 100644
--- a/amber/src/main/python/core/runnables/data_processor.py
+++ b/amber/src/main/python/core/runnables/data_processor.py
@@ -49,20 +49,30 @@ class DataProcessor(Runnable, Stoppable):
with self._context.tuple_processing_manager.context_switch_condition:
self._context.tuple_processing_manager.context_switch_condition.wait()
self._running.set()
- self._switch_context()
+ self._pre_loop_checks()
while self._running.is_set():
- marker =
self._context.tuple_processing_manager.get_internal_marker()
- state = self._context.state_processing_manager.get_input_state()
- tuple_ = self._context.tuple_processing_manager.current_input_tuple
- if marker is not None:
- self.process_internal_marker(marker)
- elif state is not None:
- self.process_state(state)
- elif tuple_ is not None:
- self.process_tuple()
+ tpm = self._context.tuple_processing_manager
+ spm = self._context.state_processing_manager
+ has_marker = tpm.current_internal_marker is not None
+ has_state = spm.current_input_state is not None
+ has_tuple = tpm.current_input_tuple is not None
+ queued = has_marker + has_state + has_tuple
+ # MainLoop is single-threaded and sets at most one of
+ # current_internal_marker / current_input_state /
+ # current_input_tuple per cycle before switching to here, so
+ # exactly one slot must be populated on every iteration.
+ if queued != 1:
+ raise RuntimeError(
+ "DataProcessor expected exactly one queued input per "
+ f"iteration, got marker={has_marker}, state={has_state}, "
+ f"tuple={has_tuple}"
+ )
+ if has_marker:
+ self.process_internal_marker(tpm.get_internal_marker())
+ elif has_state:
+ self.process_state(spm.get_input_state())
else:
- raise RuntimeError("No marker or tuple to process.")
- self._switch_context()
+ self.process_tuple()
def process_internal_marker(self, internal_marker: InternalMarker) -> None:
try:
@@ -182,6 +192,14 @@ class DataProcessor(Runnable, Stoppable):
def _post_switch_context_checks(self):
self._check_and_process_debug_command()
+ def _pre_loop_checks(self) -> None:
+ # Runs once after init and before the first task so that a debug
+ # command queued during worker setup fires before any
+ # tuple / state / marker is processed. Only the debug-command
+ # check is needed here -- no task has run yet, so there is no
+ # exception to surface.
+ self._check_and_process_debug_command()
+
def _report_exception(self, exc_info: ExceptionInfo):
tb = traceback.extract_tb(exc_info[2])
filename, line_number, func_name, text = tb[-1]
diff --git a/amber/src/main/python/core/runnables/main_loop.py b/amber/src/main/python/core/runnables/main_loop.py
index cde2847206..ab35cda81b 100644
--- a/amber/src/main/python/core/runnables/main_loop.py
+++ b/amber/src/main/python/core/runnables/main_loop.py
@@ -192,7 +192,6 @@ class MainLoop(StoppableQueueBlockingRunnable):
def process_input_state(self) -> None:
self._switch_context()
output_state = self.context.state_processing_manager.get_output_state()
- self._switch_context()
if output_state is not None:
for to, batch in
self.context.output_manager.emit_state(output_state):
self._output_queue.put(
diff --git a/amber/src/main/python/core/runnables/test_main_loop.py b/amber/src/main/python/core/runnables/test_main_loop.py
index 5612e4b41a..cc6969d964 100644
--- a/amber/src/main/python/core/runnables/test_main_loop.py
+++ b/amber/src/main/python/core/runnables/test_main_loop.py
@@ -26,6 +26,8 @@ from threading import Thread
from core.models import (
DataFrame,
InternalQueue,
+ State,
+ StateFrame,
Tuple,
)
from core.models.internal_queue import (
@@ -160,6 +162,57 @@ class TestMainLoop:
),
)
+ @pytest.fixture
+ def mock_state_data_elements(self, mock_data_input_channel):
+ elements = []
+ for value in (1, 2, 3, 4):
+ state = State()
+ state.add("value", value)
+ elements.append(
+ DataElement(
+ tag=mock_data_input_channel,
+ payload=StateFrame(frame=state),
+ )
+ )
+ return elements
+
+ @pytest.fixture
+ def state_processing_executor(self):
+ # In-process executor for the state-pipeline tests. Tags processed
+ # states with `processed_marker` and emits a finish-marker state
+ # from `produce_state_on_finish` so EndChannel handling can be
+ # observed.
+ class StateProcessingExecutor:
+ @staticmethod
+ def process_tuple(tuple_, port):
+ yield tuple_
+
+ @staticmethod
+ def process_state(state: State, port: int) -> State:
+ new_state = State()
+ for key, value in state.__dict__.items():
+ if key != "schema":
+ new_state.add(key, value)
+ new_state.add("processed_marker", "executed")
+ new_state.add("port", port)
+ return new_state
+
+ @staticmethod
+ def produce_state_on_finish(port: int) -> State:
+ finish_state = State()
+ finish_state.add("finish_marker",
"produce_state_on_finish_ran")
+ return finish_state
+
+ @staticmethod
+ def on_finish(port):
+ yield
+
+ @staticmethod
+ def close():
+ pass
+
+ return StateProcessingExecutor()
+
@pytest.fixture
def mock_binary_data_element(self, mock_binary_tuple,
mock_data_input_channel):
return DataElement(
@@ -1231,3 +1284,240 @@ class TestMainLoop:
"test-1"
] == b"pickle " + pickle.dumps(mock_binary_tuple["test-1"])
reraise()
+
+ @pytest.mark.timeout(2)
+ def test_process_state_can_emit_multiple_states(
+ self,
+ main_loop,
+ output_queue,
+ mock_data_output_channel,
+ monkeypatch,
+ ):
+ # Stub-level coverage of the single-switch state handshake. Each
+ # call to the (stubbed) _switch_context simulates DataProc
+ # consuming the queued input state and writing
+ # current_output_state, mirroring what real DataProc.process_state
+ # does between MainLoop's switches.
+ class DummyExecutor:
+ @staticmethod
+ def process_state(state: State, port: int) -> State:
+ output_state = State()
+ output_state.add("value", state["value"] + 1)
+ output_state.add("port", port)
+ return output_state
+
+ main_loop.context.executor_manager.executor = DummyExecutor()
+ monkeypatch.setattr(main_loop, "_check_and_process_control", lambda:
None)
+ monkeypatch.setattr(
+ main_loop.context.output_manager,
+ "emit_state",
+ lambda state: [(mock_data_output_channel.to_worker_id,
StateFrame(state))],
+ )
+
+ def fake_switch_context():
+ current_input_state = (
+ main_loop.context.state_processing_manager.current_input_state
+ )
+ if current_input_state is not None:
+
main_loop.context.state_processing_manager.current_output_state = (
+ DummyExecutor.process_state(current_input_state, 0)
+ )
+
+ monkeypatch.setattr(main_loop, "_switch_context", fake_switch_context)
+
+ first_state = State()
+ first_state.add("value", 1)
+ second_state = State()
+ second_state.add("value", 41)
+
+ main_loop._process_state(first_state)
+ main_loop._process_state(second_state)
+
+ first_output: DataElement = output_queue.get()
+ second_output: DataElement = output_queue.get()
+
+ assert first_output.tag == mock_data_output_channel
+ assert isinstance(first_output.payload, StateFrame)
+ assert first_output.payload.frame["value"] == 2
+ assert first_output.payload.frame["port"] == 0
+
+ assert second_output.tag == mock_data_output_channel
+ assert isinstance(second_output.payload, StateFrame)
+ assert second_output.payload.frame["value"] == 42
+ assert second_output.payload.frame["port"] == 0
+
+ @pytest.mark.timeout(2)
+ def test_main_loop_thread_can_process_state(
+ self,
+ mock_data_output_channel,
+ mock_control_output_channel,
+ input_queue,
+ output_queue,
+ main_loop,
+ main_loop_thread,
+ mock_assign_input_port,
+ mock_assign_output_port,
+ mock_add_input_channel,
+ mock_add_partitioning,
+ mock_initialize_executor,
+ mock_state_data_elements,
+ mock_end_of_upstream,
+ state_processing_executor,
+ command_sequence,
+ reraise,
+ ):
+ # End-to-end coverage of the state-processing path through the real
+ # MainLoop + DataProcessor threads. The single-switch state handshake
+ # in MainLoop.process_input_state means each state is emitted in its
+ # own cycle (no lag), and an EndChannel ECM after the last state
+ # produces an additional output via produce_state_on_finish.
+ main_loop_thread.start()
+
+ for setup_msg in [
+ mock_assign_input_port,
+ mock_assign_output_port,
+ mock_add_input_channel,
+ mock_add_partitioning,
+ mock_initialize_executor,
+ ]:
+ input_queue.put(setup_msg)
+ assert output_queue.get() == DCMElement(
+ tag=mock_control_output_channel,
+ payload=DirectControlMessagePayloadV2(
+ return_invocation=ReturnInvocation(
+ command_id=command_sequence,
+ return_value=ControlReturn(empty_return=EmptyReturn()),
+ )
+ ),
+ )
+
+ # Going through the InitializeExecutor RPC above sets up the rest of
+ # the worker state (output schema, partitioning bookkeeping). Swap
+ # the executor instance with the test helper here so the test can
+ # assert the executor's process_state and produce_state_on_finish
+ # actually ran, without depending on Python's cross-test module
+ # caching for operator classes loaded via OpExecWithCode.
+ main_loop.context.executor_manager.executor = state_processing_executor
+
+ # Send four states. With the lag-free state pipeline we expect each
+ # state to produce its own output in order.
+ for state_element in mock_state_data_elements:
+ input_queue.put(state_element)
+
+ for expected_value in (1, 2, 3, 4):
+ output_data_element: DataElement = output_queue.get()
+ assert output_data_element.tag == mock_data_output_channel
+ assert isinstance(output_data_element.payload, StateFrame), (
+ f"expected StateFrame for value={expected_value}, got "
+ f"{type(output_data_element.payload).__name__}"
+ )
+ output_state = output_data_element.payload.frame
+ assert output_state["value"] == expected_value, (
+ f"state outputs arrived out of order: expected value="
+ f"{expected_value}, got value={output_state['value']}"
+ )
+ assert output_state["processed_marker"] == "executed"
+ assert output_state["port"] == 0
+
+ # Send EndChannel to drive _process_end_channel. The executor's
+ # produce_state_on_finish writes a finish-marker state into
+ # current_output_state inside DataProc's process_internal_marker;
+ # MainLoop's process_input_state then emits it.
+ input_queue.put(mock_end_of_upstream)
+
+ # Drain the control reply messages so the next data
+ # output_queue.get() returns the post-EndChannel data emission.
+ output_queue.disable_data(InternalQueue.DisableType.DISABLE_BY_PAUSE)
+ for _ in range(3):
+ control_reply = output_queue.get()
+ assert isinstance(control_reply, DCMElement), (
+ f"expected DCMElement during EndChannel teardown, got "
+ f"{type(control_reply).__name__}"
+ )
+ output_queue.enable_data(InternalQueue.DisableType.DISABLE_BY_PAUSE)
+
+ end_channel_state_output: DataElement = output_queue.get()
+ assert end_channel_state_output.tag == mock_data_output_channel
+ assert isinstance(end_channel_state_output.payload, StateFrame), (
+ f"expected StateFrame for the EndChannel-driven emission, got "
+ f"{type(end_channel_state_output.payload).__name__}"
+ )
+ end_channel_state = end_channel_state_output.payload.frame
+ assert "finish_marker" in end_channel_state.__dict__, (
+ f"EndChannel emission should be the finish-marker state from "
+ f"produce_state_on_finish, got {end_channel_state!r}"
+ )
+ assert end_channel_state["finish_marker"] ==
"produce_state_on_finish_ran"
+
+ reraise()
+
+ @pytest.mark.timeout(2)
+ def test_main_loop_thread_can_process_state_after_tuple(
+ self,
+ mock_data_output_channel,
+ mock_control_output_channel,
+ input_queue,
+ output_queue,
+ main_loop,
+ main_loop_thread,
+ mock_assign_input_port,
+ mock_assign_output_port,
+ mock_add_input_channel,
+ mock_add_partitioning,
+ mock_initialize_executor,
+ mock_data_element,
+ mock_state_data_elements,
+ state_processing_executor,
+ command_sequence,
+ reraise,
+ ):
+ # Coverage for the mixed (tuple, then state) input sequence: a
+ # tuple followed by several state DataElements should still emit
+ # every state's processed output in order.
+ main_loop_thread.start()
+
+ for setup_msg in [
+ mock_assign_input_port,
+ mock_assign_output_port,
+ mock_add_input_channel,
+ mock_add_partitioning,
+ mock_initialize_executor,
+ ]:
+ input_queue.put(setup_msg)
+ assert output_queue.get() == DCMElement(
+ tag=mock_control_output_channel,
+ payload=DirectControlMessagePayloadV2(
+ return_invocation=ReturnInvocation(
+ command_id=command_sequence,
+ return_value=ControlReturn(empty_return=EmptyReturn()),
+ )
+ ),
+ )
+
+ main_loop.context.executor_manager.executor = state_processing_executor
+
+ # Tuple first, then four states.
+ input_queue.put(mock_data_element)
+ warmup_output: DataElement = output_queue.get()
+ assert warmup_output.tag == mock_data_output_channel
+ assert isinstance(warmup_output.payload, DataFrame)
+
+ for state_element in mock_state_data_elements:
+ input_queue.put(state_element)
+
+ for expected_value in (1, 2, 3, 4):
+ output_data_element: DataElement = output_queue.get()
+ assert output_data_element.tag == mock_data_output_channel
+ assert isinstance(output_data_element.payload, StateFrame), (
+ f"expected StateFrame for value={expected_value}, got "
+ f"{type(output_data_element.payload).__name__}"
+ )
+ output_state = output_data_element.payload.frame
+ assert output_state["value"] == expected_value, (
+ f"state outputs after a tuple arrived out of order: "
+ f"expected value={expected_value}, "
+ f"got value={output_state['value']}"
+ )
+ assert output_state["processed_marker"] == "executed"
+
+ reraise()