This is an automated email from the ASF dual-hosted git repository.

xianjingfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new ad7babb7d [#2496] improvement(server): Use tryLock instead of 
synchronized on buffer flushing (#2524)
ad7babb7d is described below

commit ad7babb7d7c177df196cfb8cb396bc2089d4d501
Author: xianjingfeng <[email protected]>
AuthorDate: Mon Jun 30 14:35:14 2025 +0800

    [#2496] improvement(server): Use tryLock instead of synchronized on buffer 
flushing (#2524)
    
    ### What changes were proposed in this pull request?
    Use a timed tryLock instead of synchronized when flushing buffers.
    
    ### Why are the changes needed?
    This is a follow-up to #2523 for better performance; the optimization was 
mentioned in a comment on #2523.
    
    ### Does this PR introduce any user-facing change?
    No.
    
    ### How was this patch tested?
    CI
---
 .../server/buffer/ShuffleBufferManager.java        | 35 +++++++++++++++-------
 1 file changed, 25 insertions(+), 10 deletions(-)

diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 54dde2d9d..32112febe 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -26,6 +26,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -99,6 +100,7 @@ public class ShuffleBufferManager {
   // appId -> shuffleId -> shuffle size in buffer
   protected Map<String, Map<Integer, AtomicLong>> shuffleSizeMap = 
JavaUtils.newConcurrentMap();
   private final boolean appBlockSizeMetricEnabled;
+  private final ReentrantLock globalFlushLock = new ReentrantLock();
 
   public ShuffleBufferManager(
       ShuffleServerConf conf, ShuffleFlushManager shuffleFlushManager, boolean 
nettyServerEnabled) {
@@ -406,16 +408,29 @@ public class ShuffleBufferManager {
     return usedMemory.get() - preAllocatedSize.get() - inFlushSize.get() > 
highWaterMark;
   }
 
-  public synchronized void flushIfNecessary() {
-    if (needToFlush()) {
-      // todo: add a metric here to track how many times flush occurs.
-      LOG.info(
-          "Start to flush with usedMemory[{}], preAllocatedSize[{}], 
inFlushSize[{}]",
-          usedMemory.get(),
-          preAllocatedSize.get(),
-          inFlushSize.get());
-      Map<String, Set<Integer>> pickedShuffle = pickFlushedShuffle();
-      flush(pickedShuffle);
+  public void flushIfNecessary() {
+    boolean lockAcquired = false;
+    try {
+      lockAcquired = globalFlushLock.tryLock(flushTryLockTimeout, 
TimeUnit.MILLISECONDS);
+      if (!lockAcquired) {
+        return;
+      }
+      if (needToFlush()) {
+        // todo: add a metric here to track how many times flush occurs.
+        LOG.info(
+            "Start to flush with usedMemory[{}], preAllocatedSize[{}], 
inFlushSize[{}]",
+            usedMemory.get(),
+            preAllocatedSize.get(),
+            inFlushSize.get());
+        Map<String, Set<Integer>> pickedShuffle = pickFlushedShuffle();
+        flush(pickedShuffle);
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("Ignore the InterruptedException which should be caused by 
internal killed");
+    } finally {
+      if (lockAcquired) {
+        globalFlushLock.unlock();
+      }
     }
   }
 

Reply via email to