This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 88c7a21ea1 test(amber): add unit tests for OrderingEnforcer (#4721)
88c7a21ea1 is described below

commit 88c7a21ea15c6e440de54786e31700b3a9cf7c22
Author: Yicong Huang <[email protected]>
AuthorDate: Sat May 2 19:44:27 2026 -0700

    test(amber): add unit tests for OrderingEnforcer (#4721)
    
    ### What changes were proposed in this PR?
    
    Adds scalatest coverage for
    
`amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OrderingEnforcer.scala`.
    The FIFO/exactly-once helper had no dedicated spec.
    
    ### Any related issues, documentation, discussions?
    
    Closes #4720.
    
    ### How was this PR tested?
    
    ```
    sbt scalafmtCheckAll
    sbt "WorkflowExecutionService/testOnly 
org.apache.texera.amber.engine.architecture.messaginglayer.OrderingEnforcerSpec"
    ```
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (claude-opus-4-7)
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
    Co-authored-by: Xinyuan Lin <[email protected]>
---
 .../messaginglayer/OrderingEnforcerSpec.scala      | 150 +++++++++++++++++++++
 1 file changed, 150 insertions(+)

diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OrderingEnforcerSpec.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OrderingEnforcerSpec.scala
new file mode 100644
index 0000000000..be0e34eacf
--- /dev/null
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OrderingEnforcerSpec.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.messaginglayer
+
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class OrderingEnforcerSpec extends AnyFlatSpec with Matchers {
+
+  // ----- initial state -----
+
+  "OrderingEnforcer" should "start with current=0 and an empty stash" in {
+    val enforcer = new OrderingEnforcer[String]
+    enforcer.current shouldBe 0L
+    enforcer.ofoMap shouldBe empty
+  }
+
+  // ----- setCurrent -----
+
+  "setCurrent" should "advance the current cursor and shift the duplicate 
threshold" in {
+    val enforcer = new OrderingEnforcer[String]
+    enforcer.setCurrent(10L)
+    enforcer.current shouldBe 10L
+    enforcer.isDuplicated(9L) shouldBe true
+    enforcer.isDuplicated(10L) shouldBe false
+  }
+
+  // ----- isDuplicated -----
+
+  "isDuplicated" should "treat sequence numbers below current as duplicates" 
in {
+    val enforcer = new OrderingEnforcer[String]
+    enforcer.setCurrent(5L)
+    enforcer.isDuplicated(0L) shouldBe true
+    enforcer.isDuplicated(4L) shouldBe true
+  }
+
+  it should "treat sequence numbers >= current that are not stashed as not 
duplicated" in {
+    val enforcer = new OrderingEnforcer[String]
+    enforcer.setCurrent(5L)
+    enforcer.isDuplicated(5L) shouldBe false
+    enforcer.isDuplicated(7L) shouldBe false
+  }
+
+  it should "report stashed future sequence numbers as duplicated" in {
+    val enforcer = new OrderingEnforcer[String]
+    enforcer.stash(7L, "seven")
+    enforcer.isDuplicated(7L) shouldBe true
+  }
+
+  // ----- isAhead -----
+
+  "isAhead" should "be true only for sequence numbers strictly greater than 
current" in {
+    val enforcer = new OrderingEnforcer[String]
+    enforcer.setCurrent(5L)
+    enforcer.isAhead(6L) shouldBe true
+    enforcer.isAhead(5L) shouldBe false
+    enforcer.isAhead(4L) shouldBe false
+  }
+
+  // ----- stash -----
+
+  "stash" should "store data under its sequence number for later draining" in {
+    val enforcer = new OrderingEnforcer[String]
+    enforcer.stash(2L, "two")
+    enforcer.ofoMap(2L) shouldBe "two"
+  }
+
+  it should "overwrite an existing stash entry at the same sequence number" in 
{
+    // Pin: there is no guard against re-stashing the same sequence number.
+    // Callers rely on isDuplicated to skip the second stash, but a direct
+    // re-stash still overwrites silently.
+    val enforcer = new OrderingEnforcer[String]
+    enforcer.stash(2L, "first")
+    enforcer.stash(2L, "second")
+    enforcer.ofoMap(2L) shouldBe "second"
+  }
+
+  // ----- enforceFIFO -----
+
+  "enforceFIFO" should "advance current by one and emit just the input when no 
stash is queued" in {
+    val enforcer = new OrderingEnforcer[String]
+    enforcer.enforceFIFO("zero") shouldBe List("zero")
+    enforcer.current shouldBe 1L
+  }
+
+  it should "drain a single contiguous stashed entry after the input" in {
+    val enforcer = new OrderingEnforcer[String]
+    enforcer.stash(1L, "one")
+    enforcer.enforceFIFO("zero") shouldBe List("zero", "one")
+    enforcer.current shouldBe 2L
+    enforcer.ofoMap should not contain key(1L)
+  }
+
+  it should "drain a contiguous run from the stash and stop at the first gap" 
in {
+    val enforcer = new OrderingEnforcer[String]
+    enforcer.stash(1L, "one")
+    enforcer.stash(2L, "two")
+    enforcer.stash(4L, "four") // gap at 3
+    val emitted = enforcer.enforceFIFO("zero")
+    emitted shouldBe List("zero", "one", "two")
+    enforcer.current shouldBe 3L
+    enforcer.ofoMap.keys.toList shouldBe List(4L)
+  }
+
+  it should "leave the stash untouched when none of the queued entries are 
contiguous" in {
+    val enforcer = new OrderingEnforcer[String]
+    enforcer.stash(5L, "five")
+    enforcer.stash(7L, "seven")
+    val emitted = enforcer.enforceFIFO("zero")
+    emitted shouldBe List("zero")
+    enforcer.current shouldBe 1L
+    enforcer.ofoMap.keys.toSet shouldBe Set(5L, 7L)
+  }
+
+  it should "respect a non-zero starting current when draining" in {
+    // Setting the cursor manually mimics replay/recovery: the enforcer skips
+    // past prior messages and only drains entries with sequence numbers
+    // strictly greater than the current value at call time.
+    val enforcer = new OrderingEnforcer[String]
+    enforcer.setCurrent(10L)
+    enforcer.stash(11L, "eleven")
+    enforcer.stash(12L, "twelve")
+    val emitted = enforcer.enforceFIFO("ten")
+    emitted shouldBe List("ten", "eleven", "twelve")
+    enforcer.current shouldBe 13L
+  }
+
+  it should "support int payloads via the type parameter" in {
+    val enforcer = new OrderingEnforcer[Int]
+    enforcer.stash(1L, 100)
+    enforcer.enforceFIFO(0) shouldBe List(0, 100)
+  }
+}

Reply via email to