This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new a4b186b72f test(amber): add unit tests for AmberFIFOChannel and 
CongestionControl (#4819)
a4b186b72f is described below

commit a4b186b72f9ae00273e1e4f390828d0087f4f4e7
Author: Yicong Huang <[email protected]>
AuthorDate: Mon May 4 02:15:33 2026 -0700

    test(amber): add unit tests for AmberFIFOChannel and CongestionControl 
(#4819)
    
    ### What changes were proposed in this PR?
    
    Adds scalatest coverage for two messaging-layer helpers under
    
`amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/`:
    
    - `AmberFIFOChannel` — per-channel FIFO/exactly-once enforcer with
    stash, holdCredit, portId.
    - `CongestionControl` — TCP-style sliding-window send queue with slow
    start, ack-window adjustments, resend bookkeeping.
    
    `AmberFIFOChannel`'s spec covers initial state,
    in-order/duplicate/ahead/drain semantics on `acceptMessage`, FIFO `take`
    with credit decrement, `enable` toggle, queue/stash size accessors, and
    the `setPortId`/`getPortId` round-trip, including a pin of the
    `NoSuchElementException` thrown when `getPortId` is called before
    `setPortId`.
    `CongestionControl`'s spec covers `canSend`,
    `enqueueMessage`+`getBufferedMessagesToSend` window release,
    `markMessageInTransit`, the ack growth math (slow-start vs. linear past
    `ssThreshold`), unknown-id ack as no-op, `getTimedOutInTransitMessages`,
    `getAllMessages` union, and the `getStatusReport` format.
    
    Real-clock-dependent paths in `CongestionControl` (the resend timeout
    and the timed-out window-shrink branch) are exercised by backdating
    `sentTime` through reflection rather than through `Thread.sleep`, so
    the suite stays fast — an integration test with a controlled clock
    would still be the more faithful check for those paths.
    
    ### Any related issues, documentation, discussions?
    
    Closes #4817.
    
    Bug filed separately: `AmberFIFOChannel.getPortId` calls `.get` on an
    Option that defaults to None and throws `NoSuchElementException` with no
    context when invoked before `setPortId`. Pinned in the spec as the
    current behavior.
    
    ### How was this PR tested?
    
    ```
    sbt scalafmtCheckAll
    sbt "WorkflowExecutionService/testOnly 
org.apache.texera.amber.engine.architecture.messaginglayer.AmberFIFOChannelSpec 
org.apache.texera.amber.engine.architecture.messaginglayer.CongestionControlSpec"
    ```
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (claude-opus-4-7)
    
    ---------
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../messaginglayer/AmberFIFOChannelSpec.scala      | 34 ++++++++++
 .../messaginglayer/CongestionControlSpec.scala     | 77 ++++++++++++++++++++--
 2 files changed, 104 insertions(+), 7 deletions(-)

diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannelSpec.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannelSpec.scala
index b70d9f9d6a..2109d22e32 100644
--- 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannelSpec.scala
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannelSpec.scala
@@ -95,6 +95,21 @@ class AmberFIFOChannelSpec extends AnyFlatSpec {
     assert(ch.getQueuedCredit == 0L)
   }
 
+  it should "drain a contiguous run from the stash once the gap fills, leaving 
a non-contiguous stashed message behind" in {
+    // A three-message stash with a gap: seq 1, 2, 4 are all stashed because
+    // seq 0 hasn't arrived; once 0 arrives, the contiguous run 0..2 drains
+    // but 4 stays stashed because seq 3 is still missing.
+    val ch = new AmberFIFOChannel(cid)
+    ch.acceptMessage(msg(1L))
+    ch.acceptMessage(msg(2L))
+    ch.acceptMessage(msg(4L))
+    ch.acceptMessage(msg(0L))
+    assert(ch.getCurrentSeq == 3L, "drain must advance current to the first 
missing seq")
+    // queued: 0, 1, 2 — three messages worth of credit
+    assert(ch.getQueuedCredit == 3 * msgSize)
+    assert(ch.getTotalStashedSize == msgSize, "only seq=4 remains stashed")
+  }
+
   it should "drop duplicates whose sequence number is below the current 
high-water mark" in {
     val ch = new AmberFIFOChannel(cid)
     ch.acceptMessage(msg(0L))
@@ -134,6 +149,25 @@ class AmberFIFOChannelSpec extends AnyFlatSpec {
     assert(ch.getQueuedCredit == 0L)
   }
 
+  // 
---------------------------------------------------------------------------
+  // Size accessors
+  // 
---------------------------------------------------------------------------
+
+  "AmberFIFOChannel.getTotalMessageSize" should "report the sum of in-memory 
size across queued messages" in {
+    val ch = new AmberFIFOChannel(cid)
+    ch.acceptMessage(msg(0L))
+    ch.acceptMessage(msg(1L))
+    assert(ch.getTotalMessageSize == 2 * msgSize)
+  }
+
+  "AmberFIFOChannel.getTotalStashedSize" should "report the sum of in-memory 
size across stashed messages only" in {
+    val ch = new AmberFIFOChannel(cid)
+    ch.acceptMessage(msg(2L))
+    ch.acceptMessage(msg(4L))
+    assert(ch.getTotalStashedSize == 2 * msgSize)
+    assert(ch.getTotalMessageSize == 0L, "stashed messages do not count toward 
queued size")
+  }
+
   // 
---------------------------------------------------------------------------
   // enable / isEnabled
   // 
---------------------------------------------------------------------------
diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/CongestionControlSpec.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/CongestionControlSpec.scala
index 4809acd169..322f96924d 100644
--- 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/CongestionControlSpec.scala
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/CongestionControlSpec.scala
@@ -32,6 +32,17 @@ class CongestionControlSpec extends AnyFlatSpec {
   private def msg(id: Long): NetworkMessage =
     NetworkMessage(id, WorkflowFIFOMessage(channelId, id, 
DataFrame(Array.empty)))
 
+  // Backdate `sentTime` for `id` so the timeout branches (ack > ackTimeLimit
+  // and getTimedOutInTransitMessages > resendTimeLimit) become reachable
+  // without sleeping. The field is `private val sentTime: LongMap[Long]`,
+  // accessed via Java reflection on the instance's backing field.
+  private def backdateSentTime(cc: CongestionControl, id: Long, ageMillis: 
Long): Unit = {
+    val field = classOf[CongestionControl].getDeclaredField("sentTime")
+    field.setAccessible(true)
+    val map = 
field.get(cc).asInstanceOf[scala.collection.mutable.LongMap[Long]]
+    map(id) = System.currentTimeMillis() - ageMillis
+  }
+
   "CongestionControl.canSend" should "be true initially with empty in-transit 
set" in {
     val cc = new CongestionControl()
     assert(cc.canSend)
@@ -126,6 +137,43 @@ class CongestionControlSpec extends AnyFlatSpec {
     )
   }
 
+  it should "double the window during slow start, then increment linearly past 
ssThreshold" in {
+    // ssThreshold defaults to 16 and windowSize to 1. The first four quick
+    // acks double the window 1→2→4→8→16; the fifth ack increments it to 17
+    // because it hits the linear branch (windowSize == ssThreshold == 16).
+    val cc = new CongestionControl()
+    for (i <- 0 until 5) {
+      cc.markMessageInTransit(msg(i.toLong))
+      cc.ack(i.toLong)
+    }
+    assert(
+      cc.getStatusReport.contains("current window size = 17"),
+      s"unexpected status: ${cc.getStatusReport}"
+    )
+  }
+
+  "CongestionControl.ack outside ackTimeLimit" should
+    "halve ssThreshold and snap windowSize back to ssThreshold" in {
+    // Drive windowSize up to 16 (== ssThreshold) via four in-window acks,
+    // then backdate the next send so the ack falls outside ackTimeLimit.
+    // The timeout branch should halve ssThreshold to 8 and snap windowSize
+    // back to 8.
+    val cc = new CongestionControl()
+    for (i <- 0 until 4) {
+      cc.markMessageInTransit(msg(i.toLong))
+      cc.ack(i.toLong)
+    }
+    assert(cc.getStatusReport.contains("current window size = 16"))
+
+    cc.markMessageInTransit(msg(99L))
+    backdateSentTime(cc, 99L, 5000) // > ackTimeLimit (3000)
+    cc.ack(99L)
+    assert(
+      cc.getStatusReport.contains("current window size = 8"),
+      s"unexpected status: ${cc.getStatusReport}"
+    )
+  }
+
   "CongestionControl.getBufferedMessagesToSend" should "be bounded by 
