This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch 
gh-readonly-queue/main/pr-5214-79a1d170b7bcf803f606f81fdc93d15bbc64fb99
in repository https://gitbox.apache.org/repos/asf/texera.git

commit f89c07cffe88eff6f88fcce89f6a4ac41ede239e
Author: Meng Wang <[email protected]>
AuthorDate: Mon May 25 16:52:36 2026 -0700

    fix(pyamber): allow worker state to transit READY -> COMPLETED (#5214)
    
    ### What changes were proposed in this PR?
    
    A Python UDF worker whose upstream produces zero tuples receives an
    `EndChannel` marker before any data, so it never visits `RUNNING`.
    Before this change, when `_process_end_channel` called `complete()`,
    the state machine in `Context` rejected `READY → COMPLETED` and the
    worker died with `InvalidTransitionException`, leaving the downstream
    operator stuck and the workflow hung.
    
    The Scala-side `WorkerStateManager` already lists `COMPLETED` in
    `READY`'s allowed targets — this is a Python ↔ Scala parity drift. Add
    `WorkerState.COMPLETED` to the Python `READY` set.
    
    Also lift the state-transition graph out of `Context.__init__` into a
    module-level `WORKER_STATE_TRANSITIONS` constant so the test fixture can
    import it (single source of truth — the previous fixture independently
    duplicated the graph, which is what masked the parity gap from existing
    tests).
    
    ### Any related issues, documentation, discussions?
    
    Closes #5197.
    
    ### How was this PR tested?
    
    Added a regression test in `test_state_manager.py` covering `READY →
    COMPLETED`. Also manually verified that the issue's reproducer workflow
    now completes; it previously hung with the worker stuck in `READY`.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (claude-opus-4-7)
---
 .../python/core/architecture/managers/context.py   | 25 +++++++++++++++-------
 .../architecture/managers/test_state_manager.py    | 20 ++++++++---------
 2 files changed, 27 insertions(+), 18 deletions(-)

diff --git a/amber/src/main/python/core/architecture/managers/context.py 
b/amber/src/main/python/core/architecture/managers/context.py
index 3629a435a7..7a35c37c05 100644
--- a/amber/src/main/python/core/architecture/managers/context.py
+++ b/amber/src/main/python/core/architecture/managers/context.py
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from typing import Optional
+from typing import Dict, Optional, Set
 
 from proto.org.apache.texera.amber.core import ActorVirtualIdentity, 
ChannelIdentity
 from proto.org.apache.texera.amber.engine.architecture.worker import 
WorkerState
@@ -34,6 +34,21 @@ from ..packaging.output_manager import OutputManager
 from ...models import InternalQueue
 
 
+# State-transition graph for the Python worker. Mirrors the Scala
+# `WorkerStateManager` so both language runtimes recognize the same worker
+# lifecycle. A worker may transition `READY -> COMPLETED` directly without
+# entering `RUNNING` — this is the path taken when there is nothing to
+# process (e.g. the upstream port emits zero tuples before signaling
+# end-of-stream).
+WORKER_STATE_TRANSITIONS: Dict[WorkerState, Set[WorkerState]] = {
+    WorkerState.UNINITIALIZED: {WorkerState.READY},
+    WorkerState.READY: {WorkerState.PAUSED, WorkerState.RUNNING, 
WorkerState.COMPLETED},
+    WorkerState.RUNNING: {WorkerState.PAUSED, WorkerState.COMPLETED},
+    WorkerState.PAUSED: {WorkerState.RUNNING},
+    WorkerState.COMPLETED: set(),
+}
+
+
 class Context:
     """
     Manages context of command handlers. Many of those attributes belongs to 
the DP
@@ -52,13 +67,7 @@ class Context:
         self.state_processing_manager = StateProcessingManager()
         self.exception_manager = ExceptionManager()
         self.state_manager = StateManager(
-            {
-                WorkerState.UNINITIALIZED: {WorkerState.READY},
-                WorkerState.READY: {WorkerState.PAUSED, WorkerState.RUNNING},
-                WorkerState.RUNNING: {WorkerState.PAUSED, 
WorkerState.COMPLETED},
-                WorkerState.PAUSED: {WorkerState.RUNNING},
-                WorkerState.COMPLETED: set(),
-            },
+            WORKER_STATE_TRANSITIONS,
             WorkerState.UNINITIALIZED,
         )
 
diff --git 
a/amber/src/test/python/core/architecture/managers/test_state_manager.py 
b/amber/src/test/python/core/architecture/managers/test_state_manager.py
index 9ebe6e847c..464c64194f 100644
--- a/amber/src/test/python/core/architecture/managers/test_state_manager.py
+++ b/amber/src/test/python/core/architecture/managers/test_state_manager.py
@@ -17,6 +17,7 @@
 
 import pytest
 
+from core.architecture.managers.context import WORKER_STATE_TRANSITIONS
 from core.architecture.managers.state_manager import (
     InvalidStateException,
     InvalidTransitionException,
@@ -28,16 +29,7 @@ from 
proto.org.apache.texera.amber.engine.architecture.worker import WorkerState
 class TestStateManager:
     @pytest.fixture
     def state_manager(self):
-        return StateManager(
-            {
-                WorkerState.UNINITIALIZED: {WorkerState.READY},
-                WorkerState.READY: {WorkerState.PAUSED, WorkerState.RUNNING},
-                WorkerState.RUNNING: {WorkerState.PAUSED, 
WorkerState.COMPLETED},
-                WorkerState.PAUSED: {WorkerState.RUNNING},
-                WorkerState.COMPLETED: set(),
-            },
-            WorkerState.UNINITIALIZED,
-        )
+        return StateManager(WORKER_STATE_TRANSITIONS, 
WorkerState.UNINITIALIZED)
 
     def test_it_can_init(self, state_manager):
         pass
@@ -72,3 +64,11 @@ class TestStateManager:
 
         with pytest.raises(InvalidStateException):
             state_manager.assert_state(WorkerState.COMPLETED)
+
+    def test_it_can_transit_directly_from_ready_to_completed(self, 
state_manager):
+        # A worker can complete directly from READY without first entering
+        # RUNNING. This path is taken when there is nothing to process
+        # (upstream signals end-of-stream before any data arrives).
+        state_manager.transit_to(WorkerState.READY)
+        state_manager.transit_to(WorkerState.COMPLETED)
+        state_manager.assert_state(WorkerState.COMPLETED)

Reply via email to