This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new f89c07cffe fix(pyamber): allow worker state to transit READY ->
COMPLETED (#5214)
f89c07cffe is described below
commit f89c07cffe88eff6f88fcce89f6a4ac41ede239e
Author: Meng Wang <[email protected]>
AuthorDate: Mon May 25 16:52:36 2026 -0700
fix(pyamber): allow worker state to transit READY -> COMPLETED (#5214)
### What changes were proposed in this PR?
A Python UDF worker whose upstream produces zero tuples receives an
`EndChannel` marker before any data, so it never enters `RUNNING`. When
`_process_end_channel` then called `complete()`, the state machine in
`Context` rejected the `READY → COMPLETED` transition and the worker died
with `InvalidTransitionException`, leaving the downstream operator stuck
and the workflow hanging.
The Scala-side `WorkerStateManager` already lists `COMPLETED` among
`READY`'s allowed targets, so this is a parity drift between the Python
and Scala runtimes. This PR adds `WorkerState.COMPLETED` to the set of
allowed targets for `READY` on the Python side.
It also lifts the state-transition graph out of `Context.__init__` into a
module-level `WORKER_STATE_TRANSITIONS` constant so that the test fixture
can import it, giving a single source of truth. The previous fixture
duplicated the graph independently, so the existing tests exercised a
copy rather than the production graph, which is what masked the parity
gap.
### Any related issues, documentation, discussions?
Closes #5197.
### How was this PR tested?
Added a regression test in `test_state_manager.py` covering the
`READY → COMPLETED` transition. Also manually verified that the
reproducer workflow from the issue now completes; previously it hung
with the worker stuck in `READY`.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (claude-opus-4-7)
---
.../python/core/architecture/managers/context.py | 25 +++++++++++++++-------
.../architecture/managers/test_state_manager.py | 20 ++++++++---------
2 files changed, 27 insertions(+), 18 deletions(-)
diff --git a/amber/src/main/python/core/architecture/managers/context.py
b/amber/src/main/python/core/architecture/managers/context.py
index 3629a435a7..7a35c37c05 100644
--- a/amber/src/main/python/core/architecture/managers/context.py
+++ b/amber/src/main/python/core/architecture/managers/context.py
@@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.
-from typing import Optional
+from typing import Dict, Optional, Set
from proto.org.apache.texera.amber.core import ActorVirtualIdentity,
ChannelIdentity
from proto.org.apache.texera.amber.engine.architecture.worker import
WorkerState
@@ -34,6 +34,21 @@ from ..packaging.output_manager import OutputManager
from ...models import InternalQueue
+# State-transition graph for the Python worker. Mirrors the Scala
+# `WorkerStateManager` so both language runtimes recognize the same worker
+# lifecycle. A worker may transition `READY -> COMPLETED` directly without
+# entering `RUNNING` — this is the path taken when there is nothing to
+# process (e.g. the upstream port emits zero tuples before signaling
+# end-of-stream).
+WORKER_STATE_TRANSITIONS: Dict[WorkerState, Set[WorkerState]] = {
+ WorkerState.UNINITIALIZED: {WorkerState.READY},
+ WorkerState.READY: {WorkerState.PAUSED, WorkerState.RUNNING,
WorkerState.COMPLETED},
+ WorkerState.RUNNING: {WorkerState.PAUSED, WorkerState.COMPLETED},
+ WorkerState.PAUSED: {WorkerState.RUNNING},
+ WorkerState.COMPLETED: set(),
+}
+
+
class Context:
"""
Manages context of command handlers. Many of those attributes belongs to
the DP
@@ -52,13 +67,7 @@ class Context:
self.state_processing_manager = StateProcessingManager()
self.exception_manager = ExceptionManager()
self.state_manager = StateManager(
- {
- WorkerState.UNINITIALIZED: {WorkerState.READY},
- WorkerState.READY: {WorkerState.PAUSED, WorkerState.RUNNING},
- WorkerState.RUNNING: {WorkerState.PAUSED,
WorkerState.COMPLETED},
- WorkerState.PAUSED: {WorkerState.RUNNING},
- WorkerState.COMPLETED: set(),
- },
+ WORKER_STATE_TRANSITIONS,
WorkerState.UNINITIALIZED,
)
diff --git
a/amber/src/test/python/core/architecture/managers/test_state_manager.py
b/amber/src/test/python/core/architecture/managers/test_state_manager.py
index 9ebe6e847c..464c64194f 100644
--- a/amber/src/test/python/core/architecture/managers/test_state_manager.py
+++ b/amber/src/test/python/core/architecture/managers/test_state_manager.py
@@ -17,6 +17,7 @@
import pytest
+from core.architecture.managers.context import WORKER_STATE_TRANSITIONS
from core.architecture.managers.state_manager import (
InvalidStateException,
InvalidTransitionException,
@@ -28,16 +29,7 @@ from
proto.org.apache.texera.amber.engine.architecture.worker import WorkerState
class TestStateManager:
@pytest.fixture
def state_manager(self):
- return StateManager(
- {
- WorkerState.UNINITIALIZED: {WorkerState.READY},
- WorkerState.READY: {WorkerState.PAUSED, WorkerState.RUNNING},
- WorkerState.RUNNING: {WorkerState.PAUSED,
WorkerState.COMPLETED},
- WorkerState.PAUSED: {WorkerState.RUNNING},
- WorkerState.COMPLETED: set(),
- },
- WorkerState.UNINITIALIZED,
- )
+ return StateManager(WORKER_STATE_TRANSITIONS,
WorkerState.UNINITIALIZED)
def test_it_can_init(self, state_manager):
pass
@@ -72,3 +64,11 @@ class TestStateManager:
with pytest.raises(InvalidStateException):
state_manager.assert_state(WorkerState.COMPLETED)
+
+ def test_it_can_transit_directly_from_ready_to_completed(self,
state_manager):
+ # A worker can complete directly from READY without first entering
+ # RUNNING. This path is taken when there is nothing to process
+ # (upstream signals end-of-stream before any data arrives).
+ state_manager.transit_to(WorkerState.READY)
+ state_manager.transit_to(WorkerState.COMPLETED)
+ state_manager.assert_state(WorkerState.COMPLETED)