This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5214-79a1d170b7bcf803f606f81fdc93d15bbc64fb99 in repository https://gitbox.apache.org/repos/asf/texera.git
commit f89c07cffe88eff6f88fcce89f6a4ac41ede239e
Author: Meng Wang <[email protected]>
AuthorDate: Mon May 25 16:52:36 2026 -0700

fix(pyamber): allow worker state to transit READY -> COMPLETED (#5214)

### What changes were proposed in this PR?

A Python UDF worker whose upstream produces zero tuples receives an `EndChannel` marker before any data, so it never visits `RUNNING`. When `_process_end_channel` called `complete()`, the state machine in `Context` rejected `READY → COMPLETED` and the worker died with `InvalidTransitionException`, leaving the downstream operator stuck and the workflow hung. The Scala-side `WorkerStateManager` already lists `COMPLETED` in `READY`'s allowed targets — this is a Python ↔ Scala parity drift. Add `WorkerState.COMPLETED` to the Python `READY` set. Also lift the state-transition graph out of `Context.__init__` into a module-level `WORKER_STATE_TRANSITIONS` constant so the test fixture can import it (single source of truth — the previous fixture independently duplicated the graph, which is what masked the parity gap from existing tests).

### Any related issues, documentation, discussions?

Closes #5197.

### How was this PR tested?

Added a regression test in `test_state_manager.py` covering `READY → COMPLETED`. Also manually verified the issue's reproducer workflow now completes; previously it hung with the worker stuck in `READY`.

### Was this PR authored or co-authored using generative AI tooling?

Generated-by: Claude Code (claude-opus-4-7) --- .../python/core/architecture/managers/context.py | 25 +++++++++++++++------- .../architecture/managers/test_state_manager.py | 20 ++++++++--------- 2 files changed, 27 insertions(+), 18 deletions(-) diff --git a/amber/src/main/python/core/architecture/managers/context.py b/amber/src/main/python/core/architecture/managers/context.py index 3629a435a7..7a35c37c05 100644 --- a/amber/src/main/python/core/architecture/managers/context.py +++ b/amber/src/main/python/core/architecture/managers/context.py @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. -from typing import Optional +from typing import Dict, Optional, Set from proto.org.apache.texera.amber.core import ActorVirtualIdentity, ChannelIdentity from proto.org.apache.texera.amber.engine.architecture.worker import WorkerState @@ -34,6 +34,21 @@ from ..packaging.output_manager import OutputManager from ...models import InternalQueue +# State-transition graph for the Python worker. Mirrors the Scala +# `WorkerStateManager` so both language runtimes recognize the same worker +# lifecycle. A worker may transition `READY -> COMPLETED` directly without +# entering `RUNNING` — this is the path taken when there is nothing to +# process (e.g. the upstream port emits zero tuples before signaling +# end-of-stream). +WORKER_STATE_TRANSITIONS: Dict[WorkerState, Set[WorkerState]] = { + WorkerState.UNINITIALIZED: {WorkerState.READY}, + WorkerState.READY: {WorkerState.PAUSED, WorkerState.RUNNING, WorkerState.COMPLETED}, + WorkerState.RUNNING: {WorkerState.PAUSED, WorkerState.COMPLETED}, + WorkerState.PAUSED: {WorkerState.RUNNING}, + WorkerState.COMPLETED: set(), +} + + class Context: """ Manages context of command handlers. 
Many of those attributes belongs to the DP @@ -52,13 +67,7 @@ class Context: self.state_processing_manager = StateProcessingManager() self.exception_manager = ExceptionManager() self.state_manager = StateManager( - { - WorkerState.UNINITIALIZED: {WorkerState.READY}, - WorkerState.READY: {WorkerState.PAUSED, WorkerState.RUNNING}, - WorkerState.RUNNING: {WorkerState.PAUSED, WorkerState.COMPLETED}, - WorkerState.PAUSED: {WorkerState.RUNNING}, - WorkerState.COMPLETED: set(), - }, + WORKER_STATE_TRANSITIONS, WorkerState.UNINITIALIZED, ) diff --git a/amber/src/test/python/core/architecture/managers/test_state_manager.py b/amber/src/test/python/core/architecture/managers/test_state_manager.py index 9ebe6e847c..464c64194f 100644 --- a/amber/src/test/python/core/architecture/managers/test_state_manager.py +++ b/amber/src/test/python/core/architecture/managers/test_state_manager.py @@ -17,6 +17,7 @@ import pytest +from core.architecture.managers.context import WORKER_STATE_TRANSITIONS from core.architecture.managers.state_manager import ( InvalidStateException, InvalidTransitionException, @@ -28,16 +29,7 @@ from proto.org.apache.texera.amber.engine.architecture.worker import WorkerState class TestStateManager: @pytest.fixture def state_manager(self): - return StateManager( - { - WorkerState.UNINITIALIZED: {WorkerState.READY}, - WorkerState.READY: {WorkerState.PAUSED, WorkerState.RUNNING}, - WorkerState.RUNNING: {WorkerState.PAUSED, WorkerState.COMPLETED}, - WorkerState.PAUSED: {WorkerState.RUNNING}, - WorkerState.COMPLETED: set(), - }, - WorkerState.UNINITIALIZED, - ) + return StateManager(WORKER_STATE_TRANSITIONS, WorkerState.UNINITIALIZED) def test_it_can_init(self, state_manager): pass @@ -72,3 +64,11 @@ class TestStateManager: with pytest.raises(InvalidStateException): state_manager.assert_state(WorkerState.COMPLETED) + + def test_it_can_transit_directly_from_ready_to_completed(self, state_manager): + # A worker can complete directly from READY without first entering + # 
RUNNING. This path is taken when there is nothing to process + # (upstream signals end-of-stream before any data arrives). + state_manager.transit_to(WorkerState.READY) + state_manager.transit_to(WorkerState.COMPLETED) + state_manager.assert_state(WorkerState.COMPLETED)
