This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 6ceadd3b2 [CELEBORN-1487][FOLLOWUP] Fix updateProduceBytes
6ceadd3b2 is described below

commit 6ceadd3b2a8d87195b0c5d126fe5f5efb4585247
Author: caohaotian <[email protected]>
AuthorDate: Sat May 10 21:59:17 2025 +0800

    [CELEBORN-1487][FOLLOWUP] Fix updateProduceBytes
    
    ### What changes were proposed in this pull request?
    In the `updateProduceBytes` method, `userBufferInfo.bufferStatusHub` and 
`workerBufferStatusHub` add the same node object. Since both hubs then share 
the same `lastNode`, calling `updateProduceBytes` multiple times within one 
second accumulates that node's numBytes repeatedly in the `_deque` of both 
hubs.
    
    This causes the numBytes of `nodeToSeparate` to become negative when 
`removeExpiredNodes` is called, specifically at the line 
`nodeToSeparate.separateNode(removed.getRight());`, which makes 
`avgBytesPerSec` return 0.
    
    As shown in the figure below, this value stays at 0 for long periods of 
time in the Dashboard.
    
    
![image](https://github.com/user-attachments/assets/4d85d4d1-45f9-4ba0-8cc7-ad2ceb504abe)
    
    ### Why are the changes needed?
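    Fix the `updateProduceBytes` method so that the user hub and the worker hub no longer share a single node object, allowing `avgBytesPerSec` to report correct values.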
    
    ### Does this PR introduce _any_ user-facing change?
    NO
    
    ### How was this patch tested?
    UT (`TestCongestionController#testUpdateProduceBytes`)
    and the Dashboard:
    
![image](https://github.com/user-attachments/assets/08212f13-39d3-42a8-b178-8ec8be6d0822)
    
    Closes #3240 from vastian180/CELEBORN-1487-FOLLOWUP.
    
    Authored-by: caohaotian <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../UserCongestionControlContext.java              |  2 +-
 .../congestcontrol/TestCongestionController.java   | 22 ++++++++++++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/UserCongestionControlContext.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/UserCongestionControlContext.java
index 340e1e98b..7017b4962 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/UserCongestionControlContext.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/UserCongestionControlContext.java
@@ -67,7 +67,7 @@ public class UserCongestionControlContext {
     long timeNow = System.currentTimeMillis();
     BufferStatusHub.BufferStatusNode node = new 
BufferStatusHub.BufferStatusNode(numBytes);
     userBufferInfo.updateInfo(timeNow, node);
-    workerBufferStatusHub.add(timeNow, node);
+    workerBufferStatusHub.add(timeNow, (BufferStatusHub.BufferStatusNode) 
node.clone());
   }
 
   public UserBufferInfo getUserBufferInfo() {
diff --git 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestCongestionController.java
 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestCongestionController.java
index 9fa3bfe2c..80a5df915 100644
--- 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestCongestionController.java
+++ 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestCongestionController.java
@@ -338,6 +338,28 @@ public class TestCongestionController {
     Assert.assertFalse(controller1.isUserCongested(context1));
   }
 
+  @Test
+  public void testUpdateProduceBytes() throws InterruptedException {
+    UserIdentifier user = new UserIdentifier("test", "celeborn");
+    UserCongestionControlContext userCongestionControlContext =
+        controller.getUserCongestionContext(user);
+    userCongestionControlContext.updateProduceBytes(1000);
+    userCongestionControlContext.updateProduceBytes(1000);
+    userCongestionControlContext.updateProduceBytes(1000);
+    userCongestionControlContext.updateProduceBytes(1000);
+
+    int timeWindowsInMills =
+        
userCongestionControlContext.getUserBufferInfo().getBufferStatusHub().timeWindowsInMills;
+    // Sleep past the window in two halves so the four updates above expire (triggers removeExpiredNodes)
+    Thread.sleep(timeWindowsInMills / 2 + 1);
+    userCongestionControlContext.updateProduceBytes(1000);
+    Thread.sleep(timeWindowsInMills / 2 + 1);
+
+    Assert.assertTrue(
+        
userCongestionControlContext.getUserBufferInfo().getBufferStatusHub().avgBytesPerSec()
 > 0);
+    Assert.assertTrue(controller.getProducedBufferStatusHub().avgBytesPerSec() 
> 0);
+  }
+
   private void clearBufferStatus(CongestionController controller) {
     controller.getProducedBufferStatusHub().clear();
     controller.getConsumedBufferStatusHub().clear();

Reply via email to