aglinxinyuan commented on code in PR #4748:
URL: https://github.com/apache/texera/pull/4748#discussion_r3177870812


##########
amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/CongestionControlSpec.scala:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.messaginglayer
+
+import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity}
+import org.apache.texera.amber.engine.architecture.common.WorkflowActor.NetworkMessage
+import org.apache.texera.amber.engine.common.ambermessage.{DataFrame, WorkflowFIFOMessage}
+import org.scalatest.flatspec.AnyFlatSpec
+
+class CongestionControlSpec extends AnyFlatSpec {
+
+  private val channelId =
+    ChannelIdentity(ActorVirtualIdentity("from"), ActorVirtualIdentity("to"), isControl = false)
+
+  private def msg(id: Long): NetworkMessage =
+    NetworkMessage(id, WorkflowFIFOMessage(channelId, id, DataFrame(Array.empty)))
+
+  "CongestionControl.canSend" should "be true initially with empty in-transit set" in {
+    val cc = new CongestionControl()
+    assert(cc.canSend)
+  }
+
+  it should "become false once in-transit messages reach the window size" in {
+    val cc = new CongestionControl()
+    // initial windowSize = 1
+    cc.markMessageInTransit(msg(1L))
+    assert(!cc.canSend)

Review Comment:
   Addressed in 5542f6cef3. `CongestionControl` tracks message *count*, not
byte size, so a single message of any size still occupies one slot; the
byte-size limit lives in `FlowControl`. Added a test that pins the count-only
semantics: `markMessageInTransit` is a passive setter (it does not consult
`canSend`), so when callers invoke it past the window, the in-transit count
grows but `canSend` stays false.
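
A minimal sketch of the count-only behaviour described above. It uses a hypothetical `CountWindow` stand-in, not the real `CongestionControl`; the class name, its `windowSize` parameter, and the `markInTransit` / `inTransitCount` members are assumptions made for illustration only.

```scala
import scala.collection.mutable

// Hypothetical stand-in: models only a count-based send window.
// It is NOT the real CongestionControl and ignores message size entirely.
final class CountWindow(windowSize: Int = 1) {
  private val inTransit = mutable.LinkedHashSet.empty[Long]

  // True while fewer messages are in transit than the window allows.
  def canSend: Boolean = inTransit.size < windowSize

  // Passive setter: records the id without consulting canSend.
  def markInTransit(id: Long): Unit = {
    inTransit += id
  }

  def inTransitCount: Int = inTransit.size
}

object CountWindowDemo {
  def main(args: Array[String]): Unit = {
    val w = new CountWindow()
    w.markInTransit(1L) // fills the window of size 1
    w.markInTransit(2L) // goes past the window; nothing rejects it
    println(s"inTransit=${w.inTransitCount}, canSend=${w.canSend}")
  }
}
```

Under these assumptions, marking a second message past a window of one raises the in-transit count to two while `canSend` remains false, which is the property the added test is meant to pin.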



##########
amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/CongestionControlSpec.scala:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.messaginglayer
+
+import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity}
+import org.apache.texera.amber.engine.architecture.common.WorkflowActor.NetworkMessage
+import org.apache.texera.amber.engine.common.ambermessage.{DataFrame, WorkflowFIFOMessage}
+import org.scalatest.flatspec.AnyFlatSpec
+
+class CongestionControlSpec extends AnyFlatSpec {
+
+  private val channelId =
+    ChannelIdentity(ActorVirtualIdentity("from"), ActorVirtualIdentity("to"), isControl = false)
+
+  private def msg(id: Long): NetworkMessage =
+    NetworkMessage(id, WorkflowFIFOMessage(channelId, id, DataFrame(Array.empty)))
+
+  "CongestionControl.canSend" should "be true initially with empty in-transit set" in {
+    val cc = new CongestionControl()
+    assert(cc.canSend)
+  }
+
+  it should "become false once in-transit messages reach the window size" in {
+    val cc = new CongestionControl()
+    // initial windowSize = 1
+    cc.markMessageInTransit(msg(1L))
+    assert(!cc.canSend)
+  }
+
+  "CongestionControl.ack" should "be a no-op for an unknown message id" in {
+    val cc = new CongestionControl()
+    cc.markMessageInTransit(msg(1L))
+    cc.ack(99L)
+    assert(cc.getInTransitMessages.exists(_.messageId == 1L))

Review Comment:
   Addressed in 5542f6cef3. My previous comment was wrong:
`CongestionControl.ack` does NOT log unknown ids. The class does not even mix
in `AmberLogging`; the unknown-id branch just returns silently. Updated the
spec comment to match reality (silent no-op, no log emitted). No log capture
was added because there is no log to capture.
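
A minimal sketch of the silent no-op described above. `AckModel` is a hypothetical stand-in, not the real `CongestionControl`; its member names and the `String` payload type are assumptions made for illustration only.

```scala
import scala.collection.mutable

// Hypothetical stand-in: tracks in-transit payloads by message id.
// It is NOT the real CongestionControl and does no window adjustment.
final class AckModel {
  private val inTransit = mutable.LinkedHashMap.empty[Long, String]

  def markInTransit(id: Long, payload: String): Unit = {
    inTransit.update(id, payload)
  }

  // Removes the id if it is present; an unknown id falls through silently
  // (no exception, no log output).
  def ack(id: Long): Unit = {
    inTransit.remove(id)
    ()
  }

  def inTransitIds: Set[Long] = inTransit.keySet.toSet
}

object AckModelDemo {
  def main(args: Array[String]): Unit = {
    val m = new AckModel()
    m.markInTransit(1L, "payload")
    m.ack(99L) // unknown id: state is unchanged
    println(m.inTransitIds)
  }
}
```

In this sketch, acknowledging an id that was never marked leaves the in-transit set untouched, mirroring the assertion in the spec that message 1 is still present after `ack(99L)`.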



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
