This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 810263c3ef test(amber): add unit test coverage for PauseType (#4914)
810263c3ef is described below

commit 810263c3ef769dd1f0def924e64390fe8b81739d
Author: Xinyuan Lin <[email protected]>
AuthorDate: Mon May 4 01:28:53 2026 -0700

    test(amber): add unit test coverage for PauseType (#4914)
    
    ### What changes were proposed in this PR?
    
    Adds `PauseTypeSpec` covering `PauseType`
    
(amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/PauseType.scala),
    the small sealed trait used by the worker pause subsystem (three
    singleton kinds + one case-class subtype). It has no test coverage
    today.
    
    The new spec pins:
    - The three pause singletons (`UserPause`, `BackpressurePause`,
    `OperatorLogicPause`) extend `PauseType`, are object-identity stable,
    and compare unequal to each other (widened to `PauseType` so the
    comparison is not folded away at compile time).
    - `ECMPause` is a case class that carries the
    `EmbeddedControlMessageIdentity` it was constructed with, supports value
    equality and consistent `hashCode` by id, and is distinct from every
    singleton PauseType.
    - Exhaustive `match` over `PauseType` distinguishes each subtype.
    - Set-based coexistence (the contract `PauseManager` actually relies on,
    since it stores active pauses in a `HashSet[PauseType]` with no
    priority): all four pause kinds coexist as distinct Set elements;
    identical pauses deduplicate (singletons collapse, same-id `ECMPause`
    collapses); `ECMPause` instances with different ids are independently
    tracked.
    
    The multi-pause behavior through
    `PauseManager.pause`/`resume`/`isPaused` is covered separately in
    `WorkerManagersSpec`.
    
    No production code changed; this is test-only.
    
    ### Any related issues, documentation, discussions?
    
    Closes #4913
    
    ### How was this PR tested?
    
    Added 10 new unit tests in `PauseTypeSpec`. Verified locally:
    
    ```
    sbt 'WorkflowExecutionService/Test/testOnly 
org.apache.texera.amber.engine.architecture.worker.PauseTypeSpec'
    # → Tests: succeeded 10, failed 0
    
    sbt 'WorkflowExecutionService/Test/scalafmtCheck'
    # → clean
    ```
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code
    
    ---------
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../engine/architecture/worker/PauseTypeSpec.scala | 145 +++++++++++++++++++++
 1 file changed, 145 insertions(+)

diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/PauseTypeSpec.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/PauseTypeSpec.scala
new file mode 100644
index 0000000000..68af4cec3b
--- /dev/null
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/PauseTypeSpec.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.worker
+
+import 
org.apache.texera.amber.core.virtualidentity.EmbeddedControlMessageIdentity
+import org.scalatest.flatspec.AnyFlatSpec
+
+class PauseTypeSpec extends AnyFlatSpec {
+
+  // --- singletons 
------------------------------------------------------------
+  //
+  // The sealed-trait subtype relationship is enforced at compile time by the
+  // type ascriptions (`val u: PauseType = UserPause`, etc.) used below. There
+  // is no runtime test for "singletons extend PauseType" because that would
+  // be tautological — if any singleton stopped extending the trait, this
+  // file would fail to compile.
+
+  "PauseType singletons" should "compare equal to themselves and unequal to 
each other" in {
+    // Widen to PauseType so the compiler doesn't reduce inter-singleton
+    // comparisons to constant `false` at compile time.
+    val u: PauseType = UserPause
+    val b: PauseType = BackpressurePause
+    val o: PauseType = OperatorLogicPause
+    assert(u == UserPause)
+    assert(b == BackpressurePause)
+    assert(o == OperatorLogicPause)
+    assert(u != b)
+    assert(u != o)
+    assert(b != o)
+  }
+
+  it should "be the same singleton instance per access (object identity)" in {
+    assert((UserPause: AnyRef) eq UserPause)
+    assert((BackpressurePause: AnyRef) eq BackpressurePause)
+    assert((OperatorLogicPause: AnyRef) eq OperatorLogicPause)
+  }
+
+  // --- ECMPause 
--------------------------------------------------------------
+
+  "ECMPause" should "carry the EmbeddedControlMessageIdentity it was 
constructed with" in {
+    val id = EmbeddedControlMessageIdentity("ckpt-1")
+    val p = ECMPause(id)
+    assert(p.id == id)
+  }
+
+  it should "support case-class value equality and hashCode (same id → equal)" 
in {
+    val a = ECMPause(EmbeddedControlMessageIdentity("ckpt-1"))
+    val b = ECMPause(EmbeddedControlMessageIdentity("ckpt-1"))
+    val c = ECMPause(EmbeddedControlMessageIdentity("ckpt-2"))
+    assert(a == b)
+    assert(a.hashCode == b.hashCode)
+    assert(a != c)
+  }
+
+  it should "not equal any of the singleton PauseTypes" in {
+    // Subtype relationship is already proven by the `: PauseType` ascription;
+    // what we actually want to lock down here is the cross-kind inequality:
+    // an ECMPause (with any id) must not collide with any singleton kind.
+    val p: PauseType = ECMPause(EmbeddedControlMessageIdentity("ckpt"))
+    assert(p != UserPause)
+    assert(p != BackpressurePause)
+    assert(p != OperatorLogicPause)
+  }
+
+  // --- pattern matching 
------------------------------------------------------
+
+  "PauseType" should "support exhaustive pattern matching that distinguishes 
each subtype" in {
+    def label(p: PauseType): String =
+      p match {
+        case UserPause          => "user"
+        case BackpressurePause  => "backpressure"
+        case OperatorLogicPause => "operator-logic"
+        case ECMPause(_)        => "ecm"
+      }
+    assert(label(UserPause) == "user")
+    assert(label(BackpressurePause) == "backpressure")
+    assert(label(OperatorLogicPause) == "operator-logic")
+    assert(label(ECMPause(EmbeddedControlMessageIdentity("x"))) == "ecm")
+  }
+
+  // --- Set-based coexistence (the contract PauseManager actually relies on) 
--
+  // PauseManager stores active pauses in a `HashSet[PauseType]` (additive,
+  // no priority — resuming one type only removes that type). The 
override-order
+  // semantics that the data type would need to support priorities don't exist
+  // in PauseType; the data type only has to behave well as Set elements.
+  // These tests pin that contract here. The multi-pause coexistence behavior
+  // through PauseManager.pause/resume/isPaused is covered separately in
+  // WorkerManagersSpec.
+
+  it should "coexist as distinct elements in a Set without aliasing" in {
+    val active: Set[PauseType] = Set(
+      UserPause,
+      BackpressurePause,
+      OperatorLogicPause,
+      ECMPause(EmbeddedControlMessageIdentity("ckpt-1"))
+    )
+    assert(active.size == 4, "all four pause kinds must be distinct Set 
elements")
+    assert(active.contains(UserPause))
+    assert(active.contains(BackpressurePause))
+    assert(active.contains(OperatorLogicPause))
+    assert(active.contains(ECMPause(EmbeddedControlMessageIdentity("ckpt-1"))))
+  }
+
+  it should "deduplicate identical pauses inside a Set" in {
+    // PauseManager.pause(t) treats duplicate pauses as a no-op. That works
+    // because Set deduplication leans on PauseType.equals/hashCode — pin it.
+    val active: Set[PauseType] = Set(
+      UserPause,
+      UserPause, // singleton — must collapse
+      ECMPause(EmbeddedControlMessageIdentity("ckpt-1")),
+      ECMPause(EmbeddedControlMessageIdentity("ckpt-1")) // same id — must 
collapse
+    )
+    assert(active.size == 2)
+  }
+
+  it should "treat ECMPause instances with different ids as distinct Set 
elements" in {
+    // Two checkpoint pauses with different ids must be independently
+    // tracked, so the manager can resume one without clearing the other.
+    val active: Set[PauseType] = Set(
+      ECMPause(EmbeddedControlMessageIdentity("ckpt-1")),
+      ECMPause(EmbeddedControlMessageIdentity("ckpt-2"))
+    )
+    assert(active.size == 2)
+    val afterResumeFirst = active - 
ECMPause(EmbeddedControlMessageIdentity("ckpt-1"))
+    assert(afterResumeFirst.size == 1)
+    
assert(afterResumeFirst.contains(ECMPause(EmbeddedControlMessageIdentity("ckpt-2"))))
+  }
+}

Reply via email to