remaining window capacity" in {
     val cc = new CongestionControl()
     cc.enqueueMessage(msg(1L))
@@ -165,6 +213,18 @@ class CongestionControlSpec extends AnyFlatSpec {
     assert(cc.getTimedOutInTransitMessages.isEmpty)
   }
 
+  it should "return only the messages whose sentTime is older than 
resendTimeLimit" in {
+    // Cover the AkkaMessageTransferService.checkResend() retransmission path:
+    // the in-transit message that has been sitting past the 60s
+    // resendTimeLimit must surface; the freshly-sent one must not.
+    val cc = new CongestionControl()
+    cc.markMessageInTransit(msg(0L))
+    cc.markMessageInTransit(msg(1L))
+    backdateSentTime(cc, 0L, 70000) // > resendTimeLimit (60000)
+    val timedOut = cc.getTimedOutInTransitMessages.toList.map(_.messageId)
+    assert(timedOut == List(0L))
+  }
+
   "CongestionControl.enqueueMessage" should "not place the message into the 
in-transit set on its own" in {
     val cc = new CongestionControl()
     cc.enqueueMessage(msg(1L))
@@ -174,13 +234,16 @@ class CongestionControlSpec extends AnyFlatSpec {
     assert(cc.getAllMessages.exists(_.messageId == 1L))
   }
 
-  "CongestionControl.getStatusReport" should "include window size, in-transit 
count, and waiting count" in {
+  "CongestionControl.getStatusReport" should
+    "format the three core counters in the documented order" in {
+    // Pin the exact format string (separator + ordering) so a reorder of
+    // the three fields or a tab-vs-comma swap fails this spec.
     val cc = new CongestionControl()
-    cc.markMessageInTransit(msg(1L))
-    cc.enqueueMessage(msg(2L))
-    val report = cc.getStatusReport
-    assert(report.contains("window size"))
-    assert(report.contains("in transit"))
-    assert(report.contains("waiting"))
+    cc.markMessageInTransit(msg(0L))
+    cc.enqueueMessage(msg(1L))
+    assert(
+      cc.getStatusReport == "current window size = 1 \t in transit = 1 \t 
waiting = 1",
+      s"unexpected format: ${cc.getStatusReport}"
+    )
   }
 }

Reply via email to