This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 6ceadd3b2 [CELEBORN-1487][FOLLOWUP] Fix updateProduceBytes
6ceadd3b2 is described below
commit 6ceadd3b2a8d87195b0c5d126fe5f5efb4585247
Author: caohaotian <[email protected]>
AuthorDate: Sat May 10 21:59:17 2025 +0800
[CELEBORN-1487][FOLLOWUP] Fix updateProduceBytes
### What changes were proposed in this pull request?
In the `updateProduceBytes` method, `userBufferInfo.bufferStatusHub` and
`workerBufferStatusHub` add the same node. This results in the lastNode's
numBytes being repeatedly accumulated in the `_deque` of both hubs if
updateProduceBytes is called multiple times within one second, since the
`lastNode` is the same object.
This will cause the numBytes of `nodeToSeparate` to become negative when
the `removeExpiredNodes` method is called, specifically at the line
`nodeToSeparate.separateNode(removed.getRight());`, which will make
avgBytesPerSec return 0.
As shown in the figure below, this value remains at 0 for a long period of
time in the Dashboard.

### Why are the changes needed?
Fix updateProduceBytes method.
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
UT
AND Dashboard:

Closes #3240 from vastian180/CELEBORN-1487-FOLLOWUP.
Authored-by: caohaotian <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../UserCongestionControlContext.java | 2 +-
.../congestcontrol/TestCongestionController.java | 22 ++++++++++++++++++++++
2 files changed, 23 insertions(+), 1 deletion(-)
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/UserCongestionControlContext.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/UserCongestionControlContext.java
index 340e1e98b..7017b4962 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/UserCongestionControlContext.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/UserCongestionControlContext.java
@@ -67,7 +67,7 @@ public class UserCongestionControlContext {
long timeNow = System.currentTimeMillis();
BufferStatusHub.BufferStatusNode node = new
BufferStatusHub.BufferStatusNode(numBytes);
userBufferInfo.updateInfo(timeNow, node);
- workerBufferStatusHub.add(timeNow, node);
+ workerBufferStatusHub.add(timeNow, (BufferStatusHub.BufferStatusNode)
node.clone());
}
public UserBufferInfo getUserBufferInfo() {
diff --git
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestCongestionController.java
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestCongestionController.java
index 9fa3bfe2c..80a5df915 100644
---
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestCongestionController.java
+++
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestCongestionController.java
@@ -338,6 +338,28 @@ public class TestCongestionController {
Assert.assertFalse(controller1.isUserCongested(context1));
}
+ @Test
+ public void testUpdateProduceBytes() throws InterruptedException {
+ UserIdentifier user = new UserIdentifier("test", "celeborn");
+ UserCongestionControlContext userCongestionControlContext =
+ controller.getUserCongestionContext(user);
+ userCongestionControlContext.updateProduceBytes(1000);
+ userCongestionControlContext.updateProduceBytes(1000);
+ userCongestionControlContext.updateProduceBytes(1000);
+ userCongestionControlContext.updateProduceBytes(1000);
+
+ int timeWindowsInMills =
+
userCongestionControlContext.getUserBufferInfo().getBufferStatusHub().timeWindowsInMills;
+ // Sleep to trigger removeExpiredNodes
+ Thread.sleep(timeWindowsInMills / 2 + 1);
+ userCongestionControlContext.updateProduceBytes(1000);
+ Thread.sleep(timeWindowsInMills / 2 + 1);
+
+ Assert.assertTrue(
+
userCongestionControlContext.getUserBufferInfo().getBufferStatusHub().avgBytesPerSec()
> 0);
+ Assert.assertTrue(controller.getProducedBufferStatusHub().avgBytesPerSec()
> 0);
+ }
+
private void clearBufferStatus(CongestionController controller) {
controller.getProducedBufferStatusHub().clear();
controller.getConsumedBufferStatusHub().clear();