This is an automated email from the ASF dual-hosted git repository.

Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 43b698cb70 test(amber): add unit test coverage for engine common Utils 
(#4743)
43b698cb70 is described below

commit 43b698cb70e506e5d90b672024458fbddde7e305
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun May 3 01:52:29 2026 -0700

    test(amber): add unit test coverage for engine common Utils (#4743)
    
    ### What changes were proposed in this PR?
    
    Add `UtilsSpec` covering the helpers in
    `org.apache.texera.amber.engine.common.Utils`
    (`amber/src/main/scala/org/apache/texera/amber/engine/common/Utils.scala`):
    
    - `aggregatedStateToString` round-trips every named
    `WorkflowAggregatedState` through `stringToAggregatedState`, and renders
    the `Unrecognized` variant with its raw value
    - `stringToAggregatedState` is case-insensitive, trims whitespace,
    accepts the `Initializing` alias for `READY`, and throws
    `IllegalArgumentException` for unknown names
    - `maptoStatusCode` returns the documented byte codes for known states
    and `-1` for the rest
    - `retry` returns immediately on success, retries on failure up to the
    attempt limit, and rethrows after exhaustion
    - `withLock` releases the lock after both normal return and exception
    
    ### Any related issues, documentation, discussions?
    
    Closes #4742
    
    ### How was this PR tested?
    
    `sbt "WorkflowExecutionService/testOnly
    org.apache.texera.amber.engine.common.UtilsSpec"` — 12/12 tests pass.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Claude Opus 4.7)
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../texera/amber/engine/common/UtilsSpec.scala     | 156 +++++++++++++++++++++
 1 file changed, 156 insertions(+)

diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/common/UtilsSpec.scala 
b/amber/src/test/scala/org/apache/texera/amber/engine/common/UtilsSpec.scala
new file mode 100644
index 0000000000..13674139f1
--- /dev/null
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/common/UtilsSpec.scala
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.common
+
+import 
org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState
+import org.scalatest.flatspec.AnyFlatSpec
+
+import java.util.concurrent.locks.ReentrantLock
+
+class UtilsSpec extends AnyFlatSpec {
+
+  // -- aggregatedStateToString ----------------------------------------------
+
+  "Utils.aggregatedStateToString" should "round-trip every named 
WorkflowAggregatedState through stringToAggregatedState" in {
+    val namedStates = Seq(
+      WorkflowAggregatedState.UNINITIALIZED,
+      WorkflowAggregatedState.READY,
+      WorkflowAggregatedState.RUNNING,
+      WorkflowAggregatedState.PAUSING,
+      WorkflowAggregatedState.PAUSED,
+      WorkflowAggregatedState.RESUMING,
+      WorkflowAggregatedState.COMPLETED,
+      WorkflowAggregatedState.TERMINATED,
+      WorkflowAggregatedState.FAILED,
+      WorkflowAggregatedState.KILLED,
+      WorkflowAggregatedState.UNKNOWN
+    )
+    namedStates.foreach { state =>
+      assert(
+        Utils.stringToAggregatedState(Utils.aggregatedStateToString(state)) == 
state,
+        s"round-trip failed for $state"
+      )
+    }
+  }
+
+  it should "render an unrecognized aggregated state with its raw value" in {
+    val unrecognized = WorkflowAggregatedState.Unrecognized(99)
+    assert(Utils.aggregatedStateToString(unrecognized) == "Unrecognized(99)")
+  }
+
+  // -- stringToAggregatedState ----------------------------------------------
+
+  "Utils.stringToAggregatedState" should "be case-insensitive and tolerant of 
surrounding whitespace" in {
+    assert(Utils.stringToAggregatedState("RUNNING") == 
WorkflowAggregatedState.RUNNING)
+    assert(Utils.stringToAggregatedState("running") == 
WorkflowAggregatedState.RUNNING)
+    assert(Utils.stringToAggregatedState("  Running  ") == 
WorkflowAggregatedState.RUNNING)
+  }
+
+  it should "accept 'Initializing' as an alias for READY" in {
+    assert(Utils.stringToAggregatedState("Initializing") == 
WorkflowAggregatedState.READY)
+    assert(Utils.stringToAggregatedState("ready") == 
WorkflowAggregatedState.READY)
+  }
+
+  it should "throw IllegalArgumentException for an unrecognized state name" in 
{
+    assertThrows[IllegalArgumentException] {
+      Utils.stringToAggregatedState("not-a-real-state")
+    }
+  }
+
+  // -- maptoStatusCode ------------------------------------------------------
+
+  "Utils.maptoStatusCode" should "map known states to their documented byte 
codes" in {
+    assert(Utils.maptoStatusCode(WorkflowAggregatedState.UNINITIALIZED) == 
0.toByte)
+    assert(Utils.maptoStatusCode(WorkflowAggregatedState.READY) == 0.toByte)
+    assert(Utils.maptoStatusCode(WorkflowAggregatedState.RUNNING) == 1.toByte)
+    assert(Utils.maptoStatusCode(WorkflowAggregatedState.PAUSED) == 2.toByte)
+    assert(Utils.maptoStatusCode(WorkflowAggregatedState.COMPLETED) == 
3.toByte)
+    assert(Utils.maptoStatusCode(WorkflowAggregatedState.FAILED) == 4.toByte)
+    assert(Utils.maptoStatusCode(WorkflowAggregatedState.KILLED) == 5.toByte)
+  }
+
+  it should "return -1 for states that have no documented code" in {
+    Seq(
+      WorkflowAggregatedState.PAUSING,
+      WorkflowAggregatedState.RESUMING,
+      WorkflowAggregatedState.TERMINATED,
+      WorkflowAggregatedState.UNKNOWN
+    ).foreach { state =>
+      assert(Utils.maptoStatusCode(state) == -1.toByte, s"expected -1 for 
$state")
+    }
+  }
+
+  // -- retry ---------------------------------------------------------------
+
+  "Utils.retry" should "return the value on the first successful attempt 
without retrying" in {
+    var calls = 0
+    val result = Utils.retry(attempts = 3, baseBackoffTimeInMS = 0L) {
+      calls += 1
+      "ok"
+    }
+    assert(result == "ok")
+    assert(calls == 1)
+  }
+
+  it should "retry on failure until success and return the eventual result" in 
{
+    var calls = 0
+    val result = Utils.retry(attempts = 3, baseBackoffTimeInMS = 0L) {
+      calls += 1
+      if (calls < 2) throw new RuntimeException("transient")
+      "ok"
+    }
+    assert(result == "ok")
+    assert(calls == 2)
+  }
+
+  it should "rethrow the last exception after exhausting all attempts" in {
+    var calls = 0
+    val ex = intercept[RuntimeException] {
+      Utils.retry(attempts = 2, baseBackoffTimeInMS = 0L) {
+        calls += 1
+        throw new RuntimeException(s"failure-$calls")
+      }
+    }
+    assert(calls == 2)
+    assert(ex.getMessage == "failure-2")
+  }
+
+  // -- withLock ------------------------------------------------------------
+
+  "Utils.withLock" should "release the lock after the body returns" in {
+    implicit val lock: ReentrantLock = new ReentrantLock()
+    val result = Utils.withLock {
+      assert(lock.isHeldByCurrentThread)
+      42
+    }
+    assert(result == 42)
+    assert(!lock.isHeldByCurrentThread)
+  }
+
+  it should "release the lock when the body throws" in {
+    implicit val lock: ReentrantLock = new ReentrantLock()
+    intercept[RuntimeException] {
+      Utils.withLock[Unit] {
+        throw new RuntimeException("boom")
+      }
+    }
+    assert(!lock.isHeldByCurrentThread)
+  }
+}

Reply via email to