Copilot commented on code in PR #3397:
URL: https://github.com/apache/celeborn/pull/3397#discussion_r2239471497


##########
common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java:
##########
@@ -262,12 +274,15 @@ public void cleanup() {
         totalInflightReqs.sum(),
         inflightBatchesPerAddress.values().stream().mapToInt(Set::size).sum());
     cleaned = true;
-    inflightBatchesPerAddress.clear();
     pushStrategy.clear();
-    if (maxInFlightBytesSizeEnabled) {
-      logger.info("Cleanup {} bytes in flight.", totalInflightBytes.sum());
-      inflightBytesSizePerAddress.clear();
-      inflightBatchBytesSizes.clear();
+    synchronized (lock) {

Review Comment:
   The `cleanup` method moves `inflightBatchesPerAddress.clear()` inside the
synchronized block, but the `cleaned = true` assignment and
`pushStrategy.clear()` remain outside it. This creates a potential race
condition: another thread may observe `cleaned = true` and still access
`inflightBatchesPerAddress` before it has been cleared. Consider moving all
cleanup operations, including the `cleaned = true` assignment, inside the
synchronized block, or otherwise guaranteeing that no thread can see
`cleaned = true` while the tracker is only partially cleared.
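A minimal sketch of the suggested ordering, using a hypothetical, trimmed-down
`CleanupOrderingSketch` class that models only `lock`, `cleaned`, and
`inflightBatchesPerAddress` (byte-size tracking and `pushStrategy` are left
out). The flag and the map change in one critical section, and `addBatch`
checks the flag under the same lock, so a thread holding `lock` sees either the
pre-cleanup state or the fully cleared one. Dropping batches that arrive after
cleanup is a hypothetical policy chosen for illustration only.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, trimmed-down stand-in for InFlightRequestTracker that models
// only the fields needed to show cleanup ordering.
class CleanupOrderingSketch {
  private final Object lock = new Object();
  // Written only while holding `lock`; volatile so lock-free reads still see it.
  private volatile boolean cleaned = false;
  private final Map<String, Set<Integer>> inflightBatchesPerAddress =
      new ConcurrentHashMap<>();

  void addBatch(int batchId, String hostAndPushPort) {
    synchronized (lock) {
      // Check-then-act under the same lock that cleanup() uses.
      if (cleaned) {
        return; // hypothetical policy: drop batches that arrive after cleanup
      }
      inflightBatchesPerAddress
          .computeIfAbsent(hostAndPushPort, id -> ConcurrentHashMap.newKeySet())
          .add(batchId);
    }
  }

  void cleanup() {
    synchronized (lock) {
      // Flag and state change in one critical section, so a thread holding
      // `lock` sees either the pre-cleanup state or the fully cleared state.
      cleaned = true;
      inflightBatchesPerAddress.clear();
      // In the reviewed code, pushStrategy.clear() would move in here as well.
    }
  }

  boolean isCleaned() {
    return cleaned;
  }

  int inflightBatchCount() {
    synchronized (lock) {
      return inflightBatchesPerAddress.values().stream().mapToInt(Set::size).sum();
    }
  }

  public static void main(String[] args) {
    CleanupOrderingSketch tracker = new CleanupOrderingSketch();
    tracker.addBatch(1, "host-a:9097");
    tracker.addBatch(2, "host-a:9097");
    tracker.cleanup();
    tracker.addBatch(3, "host-b:9097"); // dropped: cleanup already ran
    System.out.println(tracker.isCleaned() + " " + tracker.inflightBatchCount()); // prints "true 0"
  }
}
```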



##########
common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java:
##########
@@ -75,36 +77,42 @@ public InFlightRequestTracker(CelebornConf conf, PushState pushState) {
   }
 
  public void addBatch(int batchId, int batchBytesSize, String hostAndPushPort) {
-    Set<Integer> batchIdSetPerPair =
-        inflightBatchesPerAddress.computeIfAbsent(
-            hostAndPushPort, id -> ConcurrentHashMap.newKeySet());
-    batchIdSetPerPair.add(batchId);
-    totalInflightReqs.increment();
-    if (maxInFlightBytesSizeEnabled) {
-      LongAdder bytesSizePerPair =
-          inflightBytesSizePerAddress.computeIfAbsent(hostAndPushPort, id -> new LongAdder());
-      bytesSizePerPair.add(batchBytesSize);
-      inflightBatchBytesSizes.put(batchId, batchBytesSize);
-      totalInflightBytes.add(batchBytesSize);
+    synchronized (lock) {
+      Set<Integer> batchIdSetPerPair =
+          inflightBatchesPerAddress.computeIfAbsent(
+              hostAndPushPort, id -> ConcurrentHashMap.newKeySet());
+      batchIdSetPerPair.add(batchId);
+      totalInflightReqs.increment();
+      if (maxInFlightBytesSizeEnabled) {
+        LongAdder bytesSizePerPair =
+            inflightBytesSizePerAddress.computeIfAbsent(hostAndPushPort, id -> new LongAdder());
+        bytesSizePerPair.add(batchBytesSize);
+        inflightBatchBytesSizes.put(batchId, batchBytesSize);
+        totalInflightBytes.add(batchBytesSize);
+      }
+      lock.notifyAll();

Review Comment:
   `addBatch` calls `notifyAll()` even when no batch was actually added (when
`batchIdSetPerPair.add(batchId)` returns false because the ID is already
tracked). This can cause unnecessary thread wake-ups. Consider calling
`notifyAll()` only when a batch is actually added.
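A minimal sketch of the conditional notification, using a hypothetical
`ConditionalNotifyAddSketch` class that keeps only the per-address batch sets
(byte-size accounting and `totalInflightReqs` are omitted). It relies on
`Set.add` returning `false` for an element that is already present; the
`notifications` counter exists only to make the behavior observable.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the addBatch path; byte-size accounting and the
// request counters are omitted to keep the focus on notification.
class ConditionalNotifyAddSketch {
  private final Object lock = new Object();
  private final Map<String, Set<Integer>> inflightBatchesPerAddress =
      new ConcurrentHashMap<>();
  private int notifications = 0; // demo instrumentation only, guarded by `lock`

  /** Returns true if batchId was not already tracked for hostAndPushPort. */
  boolean addBatch(int batchId, String hostAndPushPort) {
    synchronized (lock) {
      Set<Integer> batchIdSetPerPair =
          inflightBatchesPerAddress.computeIfAbsent(
              hostAndPushPort, id -> ConcurrentHashMap.newKeySet());
      // Set.add returns false when the element is already present.
      boolean batchAdded = batchIdSetPerPair.add(batchId);
      if (batchAdded) {
        notifications++;
        lock.notifyAll(); // wake waiters only when the tracked state changed
      }
      return batchAdded;
    }
  }

  int notificationCount() {
    synchronized (lock) {
      return notifications;
    }
  }

  public static void main(String[] args) {
    ConditionalNotifyAddSketch tracker = new ConditionalNotifyAddSketch();
    tracker.addBatch(7, "host-a:9097");
    tracker.addBatch(7, "host-a:9097"); // duplicate: no notifyAll()
    System.out.println(tracker.notificationCount()); // prints 1
  }
}
```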



##########
common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java:
##########
@@ -75,36 +77,42 @@ public InFlightRequestTracker(CelebornConf conf, PushState pushState) {
   }
 
  public void addBatch(int batchId, int batchBytesSize, String hostAndPushPort) {
-    Set<Integer> batchIdSetPerPair =
-        inflightBatchesPerAddress.computeIfAbsent(
-            hostAndPushPort, id -> ConcurrentHashMap.newKeySet());
-    batchIdSetPerPair.add(batchId);
-    totalInflightReqs.increment();
-    if (maxInFlightBytesSizeEnabled) {
-      LongAdder bytesSizePerPair =
-          inflightBytesSizePerAddress.computeIfAbsent(hostAndPushPort, id -> new LongAdder());
-      bytesSizePerPair.add(batchBytesSize);
-      inflightBatchBytesSizes.put(batchId, batchBytesSize);
-      totalInflightBytes.add(batchBytesSize);
+    synchronized (lock) {
+      Set<Integer> batchIdSetPerPair =
+          inflightBatchesPerAddress.computeIfAbsent(
+              hostAndPushPort, id -> ConcurrentHashMap.newKeySet());
+      batchIdSetPerPair.add(batchId);
+      totalInflightReqs.increment();
+      if (maxInFlightBytesSizeEnabled) {
+        LongAdder bytesSizePerPair =
+            inflightBytesSizePerAddress.computeIfAbsent(hostAndPushPort, id -> new LongAdder());
+        bytesSizePerPair.add(batchBytesSize);
+        inflightBatchBytesSizes.put(batchId, batchBytesSize);
+        totalInflightBytes.add(batchBytesSize);
+      }
+      lock.notifyAll();
     }
   }
 
   public void removeBatch(int batchId, String hostAndPushPort) {
-    Set<Integer> batchIdSet = inflightBatchesPerAddress.get(hostAndPushPort);
-    if (batchIdSet != null) {
-      batchIdSet.remove(batchId);
-    } else {
-      logger.info("Batches of {} in flight is null.", hostAndPushPort);
-    }
-    totalInflightReqs.decrement();
-    if (maxInFlightBytesSizeEnabled) {
-      int inflightBatchBytesSize =
-          -Optional.ofNullable(inflightBatchBytesSizes.remove(batchId)).orElse(0);
-      LongAdder inflightBytesSize = inflightBytesSizePerAddress.get(hostAndPushPort);
-      if (inflightBytesSize != null) {
-        inflightBytesSize.add(inflightBatchBytesSize);
+    synchronized (lock) {
+      Set<Integer> batchIdSet = inflightBatchesPerAddress.get(hostAndPushPort);
+      if (batchIdSet != null) {
+        batchIdSet.remove(batchId);
+      } else {
+        logger.info("Batches of {} in flight is null.", hostAndPushPort);
+      }
+      totalInflightReqs.decrement();
+      if (maxInFlightBytesSizeEnabled) {
+        int inflightBatchBytesSize =
+            -Optional.ofNullable(inflightBatchBytesSizes.remove(batchId)).orElse(0);
+        LongAdder inflightBytesSize = inflightBytesSizePerAddress.get(hostAndPushPort);
+        if (inflightBytesSize != null) {
+          inflightBytesSize.add(inflightBatchBytesSize);
+        }
+        totalInflightBytes.add(inflightBatchBytesSize);
       }
-      totalInflightBytes.add(inflightBatchBytesSize);
+      lock.notifyAll();

Review Comment:
   `removeBatch` calls `notifyAll()` even when no batch was actually removed
(when `batchIdSet` is null or does not contain `batchId`). This can cause
unnecessary thread wake-ups. Consider calling `notifyAll()` only when a batch
is actually removed, for example by recording the result of
`batchIdSet.remove(batchId)` in a `batchRemoved` flag:
   ```suggestion
         if (batchRemoved) {
           lock.notifyAll();
         }
   ```
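A minimal sketch of the `batchRemoved` approach, using a hypothetical
`ConditionalNotifyRemoveSketch` class that omits byte-size accounting. The
waiter method `waitUntilAtMost` is an assumption about how threads might block
on `lock`; it re-checks its condition in a `while` loop, as the `Object.wait`
documentation recommends. Because a waiter's condition here can only become
true when a batch is actually removed, skipping `notifyAll()` on a no-op
removal cannot leave a waiter stuck.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for removeBatch plus a waiter; byte sizes omitted.
class ConditionalNotifyRemoveSketch {
  private final Object lock = new Object();
  private final Map<String, Set<Integer>> inflightBatchesPerAddress =
      new ConcurrentHashMap<>();

  void addBatch(int batchId, String hostAndPushPort) {
    synchronized (lock) {
      inflightBatchesPerAddress
          .computeIfAbsent(hostAndPushPort, id -> ConcurrentHashMap.newKeySet())
          .add(batchId);
    }
  }

  /** Returns true only if batchId was tracked for hostAndPushPort and is now removed. */
  boolean removeBatch(int batchId, String hostAndPushPort) {
    synchronized (lock) {
      Set<Integer> batchIdSet = inflightBatchesPerAddress.get(hostAndPushPort);
      // Set.remove returns false when the element was not present.
      boolean batchRemoved = batchIdSet != null && batchIdSet.remove(batchId);
      if (batchRemoved) {
        lock.notifyAll();
      }
      return batchRemoved;
    }
  }

  int inflightCount(String hostAndPushPort) {
    synchronized (lock) {
      Set<Integer> batchIdSet = inflightBatchesPerAddress.get(hostAndPushPort);
      return batchIdSet == null ? 0 : batchIdSet.size();
    }
  }

  /** Hypothetical waiter: blocks until at most `limit` batches are in flight. */
  void waitUntilAtMost(String hostAndPushPort, int limit) throws InterruptedException {
    synchronized (lock) {
      // Loop: a wake-up (spurious or otherwise) does not imply the condition holds.
      while (inflightCount(hostAndPushPort) > limit) {
        lock.wait();
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    ConditionalNotifyRemoveSketch tracker = new ConditionalNotifyRemoveSketch();
    tracker.addBatch(1, "host-a:9097");
    Thread waiter =
        new Thread(
            () -> {
              try {
                tracker.waitUntilAtMost("host-a:9097", 0);
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
              }
            });
    waiter.start();
    boolean unknown = tracker.removeBatch(99, "host-a:9097"); // nothing removed, no notify
    boolean known = tracker.removeBatch(1, "host-a:9097"); // removed, wakes the waiter
    waiter.join();
    System.out.println(unknown + " " + known); // prints "false true"
  }
}
```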



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
