This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new a4b186b72f test(amber): add unit tests for AmberFIFOChannel and
CongestionControl (#4819)
a4b186b72f is described below
commit a4b186b72f9ae00273e1e4f390828d0087f4f4e7
Author: Yicong Huang <[email protected]>
AuthorDate: Mon May 4 02:15:33 2026 -0700
test(amber): add unit tests for AmberFIFOChannel and CongestionControl
(#4819)
### What changes were proposed in this PR?
Adds scalatest coverage for two messaging-layer helpers under
`amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/`:
- `AmberFIFOChannel` — per-channel FIFO/exactly-once enforcer with
stash, holdCredit, portId.
- `CongestionControl` — TCP-style sliding-window send queue with slow
start, ack-window adjustments, resend bookkeeping.
`AmberFIFOChannel`'s spec covers initial state,
in-order/duplicate/ahead/drain semantics on `acceptMessage`, FIFO `take`
with credit decrement, `enable` toggle, queue/stash size accessors, and
the `setPortId`/`getPortId` round-trip including the no-set `NoSuchElementException` pin.
`CongestionControl`'s spec covers `canSend`,
`enqueueMessage`+`getBufferedMessagesToSend` window release,
`markMessageInTransit`, the ack growth math (slow-start vs. linear past
`ssThreshold`), unknown-id ack as no-op, `getTimedOutInTransitMessages`,
`getAllMessages` union, and the `getStatusReport` format.
Real-clock-dependent paths in `CongestionControl` (the resend timeout
and the timed-out window-shrink branch) are driven by backdating the
private `sentTime` map via reflection rather than through `Thread.sleep`,
so the suite stays fast — a test with a properly controlled clock still
belongs in an integration test.
### Any related issues, documentation, discussions?
Closes #4817.
Bug filed separately: `AmberFIFOChannel.getPortId` calls `.get` on an
Option that defaults to None and throws `NoSuchElementException` with no
context when invoked before `setPortId`. Pinned in the spec as the
current behavior.
### How was this PR tested?
```
sbt scalafmtCheckAll
sbt "WorkflowExecutionService/testOnly
org.apache.texera.amber.engine.architecture.messaginglayer.AmberFIFOChannelSpec
org.apache.texera.amber.engine.architecture.messaginglayer.CongestionControlSpec"
```
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (claude-opus-4-7)
---------
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.../messaginglayer/AmberFIFOChannelSpec.scala | 34 ++++++++++
.../messaginglayer/CongestionControlSpec.scala | 77 ++++++++++++++++++++--
2 files changed, 104 insertions(+), 7 deletions(-)
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannelSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannelSpec.scala
index b70d9f9d6a..2109d22e32 100644
---
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannelSpec.scala
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannelSpec.scala
@@ -95,6 +95,21 @@ class AmberFIFOChannelSpec extends AnyFlatSpec {
assert(ch.getQueuedCredit == 0L)
}
+ it should "drain a contiguous run from the stash once the gap fills, leaving
a non-contiguous stashed message behind" in {
+ // A three-message stash with a gap: seq 1, 2, 4 are all stashed because
+ // seq 0 hasn't arrived; once 0 arrives, the contiguous run 0..2 drains
+ // but 4 stays stashed because seq 3 is still missing.
+ val ch = new AmberFIFOChannel(cid)
+ ch.acceptMessage(msg(1L))
+ ch.acceptMessage(msg(2L))
+ ch.acceptMessage(msg(4L))
+ ch.acceptMessage(msg(0L))
+ assert(ch.getCurrentSeq == 3L, "drain must advance current to the first
missing seq")
+ // queued: 0, 1, 2 — three messages worth of credit
+ assert(ch.getQueuedCredit == 3 * msgSize)
+ assert(ch.getTotalStashedSize == msgSize, "only seq=4 remains stashed")
+ }
+
it should "drop duplicates whose sequence number is below the current
high-water mark" in {
val ch = new AmberFIFOChannel(cid)
ch.acceptMessage(msg(0L))
@@ -134,6 +149,25 @@ class AmberFIFOChannelSpec extends AnyFlatSpec {
assert(ch.getQueuedCredit == 0L)
}
+ //
---------------------------------------------------------------------------
+ // Size accessors
+ //
---------------------------------------------------------------------------
+
+ "AmberFIFOChannel.getTotalMessageSize" should "report the sum of in-memory
size across queued messages" in {
+ val ch = new AmberFIFOChannel(cid)
+ ch.acceptMessage(msg(0L))
+ ch.acceptMessage(msg(1L))
+ assert(ch.getTotalMessageSize == 2 * msgSize)
+ }
+
+ "AmberFIFOChannel.getTotalStashedSize" should "report the sum of in-memory
size across stashed messages only" in {
+ val ch = new AmberFIFOChannel(cid)
+ ch.acceptMessage(msg(2L))
+ ch.acceptMessage(msg(4L))
+ assert(ch.getTotalStashedSize == 2 * msgSize)
+ assert(ch.getTotalMessageSize == 0L, "stashed messages do not count toward
queued size")
+ }
+
//
---------------------------------------------------------------------------
// enable / isEnabled
//
---------------------------------------------------------------------------
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/CongestionControlSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/CongestionControlSpec.scala
index 4809acd169..322f96924d 100644
---
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/CongestionControlSpec.scala
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/CongestionControlSpec.scala
@@ -32,6 +32,17 @@ class CongestionControlSpec extends AnyFlatSpec {
private def msg(id: Long): NetworkMessage =
NetworkMessage(id, WorkflowFIFOMessage(channelId, id,
DataFrame(Array.empty)))
+ // Backdate `sentTime` for `id` so the timeout branches (ack > ackTimeLimit
+ // and getTimedOutInTransitMessages > resendTimeLimit) become reachable
+ // without sleeping. The field is `private val sentTime: LongMap[Long]`,
+ // accessed via Java reflection on the instance's backing field.
+ private def backdateSentTime(cc: CongestionControl, id: Long, ageMillis:
Long): Unit = {
+ val field = classOf[CongestionControl].getDeclaredField("sentTime")
+ field.setAccessible(true)
+ val map =
field.get(cc).asInstanceOf[scala.collection.mutable.LongMap[Long]]
+ map(id) = System.currentTimeMillis() - ageMillis
+ }
+
"CongestionControl.canSend" should "be true initially with empty in-transit
set" in {
val cc = new CongestionControl()
assert(cc.canSend)
@@ -126,6 +137,43 @@ class CongestionControlSpec extends AnyFlatSpec {
)
}
+ it should "double the window during slow start, then increment linearly past
ssThreshold" in {
+ // ssThreshold defaults to 16 and windowSize to 1. The first four quick acks
+ // double the window 1→2→4→8→16; the fifth ack then increments it to 17
+ // (it hits the linear branch because windowSize == ssThreshold == 16).
+ val cc = new CongestionControl()
+ for (i <- 0 until 5) {
+ cc.markMessageInTransit(msg(i.toLong))
+ cc.ack(i.toLong)
+ }
+ assert(
+ cc.getStatusReport.contains("current window size = 17"),
+ s"unexpected status: ${cc.getStatusReport}"
+ )
+ }
+
+ "CongestionControl.ack outside ackTimeLimit" should
+ "halve ssThreshold and snap windowSize back to ssThreshold" in {
+ // Drive windowSize up to 16 (== ssThreshold) via four in-window acks,
+ // then backdate the next send so the ack falls outside ackTimeLimit.
+ // The timeout branch should halve ssThreshold to 8 and snap windowSize
+ // back to 8.
+ val cc = new CongestionControl()
+ for (i <- 0 until 4) {
+ cc.markMessageInTransit(msg(i.toLong))
+ cc.ack(i.toLong)
+ }
+ assert(cc.getStatusReport.contains("current window size = 16"))
+
+ cc.markMessageInTransit(msg(99L))
+ backdateSentTime(cc, 99L, 5000) // > ackTimeLimit (3000)
+ cc.ack(99L)
+ assert(
+ cc.getStatusReport.contains("current window size = 8"),
+ s"unexpected status: ${cc.getStatusReport}"
+ )
+ }
+
"CongestionControl.getBufferedMessagesToSend" should "be bounded by
remaining window capacity" in {
val cc = new CongestionControl()
cc.enqueueMessage(msg(1L))
@@ -165,6 +213,18 @@ class CongestionControlSpec extends AnyFlatSpec {
assert(cc.getTimedOutInTransitMessages.isEmpty)
}
+ it should "return only the messages whose sentTime is older than
resendTimeLimit" in {
+ // Cover the AkkaMessageTransferService.checkResend() retransmission path:
+ // the in-transit message that has been sitting past the 60s
+ // resendTimeLimit must surface; the freshly-sent one must not.
+ val cc = new CongestionControl()
+ cc.markMessageInTransit(msg(0L))
+ cc.markMessageInTransit(msg(1L))
+ backdateSentTime(cc, 0L, 70000) // > resendTimeLimit (60000)
+ val timedOut = cc.getTimedOutInTransitMessages.toList.map(_.messageId)
+ assert(timedOut == List(0L))
+ }
+
"CongestionControl.enqueueMessage" should "not place the message into the
in-transit set on its own" in {
val cc = new CongestionControl()
cc.enqueueMessage(msg(1L))
@@ -174,13 +234,16 @@ class CongestionControlSpec extends AnyFlatSpec {
assert(cc.getAllMessages.exists(_.messageId == 1L))
}
- "CongestionControl.getStatusReport" should "include window size, in-transit
count, and waiting count" in {
+ "CongestionControl.getStatusReport" should
+ "format the three core counters in the documented order" in {
+ // Pin the exact format string (separator + ordering) so a reorder of
+ // the three fields or a tab-vs-comma swap fails this spec.
val cc = new CongestionControl()
- cc.markMessageInTransit(msg(1L))
- cc.enqueueMessage(msg(2L))
- val report = cc.getStatusReport
- assert(report.contains("window size"))
- assert(report.contains("in transit"))
- assert(report.contains("waiting"))
+ cc.markMessageInTransit(msg(0L))
+ cc.enqueueMessage(msg(1L))
+ assert(
+ cc.getStatusReport == "current window size = 1 \t in transit = 1 \t
waiting = 1",
+ s"unexpected format: ${cc.getStatusReport}"
+ )
}
}