This is an automated email from the ASF dual-hosted git repository.
Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 43b698cb70 test(amber): add unit test coverage for engine common Utils
(#4743)
43b698cb70 is described below
commit 43b698cb70e506e5d90b672024458fbddde7e305
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun May 3 01:52:29 2026 -0700
test(amber): add unit test coverage for engine common Utils (#4743)
### What changes were proposed in this PR?
Add `UtilsSpec` covering the helpers in
`org.apache.texera.amber.engine.common.Utils`
(`amber/src/main/scala/org/apache/texera/amber/engine/common/Utils.scala`):
- `aggregatedStateToString` round-trips every named
`WorkflowAggregatedState` through `stringToAggregatedState`, and renders
the `Unrecognized` variant with its raw value
- `stringToAggregatedState` is case-insensitive, trims whitespace,
accepts the `Initializing` alias for `READY`, and throws
`IllegalArgumentException` for unknown names
- `maptoStatusCode` returns the documented byte codes for known states
and `-1` for the rest
- `retry` returns immediately on success, retries on failure up to the
attempt limit, and rethrows after exhaustion
- `withLock` releases the lock after both normal return and exception
### Any related issues, documentation, discussions?
Closes #4742
### How was this PR tested?
`sbt "WorkflowExecutionService/testOnly
org.apache.texera.amber.engine.common.UtilsSpec"` — 12/12 tests pass.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Claude Opus 4.7)
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.../texera/amber/engine/common/UtilsSpec.scala | 156 +++++++++++++++++++++
1 file changed, 156 insertions(+)
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/common/UtilsSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/common/UtilsSpec.scala
new file mode 100644
index 0000000000..13674139f1
--- /dev/null
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/common/UtilsSpec.scala
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.common
+
+import
org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState
+import org.scalatest.flatspec.AnyFlatSpec
+
+import java.util.concurrent.locks.ReentrantLock
+
+class UtilsSpec extends AnyFlatSpec {
+
+ // -- aggregatedStateToString ----------------------------------------------
+
+ "Utils.aggregatedStateToString" should "round-trip every named
WorkflowAggregatedState through stringToAggregatedState" in {
+ val namedStates = Seq(
+ WorkflowAggregatedState.UNINITIALIZED,
+ WorkflowAggregatedState.READY,
+ WorkflowAggregatedState.RUNNING,
+ WorkflowAggregatedState.PAUSING,
+ WorkflowAggregatedState.PAUSED,
+ WorkflowAggregatedState.RESUMING,
+ WorkflowAggregatedState.COMPLETED,
+ WorkflowAggregatedState.TERMINATED,
+ WorkflowAggregatedState.FAILED,
+ WorkflowAggregatedState.KILLED,
+ WorkflowAggregatedState.UNKNOWN
+ )
+ namedStates.foreach { state =>
+ assert(
+ Utils.stringToAggregatedState(Utils.aggregatedStateToString(state)) ==
state,
+ s"round-trip failed for $state"
+ )
+ }
+ }
+
+ it should "render an unrecognized aggregated state with its raw value" in {
+ val unrecognized = WorkflowAggregatedState.Unrecognized(99)
+ assert(Utils.aggregatedStateToString(unrecognized) == "Unrecognized(99)")
+ }
+
+ // -- stringToAggregatedState ----------------------------------------------
+
+ "Utils.stringToAggregatedState" should "be case-insensitive and tolerant of
surrounding whitespace" in {
+ assert(Utils.stringToAggregatedState("RUNNING") ==
WorkflowAggregatedState.RUNNING)
+ assert(Utils.stringToAggregatedState("running") ==
WorkflowAggregatedState.RUNNING)
+ assert(Utils.stringToAggregatedState(" Running ") ==
WorkflowAggregatedState.RUNNING)
+ }
+
+ it should "accept 'Initializing' as an alias for READY" in {
+ assert(Utils.stringToAggregatedState("Initializing") ==
WorkflowAggregatedState.READY)
+ assert(Utils.stringToAggregatedState("ready") ==
WorkflowAggregatedState.READY)
+ }
+
+ it should "throw IllegalArgumentException for an unrecognized state name" in
{
+ assertThrows[IllegalArgumentException] {
+ Utils.stringToAggregatedState("not-a-real-state")
+ }
+ }
+
+ // -- maptoStatusCode ------------------------------------------------------
+
+ "Utils.maptoStatusCode" should "map known states to their documented byte
codes" in {
+ assert(Utils.maptoStatusCode(WorkflowAggregatedState.UNINITIALIZED) ==
0.toByte)
+ assert(Utils.maptoStatusCode(WorkflowAggregatedState.READY) == 0.toByte)
+ assert(Utils.maptoStatusCode(WorkflowAggregatedState.RUNNING) == 1.toByte)
+ assert(Utils.maptoStatusCode(WorkflowAggregatedState.PAUSED) == 2.toByte)
+ assert(Utils.maptoStatusCode(WorkflowAggregatedState.COMPLETED) ==
3.toByte)
+ assert(Utils.maptoStatusCode(WorkflowAggregatedState.FAILED) == 4.toByte)
+ assert(Utils.maptoStatusCode(WorkflowAggregatedState.KILLED) == 5.toByte)
+ }
+
+ it should "return -1 for states that have no documented code" in {
+ Seq(
+ WorkflowAggregatedState.PAUSING,
+ WorkflowAggregatedState.RESUMING,
+ WorkflowAggregatedState.TERMINATED,
+ WorkflowAggregatedState.UNKNOWN
+ ).foreach { state =>
+ assert(Utils.maptoStatusCode(state) == -1.toByte, s"expected -1 for
$state")
+ }
+ }
+
+ // -- retry ---------------------------------------------------------------
+
+ "Utils.retry" should "return the value on the first successful attempt
without retrying" in {
+ var calls = 0
+ val result = Utils.retry(attempts = 3, baseBackoffTimeInMS = 0L) {
+ calls += 1
+ "ok"
+ }
+ assert(result == "ok")
+ assert(calls == 1)
+ }
+
+ it should "retry on failure until success and return the eventual result" in
{
+ var calls = 0
+ val result = Utils.retry(attempts = 3, baseBackoffTimeInMS = 0L) {
+ calls += 1
+ if (calls < 2) throw new RuntimeException("transient")
+ "ok"
+ }
+ assert(result == "ok")
+ assert(calls == 2)
+ }
+
+ it should "rethrow the last exception after exhausting all attempts" in {
+ var calls = 0
+ val ex = intercept[RuntimeException] {
+ Utils.retry(attempts = 2, baseBackoffTimeInMS = 0L) {
+ calls += 1
+ throw new RuntimeException(s"failure-$calls")
+ }
+ }
+ assert(calls == 2)
+ assert(ex.getMessage == "failure-2")
+ }
+
+ // -- withLock ------------------------------------------------------------
+
+ "Utils.withLock" should "release the lock after the body returns" in {
+ implicit val lock: ReentrantLock = new ReentrantLock()
+ val result = Utils.withLock {
+ assert(lock.isHeldByCurrentThread)
+ 42
+ }
+ assert(result == 42)
+ assert(!lock.isHeldByCurrentThread)
+ }
+
+ it should "release the lock when the body throws" in {
+ implicit val lock: ReentrantLock = new ReentrantLock()
+ intercept[RuntimeException] {
+ Utils.withLock[Unit] {
+ throw new RuntimeException("boom")
+ }
+ }
+ assert(!lock.isHeldByCurrentThread)
+ }
+}