This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 810263c3ef test(amber): add unit test coverage for PauseType (#4914)
810263c3ef is described below
commit 810263c3ef769dd1f0def924e64390fe8b81739d
Author: Xinyuan Lin <[email protected]>
AuthorDate: Mon May 4 01:28:53 2026 -0700
test(amber): add unit test coverage for PauseType (#4914)
### What changes were proposed in this PR?
Adds `PauseTypeSpec` covering `PauseType`
(amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/PauseType.scala),
the small sealed trait used by the worker pause subsystem (three
singleton kinds + one case-class subtype). It has no test coverage
today.
The new spec pins:
- The three pause singletons (`UserPause`, `BackpressurePause`,
`OperatorLogicPause`) extend `PauseType`, are object-identity stable,
and compare unequal to each other (widened to `PauseType` so the
comparison is not folded away at compile time).
- `ECMPause` is a case class that carries the
`EmbeddedControlMessageIdentity` it was constructed with, supports value
equality and consistent `hashCode` by id, and is distinct from every
singleton PauseType.
- Exhaustive `match` over `PauseType` distinguishes each subtype.
- Set-based coexistence (the contract `PauseManager` actually relies on,
since it stores active pauses in a `HashSet[PauseType]` with no
priority): all four pause kinds coexist as distinct Set elements;
identical pauses deduplicate (singletons collapse, same-id `ECMPause`
collapses); `ECMPause` instances with different ids are independently
tracked.
The multi-pause behavior through
`PauseManager.pause`/`resume`/`isPaused` is covered separately in
`WorkerManagersSpec`.
No production code changed; this is test-only.
### Any related issues, documentation, discussions?
Closes #4913
### How was this PR tested?
Added 10 new unit tests in `PauseTypeSpec`. Verified locally:
```
sbt 'WorkflowExecutionService/Test/testOnly
org.apache.texera.amber.engine.architecture.worker.PauseTypeSpec'
# → Tests: succeeded 10, failed 0
sbt 'WorkflowExecutionService/Test/scalafmtCheck'
# → clean
```
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code
---------
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.../engine/architecture/worker/PauseTypeSpec.scala | 145 +++++++++++++++++++++
1 file changed, 145 insertions(+)
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/PauseTypeSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/PauseTypeSpec.scala
new file mode 100644
index 0000000000..68af4cec3b
--- /dev/null
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/PauseTypeSpec.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.worker
+
+import
org.apache.texera.amber.core.virtualidentity.EmbeddedControlMessageIdentity
+import org.scalatest.flatspec.AnyFlatSpec
+
+class PauseTypeSpec extends AnyFlatSpec {
+
+ // --- singletons
------------------------------------------------------------
+ //
+ // The sealed-trait subtype relationship is enforced at compile time by the
+ // type ascriptions (`val u: PauseType = UserPause`, etc.) used below. There
+ // is no runtime test for "singletons extend PauseType" because that would
+ // be tautological — if any singleton stopped extending the trait, this
+ // file would fail to compile.
+
+ "PauseType singletons" should "compare equal to themselves and unequal to
each other" in {
+ // Widen to PauseType so the compiler doesn't reduce inter-singleton
+ // comparisons to constant `false` at compile time.
+ val u: PauseType = UserPause
+ val b: PauseType = BackpressurePause
+ val o: PauseType = OperatorLogicPause
+ assert(u == UserPause)
+ assert(b == BackpressurePause)
+ assert(o == OperatorLogicPause)
+ assert(u != b)
+ assert(u != o)
+ assert(b != o)
+ }
+
+ it should "be the same singleton instance per access (object identity)" in {
+ assert((UserPause: AnyRef) eq UserPause)
+ assert((BackpressurePause: AnyRef) eq BackpressurePause)
+ assert((OperatorLogicPause: AnyRef) eq OperatorLogicPause)
+ }
+
+ // --- ECMPause
--------------------------------------------------------------
+
+ "ECMPause" should "carry the EmbeddedControlMessageIdentity it was
constructed with" in {
+ val id = EmbeddedControlMessageIdentity("ckpt-1")
+ val p = ECMPause(id)
+ assert(p.id == id)
+ }
+
+ it should "support case-class value equality and hashCode (same id → equal)"
in {
+ val a = ECMPause(EmbeddedControlMessageIdentity("ckpt-1"))
+ val b = ECMPause(EmbeddedControlMessageIdentity("ckpt-1"))
+ val c = ECMPause(EmbeddedControlMessageIdentity("ckpt-2"))
+ assert(a == b)
+ assert(a.hashCode == b.hashCode)
+ assert(a != c)
+ }
+
+ it should "not equal any of the singleton PauseTypes" in {
+ // Subtype relationship is already proven by the `: PauseType` ascription;
+ // what we actually want to lock down here is the cross-kind inequality:
+ // an ECMPause (with any id) must not collide with any singleton kind.
+ val p: PauseType = ECMPause(EmbeddedControlMessageIdentity("ckpt"))
+ assert(p != UserPause)
+ assert(p != BackpressurePause)
+ assert(p != OperatorLogicPause)
+ }
+
+ // --- pattern matching
------------------------------------------------------
+
+ "PauseType" should "support exhaustive pattern matching that distinguishes
each subtype" in {
+ def label(p: PauseType): String =
+ p match {
+ case UserPause => "user"
+ case BackpressurePause => "backpressure"
+ case OperatorLogicPause => "operator-logic"
+ case ECMPause(_) => "ecm"
+ }
+ assert(label(UserPause) == "user")
+ assert(label(BackpressurePause) == "backpressure")
+ assert(label(OperatorLogicPause) == "operator-logic")
+ assert(label(ECMPause(EmbeddedControlMessageIdentity("x"))) == "ecm")
+ }
+
+ // --- Set-based coexistence (the contract PauseManager actually relies on)
--
+ // PauseManager stores active pauses in a `HashSet[PauseType]` (additive,
+ // no priority — resuming one type only removes that type). The
override-order
+ // semantics that the data type would need to support priorities don't exist
+ // in PauseType; the data type only has to behave well as Set elements.
+ // These tests pin that contract here. The multi-pause coexistence behavior
+ // through PauseManager.pause/resume/isPaused is covered separately in
+ // WorkerManagersSpec.
+
+ it should "coexist as distinct elements in a Set without aliasing" in {
+ val active: Set[PauseType] = Set(
+ UserPause,
+ BackpressurePause,
+ OperatorLogicPause,
+ ECMPause(EmbeddedControlMessageIdentity("ckpt-1"))
+ )
+ assert(active.size == 4, "all four pause kinds must be distinct Set
elements")
+ assert(active.contains(UserPause))
+ assert(active.contains(BackpressurePause))
+ assert(active.contains(OperatorLogicPause))
+ assert(active.contains(ECMPause(EmbeddedControlMessageIdentity("ckpt-1"))))
+ }
+
+ it should "deduplicate identical pauses inside a Set" in {
+ // PauseManager.pause(t) treats duplicate pauses as a no-op. That works
+ // because Set deduplication leans on PauseType.equals/hashCode — pin it.
+ val active: Set[PauseType] = Set(
+ UserPause,
+ UserPause, // singleton — must collapse
+ ECMPause(EmbeddedControlMessageIdentity("ckpt-1")),
+ ECMPause(EmbeddedControlMessageIdentity("ckpt-1")) // same id — must
collapse
+ )
+ assert(active.size == 2)
+ }
+
+ it should "treat ECMPause instances with different ids as distinct Set
elements" in {
+ // Two checkpoint pauses with different ids must be independently
+ // tracked, so the manager can resume one without clearing the other.
+ val active: Set[PauseType] = Set(
+ ECMPause(EmbeddedControlMessageIdentity("ckpt-1")),
+ ECMPause(EmbeddedControlMessageIdentity("ckpt-2"))
+ )
+ assert(active.size == 2)
+ val afterResumeFirst = active -
ECMPause(EmbeddedControlMessageIdentity("ckpt-1"))
+ assert(afterResumeFirst.size == 1)
+
assert(afterResumeFirst.contains(ECMPause(EmbeddedControlMessageIdentity("ckpt-2"))))
+ }
+}