This is an automated email from the ASF dual-hosted git repository.
Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new c7457f7e63 test(amber): add unit test coverage for CongestionControl
(#4748)
c7457f7e63 is described below
commit c7457f7e633d3caaae7c8a0f8014de2a707cf159
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun May 3 04:22:19 2026 -0700
test(amber): add unit test coverage for CongestionControl (#4748)
### What changes were proposed in this PR?
Add `CongestionControlSpec` covering the TCP-style slow-start window
manager in `CongestionControl`:
- `canSend` true initially; false once in-transit reaches the window
size
- `ack` no-op for unknown ids; removes the acked message; grows window
via slow start when acked within `ackTimeLimit`
- `getBufferedMessagesToSend` bounded by remaining window capacity;
empty when window is full
- `getAllMessages` unions in-transit and queued sets
- `getTimedOutInTransitMessages` is empty when nothing is in transit,
and excludes messages still inside the resend time limit
- `enqueueMessage` does not place the message into the in-transit set on
its own (it stays queued, and remains visible via `getAllMessages`, until
`markMessageInTransit` or `getBufferedMessagesToSend` moves it)
- `getStatusReport` mentions window/in-transit/waiting
The late-ack `ssThreshold` halving and slow-start-past-threshold
branches are intentionally not asserted because they require either
real-clock 3s sleeps or 16-iteration warmups that would slow the suite
without commensurate value.
### Any related issues, documentation, discussions?
Closes #4747
### How was this PR tested?
`sbt "WorkflowExecutionService/testOnly
org.apache.texera.amber.engine.architecture.messaginglayer.CongestionControlSpec"`
— 12/12 tests pass.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Claude Opus 4.7)
---------
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.../messaginglayer/CongestionControlSpec.scala | 186 +++++++++++++++++++++
1 file changed, 186 insertions(+)
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/CongestionControlSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/CongestionControlSpec.scala
new file mode 100644
index 0000000000..4809acd169
--- /dev/null
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/CongestionControlSpec.scala
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.messaginglayer
+
+import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity,
ChannelIdentity}
+import
org.apache.texera.amber.engine.architecture.common.WorkflowActor.NetworkMessage
+import org.apache.texera.amber.engine.common.ambermessage.{DataFrame,
WorkflowFIFOMessage}
+import org.scalatest.flatspec.AnyFlatSpec
+
+class CongestionControlSpec extends AnyFlatSpec {
+
+ private val channelId =
+ ChannelIdentity(ActorVirtualIdentity("from"), ActorVirtualIdentity("to"),
isControl = false)
+
+ private def msg(id: Long): NetworkMessage =
+ NetworkMessage(id, WorkflowFIFOMessage(channelId, id,
DataFrame(Array.empty)))
+
+ "CongestionControl.canSend" should "be true initially with empty in-transit
set" in {
+ val cc = new CongestionControl()
+ assert(cc.canSend)
+ }
+
+ it should "become false once in-transit messages reach the window size" in {
+ val cc = new CongestionControl()
+ // initial windowSize = 1
+ cc.markMessageInTransit(msg(1L))
+ assert(!cc.canSend)
+ }
+
+ it should "not block markMessageInTransit when in-transit count already
exceeds window" in {
+ // CongestionControl tracks message *count*, not byte size — payload size
+ // does not factor into the window check (that's FlowControl's job, not
+ // this class's). markMessageInTransit is a passive setter: it does not
+ // check `canSend`. Callers are expected to consult `canSend` first; if
+ // they don't, the in-transit map grows past windowSize but `canSend`
+ // stays false.
+ val cc = new CongestionControl()
+ cc.markMessageInTransit(msg(1L))
+ cc.markMessageInTransit(msg(2L)) // ignores window; should still record
+ cc.markMessageInTransit(msg(3L))
+ assert(cc.getInTransitMessages.size == 3)
+ assert(!cc.canSend)
+ }
+
+ it should "stay true while in-transit count is below the grown window" in {
+ val cc = new CongestionControl()
+ // After three slow-start acks, the window should be at least 4. Verify
+ // that three in-transit messages still leave room for more.
+ (1L to 3L).foreach { i =>
+ cc.markMessageInTransit(msg(i))
+ cc.ack(i)
+ }
+ cc.markMessageInTransit(msg(10L))
+ cc.markMessageInTransit(msg(11L))
+ cc.markMessageInTransit(msg(12L))
+ assert(cc.canSend, "window grew via slow start; 3 in-transit must not yet
hit the cap")
+ }
+
+ it should "absorb arbitrarily many enqueued messages even when the window is
full" in {
+ val cc = new CongestionControl()
+ cc.markMessageInTransit(msg(1L)) // fills window-of-1
+ assert(!cc.canSend)
+ // Receivers may push many more while we are blocked; they must all queue
+ // up and surface via getAllMessages without truncation or error.
+ (10L until 30L).foreach(i => cc.enqueueMessage(msg(i)))
+ val all = cc.getAllMessages.map(_.messageId).toSet
+ assert(all.contains(1L))
+ assert((10L until 30L).forall(all.contains))
+ }
+
+ "CongestionControl.ack" should "be a no-op for an unknown message id" in {
+ val cc = new CongestionControl()
+ cc.markMessageInTransit(msg(1L))
+ cc.ack(99L)
+ // CongestionControl.ack returns silently for ids not in `inTransit`
+ // (no logging, no exception, no window change). Pin the state-level
+ // no-op: the previously in-transit message survives, window stays full.
+ assert(cc.getInTransitMessages.exists(_.messageId == 1L))
+ assert(cc.getInTransitMessages.size == 1)
+ assert(!cc.canSend)
+ }
+
+ it should "be a no-op when the same message id is acked twice" in {
+ val cc = new CongestionControl()
+ cc.markMessageInTransit(msg(1L))
+ cc.ack(1L)
+ val sizeAfterFirst = cc.getInTransitMessages.size
+ cc.ack(1L) // duplicate ack — must not throw or further alter state
+ assert(cc.getInTransitMessages.size == sizeAfterFirst)
+ }
+
+ it should "remove an acked in-transit message and allow more sending" in {
+ val cc = new CongestionControl()
+ cc.markMessageInTransit(msg(1L))
+ cc.ack(1L)
+ assert(!cc.getInTransitMessages.exists(_.messageId == 1L))
+ assert(cc.canSend)
+ }
+
+ it should "grow the window via slow start when acked within the ack time
limit" in {
+ val cc = new CongestionControl()
+ cc.markMessageInTransit(msg(1L))
+ cc.ack(1L) // immediate ack — well within ackTimeLimit (3s)
+ // After the first slow-start ack, windowSize should be at least 2.
+ cc.markMessageInTransit(msg(2L))
+ assert(
+ cc.canSend,
+ "window must permit at least one more in-transit message after
slow-start ack"
+ )
+ }
+
+ "CongestionControl.getBufferedMessagesToSend" should "be bounded by
remaining window capacity" in {
+ val cc = new CongestionControl()
+ cc.enqueueMessage(msg(1L))
+ cc.enqueueMessage(msg(2L))
+ cc.enqueueMessage(msg(3L))
+ // initial windowSize = 1, inTransit.size = 0 → send up to 1
+ val first = cc.getBufferedMessagesToSend.toList
+ assert(first.size == 1)
+ assert(first.head.messageId == 1L)
+ }
+
+ it should "return an empty iterable when the window is fully consumed" in {
+ val cc = new CongestionControl()
+ cc.markMessageInTransit(msg(1L))
+ cc.enqueueMessage(msg(2L))
+ assert(cc.getBufferedMessagesToSend.isEmpty)
+ }
+
+ "CongestionControl.getAllMessages" should "include both in-transit and
queued messages" in {
+ val cc = new CongestionControl()
+ cc.markMessageInTransit(msg(1L))
+ cc.enqueueMessage(msg(2L))
+ val all = cc.getAllMessages.map(_.messageId).toSet
+ assert(all == Set(1L, 2L))
+ }
+
+ "CongestionControl.getTimedOutInTransitMessages" should "be empty when no
message has been marked in transit" in {
+ val cc = new CongestionControl()
+ assert(cc.getTimedOutInTransitMessages.isEmpty)
+ }
+
+ it should "exclude messages that are still inside the resend time limit" in {
+ val cc = new CongestionControl()
+ cc.markMessageInTransit(msg(1L))
+ // The message was just marked in transit, so it is well inside the 60s
+ // resend window and must not be reported as timed out.
+ assert(cc.getTimedOutInTransitMessages.isEmpty)
+ }
+
+ "CongestionControl.enqueueMessage" should "not place the message into the
in-transit set on its own" in {
+ val cc = new CongestionControl()
+ cc.enqueueMessage(msg(1L))
+ assert(cc.getInTransitMessages.isEmpty)
+ // The message should still surface via getAllMessages (which unions
+ // inTransit and toBeSent), proving it was buffered, not dropped.
+ assert(cc.getAllMessages.exists(_.messageId == 1L))
+ }
+
+ "CongestionControl.getStatusReport" should "include window size, in-transit
count, and waiting count" in {
+ val cc = new CongestionControl()
+ cc.markMessageInTransit(msg(1L))
+ cc.enqueueMessage(msg(2L))
+ val report = cc.getStatusReport
+ assert(report.contains("window size"))
+ assert(report.contains("in transit"))
+ assert(report.contains("waiting"))
+ }
+}