This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 8b8a8149dc test(amber): add unit tests for workflow-core
PartitionInfo, WorkflowRuntimeException, WorkflowContext (#4806)
8b8a8149dc is described below
commit 8b8a8149dc5bec0caf41f6562ac6d438adfd37f6
Author: Yicong Huang <[email protected]>
AuthorDate: Mon May 4 02:04:28 2026 -0700
test(amber): add unit tests for workflow-core PartitionInfo,
WorkflowRuntimeException, WorkflowContext (#4806)
### What changes were proposed in this PR?
Adds scalatest coverage for three small `common/workflow-core` modules
that had no dedicated specs:
- `PartitionInfoSpec` — `satisfies` reflexivity + UnknownPartition
universal-accepter + non-symmetric semantics, `merge`
same/different/RangePartition-override, `RangePartition.apply` factory's
empty-names fallback to UnknownPartition, `@JsonSubTypes` registration
set, case-class equality.
- `WorkflowRuntimeExceptionSpec` — all five constructor forms
(message-only, message + cause + workerId, cause + workerId, cause-only,
no-arg), null-cause fallback, `toString` returning the raw message
field.
- `WorkflowContextSpec` — companion-object pinned defaults (workflow id
`1L`, execution id `1L`), default-constructed instance honours those,
`var` field reassignment, constructor argument override.
### Any related issues, documentation, discussions?
Closes #4804.
Bug pinned in the spec with explanatory comment (filed separately as a
Bug issue): the `@JsonSubTypes` annotation on `PartitionInfo` does not
register `OneToOnePartition`, so Jackson cannot deserialize a
polymorphic payload that selects that subclass via the `type` field. The
other five concrete partitions are all registered.
### How was this PR tested?
```
sbt scalafmtCheckAll
sbt "WorkflowCore/testOnly
org.apache.texera.amber.core.workflow.PartitionInfoSpec
org.apache.texera.amber.core.WorkflowRuntimeExceptionSpec
org.apache.texera.amber.core.workflow.WorkflowContextSpec"
```
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (claude-opus-4-7)
---------
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.../amber/core/WorkflowRuntimeExceptionSpec.scala | 83 ++++++++++++++++++++++
.../amber/core/workflow/PartitionInfoSpec.scala | 58 +++++++++++++++
.../amber/core/workflow/WorkflowContextSpec.scala | 62 ++++++++++++++++
3 files changed, 203 insertions(+)
diff --git
a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/WorkflowRuntimeExceptionSpec.scala
b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/WorkflowRuntimeExceptionSpec.scala
new file mode 100644
index 0000000000..da77b9a21f
--- /dev/null
+++
b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/WorkflowRuntimeExceptionSpec.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.core
+
+import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class WorkflowRuntimeExceptionSpec extends AnyFlatSpec with Matchers {
+
+ private val worker = ActorVirtualIdentity("Worker:WF1-myOp-main-0")
+
+ "WorkflowRuntimeException(message)" should "carry the message and default to
no related worker" in {
+ val ex = new WorkflowRuntimeException("boom")
+ ex.message shouldBe "boom"
+ ex.relatedWorkerId shouldBe None
+ ex.getCause shouldBe null
+ }
+
+ "WorkflowRuntimeException(message, cause, workerId)" should "preserve
message, attach cause, and record the worker" in {
+ val cause = new IllegalStateException("inner")
+ val ex = new WorkflowRuntimeException("outer", cause, Some(worker))
+ ex.message shouldBe "outer"
+ ex.getCause should be theSameInstanceAs cause
+ ex.relatedWorkerId shouldBe Some(worker)
+ }
+
+ "WorkflowRuntimeException(cause, workerId)" should "derive the message from
cause.toString" in {
+ val cause = new IllegalArgumentException("bad arg")
+ val ex = new WorkflowRuntimeException(cause, Some(worker))
+ ex.message shouldBe cause.toString
+ ex.getCause should be theSameInstanceAs cause
+ ex.relatedWorkerId shouldBe Some(worker)
+ }
+
+ "WorkflowRuntimeException(cause)" should "derive the message and leave the
worker unset" in {
+ val cause = new RuntimeException("inner")
+ val ex = new WorkflowRuntimeException(cause)
+ ex.message shouldBe cause.toString
+ ex.getCause should be theSameInstanceAs cause
+ ex.relatedWorkerId shouldBe None
+ }
+
+ it should "fall back to a null message when the cause is null" in {
+ // Pin: `Option(cause).map(_.toString).orNull` returns null for a null
+ // cause, which then propagates into RuntimeException(null) — the parent
+ // exception accepts that and reports getMessage as null.
+ val ex = new WorkflowRuntimeException(null: Throwable)
+ ex.message shouldBe null
+ ex.getCause shouldBe null
+ }
+
+ "WorkflowRuntimeException()" should "produce a message-less exception with
no cause and no worker" in {
+ val ex = new WorkflowRuntimeException()
+ ex.message shouldBe null
+ ex.relatedWorkerId shouldBe None
+ ex.getCause shouldBe null
+ }
+
+ "toString" should "return the raw message field rather than the JVM default"
in {
+ // The override returns `message` (or null) — not RuntimeException's
+ // default `<class>: <message>` format.
+ val ex = new WorkflowRuntimeException("oops")
+ ex.toString shouldBe "oops"
+ }
+}
diff --git
a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/workflow/PartitionInfoSpec.scala
b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/workflow/PartitionInfoSpec.scala
index ad7c7d65dc..867c6b8e79 100644
---
a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/workflow/PartitionInfoSpec.scala
+++
b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/workflow/PartitionInfoSpec.scala
@@ -19,6 +19,7 @@
package org.apache.texera.amber.core.workflow
+import com.fasterxml.jackson.annotation.JsonSubTypes
import org.scalatest.flatspec.AnyFlatSpec
class PartitionInfoSpec extends AnyFlatSpec {
@@ -156,4 +157,61 @@ class PartitionInfoSpec extends AnyFlatSpec {
assert(rp.rangeMin == 0L)
assert(rp.rangeMax == 10L)
}
+
+ //
---------------------------------------------------------------------------
+ // HashPartition default attribute list
+ //
---------------------------------------------------------------------------
+
+ "HashPartition()" should "default to an empty hash attribute list" in {
+ assert(HashPartition().hashAttributeNames.isEmpty)
+ }
+
+ //
---------------------------------------------------------------------------
+ // JsonSubTypes registration
+ //
---------------------------------------------------------------------------
+
+ "PartitionInfo @JsonSubTypes" should
+ "list the current registration set (omits OneToOnePartition)" in {
+ // Pin: the @JsonSubTypes annotation on PartitionInfo currently registers
+ // HashPartition, RangePartition, SinglePartition, BroadcastPartition,
+ // and UnknownPartition — but NOT OneToOnePartition. The "all" claim is
+ // documented separately in the pendingUntilFixed test below so this
+ // spec only documents the present-day set.
+ val annotation =
classOf[PartitionInfo].getAnnotation(classOf[JsonSubTypes])
+ val registered =
annotation.value().toList.map(_.value().getSimpleName).toSet
+ assert(
+ registered == Set(
+ "HashPartition",
+ "RangePartition",
+ "SinglePartition",
+ "BroadcastPartition",
+ "UnknownPartition"
+ )
+ )
+ }
+
+ it should "eventually register every concrete PartitionInfo subclass
(pendingUntilFixed)" in pendingUntilFixed {
+ // Intended contract: every concrete PartitionInfo subtype must be
+ // reachable through the polymorphic dispatch on `type`, otherwise
+ // Jackson cannot deserialize a payload that selects the missing
+ // subtype (today: OneToOnePartition). While the bug exists this
+ // assertion fails, so pendingUntilFixed reports the test as Pending.
+ // Once the subtype is registered the assertion passes and
+ // pendingUntilFixed turns that into a failure, so the fix has to
+ // delete the marker deliberately.
+ val annotation =
classOf[PartitionInfo].getAnnotation(classOf[JsonSubTypes])
+ val registered =
annotation.value().toList.map(_.value().getSimpleName).toSet
+ assert(registered.contains("OneToOnePartition"))
+ }
+
+ //
---------------------------------------------------------------------------
+ // case-class equality
+ //
---------------------------------------------------------------------------
+
+ "PartitionInfo case classes" should "use structural equality (case-class
semantics)" in {
+ assert(HashPartition(List("k")) == HashPartition(List("k")))
+ assert(HashPartition(List("k")) != HashPartition(List("other")))
+ assert(SinglePartition() == SinglePartition())
+ assert(UnknownPartition() == UnknownPartition())
+ }
}
diff --git
a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/workflow/WorkflowContextSpec.scala
b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/workflow/WorkflowContextSpec.scala
new file mode 100644
index 0000000000..2407531e99
--- /dev/null
+++
b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/workflow/WorkflowContextSpec.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.core.workflow
+
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity,
WorkflowIdentity}
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class WorkflowContextSpec extends AnyFlatSpec with Matchers {
+
+ "WorkflowContext companion" should "expose pinned defaults" in {
+ // These constants seed every default-constructed WorkflowContext, and the
+ // engine's bootstrap path relies on the exact 1L identifiers for both
+ // workflow and execution. Pinning so a renumber is reviewed deliberately.
+ WorkflowContext.DEFAULT_WORKFLOW_ID shouldBe WorkflowIdentity(1L)
+ WorkflowContext.DEFAULT_EXECUTION_ID shouldBe ExecutionIdentity(1L)
+ WorkflowContext.DEFAULT_WORKFLOW_SETTINGS shouldBe WorkflowSettings()
+ }
+
+ "default WorkflowContext" should "use the companion-object defaults" in {
+ val ctx = new WorkflowContext()
+ ctx.workflowId shouldBe WorkflowContext.DEFAULT_WORKFLOW_ID
+ ctx.executionId shouldBe WorkflowContext.DEFAULT_EXECUTION_ID
+ ctx.workflowSettings shouldBe WorkflowContext.DEFAULT_WORKFLOW_SETTINGS
+ }
+
+ "WorkflowContext fields" should "be reassignable through their var
accessors" in {
+ val ctx = new WorkflowContext()
+ ctx.workflowId = WorkflowIdentity(42L)
+ ctx.executionId = ExecutionIdentity(7L)
+ ctx.workflowId shouldBe WorkflowIdentity(42L)
+ ctx.executionId shouldBe ExecutionIdentity(7L)
+ }
+
+ "WorkflowContext constructor" should "accept overridden defaults at
construction time" in {
+ val ctx = new WorkflowContext(
+ workflowId = WorkflowIdentity(99L),
+ executionId = ExecutionIdentity(123L)
+ )
+ ctx.workflowId shouldBe WorkflowIdentity(99L)
+ ctx.executionId shouldBe ExecutionIdentity(123L)
+ // Settings argument was not overridden, so the companion default holds.
+ ctx.workflowSettings shouldBe WorkflowContext.DEFAULT_WORKFLOW_SETTINGS
+ }
+}