This is an automated email from the ASF dual-hosted git repository.

Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 2812cad659 test(amber): add unit test coverage for WorkerStateManager 
(#4740)
2812cad659 is described below

commit 2812cad659871849edea85d4018a452e1300c6ba
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sat May 2 23:21:17 2026 -0700

    test(amber): add unit test coverage for WorkerStateManager (#4740)
    
    ### What changes were proposed in this PR?
    
    Add `WorkerStateManagerSpec` covering the state-transition graph
    hardcoded by `WorkerStateManager`
    
(`amber/src/main/scala/org/apache/texera/amber/engine/common/statetransition/WorkerStateManager.scala`):
    
    - Default initial state is UNINITIALIZED; explicit initial state is
    honored
    - Allowed: UNINITIALIZED → READY, READY → {RUNNING, PAUSED, COMPLETED},
    RUNNING → {PAUSED, COMPLETED}, PAUSED → RUNNING
    - COMPLETED is terminal (rejects forward transitions; self-transition is
    a no-op per `StateManager.transitTo` semantics)
    - TERMINATED is absent from the graph and rejected from any state
    - PAUSED → COMPLETED is rejected (only RUNNING → COMPLETED is permitted)
    
    ### Any related issues, documentation, discussions?
    
    Closes #4739
    
    ### How was this PR tested?
    
    `sbt "WorkflowExecutionService/testOnly
    
org.apache.texera.amber.engine.common.statetransition.WorkerStateManagerSpec"`
    — 9/9 tests pass.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Claude Opus 4.7)
    
    ---------
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../statetransition/WorkerStateManagerSpec.scala   | 131 +++++++++++++++++++++
 1 file changed, 131 insertions(+)

diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/common/statetransition/WorkerStateManagerSpec.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/common/statetransition/WorkerStateManagerSpec.scala
new file mode 100644
index 0000000000..751c029ee4
--- /dev/null
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/common/statetransition/WorkerStateManagerSpec.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.common.statetransition
+
+import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
+import 
org.apache.texera.amber.engine.architecture.worker.statistics.WorkerState
+import 
org.apache.texera.amber.engine.architecture.worker.statistics.WorkerState._
+import 
org.apache.texera.amber.engine.common.statetransition.StateManager.InvalidTransitionException
+import org.scalatest.flatspec.AnyFlatSpec
+
+class WorkerStateManagerSpec extends AnyFlatSpec {
+
+  private val actorId: ActorVirtualIdentity = 
ActorVirtualIdentity("test-worker")
+
+  // No default here on purpose: the "default to UNINITIALIZED" test below must
+  // exercise WorkerStateManager's own default-parameter contract, not the
+  // helper's.
+  private def newManager(initial: WorkerState): WorkerStateManager =
+    new WorkerStateManager(actorId, initial)
+
+  "WorkerStateManager" should "default to the UNINITIALIZED state" in {
+    // Construct without an explicit initial so the test would catch a change
+    // to WorkerStateManager's default-parameter value.
+    assert(new WorkerStateManager(actorId).getCurrentState == UNINITIALIZED)
+  }
+
+  it should "honor the explicit initial state when provided" in {
+    assert(newManager(READY).getCurrentState == READY)
+  }
+
+  // -- Allowed transitions per the documented graph (one test per edge) --
+
+  it should "allow UNINITIALIZED -> READY" in {
+    val sm = newManager(UNINITIALIZED)
+    sm.transitTo(READY)
+    assert(sm.getCurrentState == READY)
+  }
+
+  it should "allow READY -> RUNNING" in {
+    val sm = newManager(READY)
+    sm.transitTo(RUNNING)
+    assert(sm.getCurrentState == RUNNING)
+  }
+
+  it should "allow RUNNING -> PAUSED" in {
+    val sm = newManager(RUNNING)
+    sm.transitTo(PAUSED)
+    assert(sm.getCurrentState == PAUSED)
+  }
+
+  it should "allow PAUSED -> RUNNING" in {
+    val sm = newManager(PAUSED)
+    sm.transitTo(RUNNING)
+    assert(sm.getCurrentState == RUNNING)
+  }
+
+  it should "allow RUNNING -> COMPLETED" in {
+    val sm = newManager(RUNNING)
+    sm.transitTo(COMPLETED)
+    assert(sm.getCurrentState == COMPLETED)
+  }
+
+  it should "allow READY -> PAUSED" in {
+    val sm = newManager(READY)
+    sm.transitTo(PAUSED)
+    assert(sm.getCurrentState == PAUSED)
+  }
+
+  it should "allow READY -> COMPLETED" in {
+    val sm = newManager(READY)
+    sm.transitTo(COMPLETED)
+    assert(sm.getCurrentState == COMPLETED)
+  }
+
+  // -- Disallowed transitions --
+
+  it should "reject UNINITIALIZED -> RUNNING (must go through READY)" in {
+    val sm = newManager(UNINITIALIZED)
+    intercept[InvalidTransitionException] {
+      sm.transitTo(RUNNING)
+    }
+  }
+
+  it should "treat COMPLETED as a terminal state" in {
+    val sm = newManager(COMPLETED)
+    intercept[InvalidTransitionException] {
+      sm.transitTo(RUNNING)
+    }
+    intercept[InvalidTransitionException] {
+      sm.transitTo(READY)
+    }
+    // Self-transition is a no-op, not an exception.
+    sm.transitTo(COMPLETED)
+    assert(sm.getCurrentState == COMPLETED)
+  }
+
+  it should "reject transitions into TERMINATED from every reachable source 
state" in {
+    // TERMINATED is absent from the graph entirely, so it must be unreachable
+    // from any state in the graph — including the COMPLETED terminal state.
+    Seq(UNINITIALIZED, READY, RUNNING, PAUSED, COMPLETED).foreach { from =>
+      val sm = newManager(from)
+      intercept[InvalidTransitionException] {
+        sm.transitTo(TERMINATED)
+      }
+    }
+  }
+
+  it should "reject PAUSED -> COMPLETED (only RUNNING -> COMPLETED is 
permitted)" in {
+    val sm = newManager(PAUSED)
+    intercept[InvalidTransitionException] {
+      sm.transitTo(COMPLETED)
+    }
+  }
+}

Reply via email to