This is an automated email from the ASF dual-hosted git repository.

xianjingfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 41d0fc53f [#2558] improvement(server): Limit the max flush event count 
for a single buffer (#2562)
41d0fc53f is described below

commit 41d0fc53fd8ea7db164b3b510d8a351816cd1dd9
Author: xianjingfeng <[email protected]>
AuthorDate: Mon Aug 4 11:20:56 2025 +0800

    [#2558] improvement(server): Limit the max flush event count for a single 
buffer (#2562)
    
    ### What changes were proposed in this pull request?
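    Limit the maximum number of outstanding (in-flush) flush events for a single
    shuffle buffer. A new server option, `rss.server.buffer.maxFlushEventCountPerBuffer`
    (default 5), caps this count. When a buffer has already reached the cap, a
    further flush request for it is skipped and a warning is logged. A skipped
    buffer is not counted toward the size picked for flushing. A non-positive
    value disables the limit.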
    
    ### Why are the changes needed?
    Fixes #2558.
    
    ### Does this PR introduce any user-facing change?
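    No API change. A new server configuration option,
    `rss.server.buffer.maxFlushEventCountPerBuffer`, is added. Its default value
    of 5 enables the limit out of the box; set it to a non-positive value to
    restore the previous unlimited behaviour.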
    
    ### How was this patch tested?
    Verified in a production environment.
---
 .../ShuffleServerConcurrentWriteOfHadoopTest.java  |  1 +
 .../apache/uniffle/server/ShuffleServerConf.java   |  6 ++++
 .../uniffle/server/buffer/ShuffleBuffer.java       |  2 ++
 .../server/buffer/ShuffleBufferManager.java        | 39 ++++++++++++++++------
 .../server/buffer/ShuffleBufferWithLinkedList.java |  5 +++
 .../server/buffer/ShuffleBufferWithSkipList.java   |  5 +++
 6 files changed, 47 insertions(+), 11 deletions(-)

diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java
index 7959688a3..c9c743806 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerConcurrentWriteOfHadoopTest.java
@@ -87,6 +87,7 @@ public class ShuffleServerConcurrentWriteOfHadoopTest extends 
ShuffleServerWithH
         ShuffleServerConf.SERVER_MAX_CONCURRENCY_OF_ONE_PARTITION, 
MAX_CONCURRENCY);
     
shuffleServerConf.setBoolean(shuffleServerConf.SINGLE_BUFFER_FLUSH_ENABLED, 
true);
     
shuffleServerConf.setLong(shuffleServerConf.SINGLE_BUFFER_FLUSH_SIZE_THRESHOLD, 
1024 * 1024L);
+    
shuffleServerConf.setInteger(shuffleServerConf.MAX_FLUSH_EVENT_COUNT_PER_BUFFER,
 -1);
     return shuffleServerConf;
   }
 
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index ea7bed551..114160c7a 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -445,6 +445,12 @@ public class ShuffleServerConf extends RssBaseConf {
                   + "This can cause severe garbage collection issues on the 
server side, and may even lead to out-of-heap-memory errors. "
                   + "If the threshold is set too high, it becomes meaningless. 
It won't be enabled by default.");
 
+  public static final ConfigOption<Integer> MAX_FLUSH_EVENT_COUNT_PER_BUFFER =
+      ConfigOptions.key("rss.server.buffer.maxFlushEventCountPerBuffer")
+          .intType()
+          .defaultValue(5)
+          .withDescription("The max flush event count for a single buffer.");
+
   public static final ConfigOption<Long> 
SERVER_LEAK_SHUFFLE_DATA_CHECK_INTERVAL =
       ConfigOptions.key("rss.server.leak.shuffledata.check.interval")
           .longType()
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
index c181aba42..141d926ca 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
@@ -65,6 +65,8 @@ public interface ShuffleBuffer {
 
   long getInFlushBlockCount();
 
+  int getInFlushEventCount();
+
   long release();
 
   void clearInFlushBuffer(long eventId);
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 32112febe..5cfbcb12f 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -72,6 +72,7 @@ public class ShuffleBufferManager {
   private final ShuffleBufferType shuffleBufferType;
   private final Boolean isLABEnabled;
   private final int flushTryLockTimeout;
+  private final int maxFlushEventCountPerBuffer;
   private ShuffleTaskManager shuffleTaskManager;
   private final ShuffleFlushManager shuffleFlushManager;
   private long capacity;
@@ -228,6 +229,7 @@ public class ShuffleBufferManager {
       int maxAlloc = (int) (chunkSize * maxAllocRatio);
       ChunkCreator.initialize(chunkSize, (long) (capacity * 
chunkPoolCapacityRatio), maxAlloc);
     }
+    this.maxFlushEventCountPerBuffer = 
conf.get(ShuffleServerConf.MAX_FLUSH_EVENT_COUNT_PER_BUFFER);
   }
 
   public void setShuffleTaskManager(ShuffleTaskManager taskManager) {
@@ -450,7 +452,7 @@ public class ShuffleBufferManager {
     }
   }
 
-  protected void flushBuffer(
+  protected boolean flushBuffer(
       ShuffleBuffer buffer,
       String appId,
       int shuffleId,
@@ -465,7 +467,16 @@ public class ShuffleBufferManager {
             "Shuffle[{}] for app[{}] has already been removed, no need to 
flush the buffer",
             shuffleId,
             appId);
-        return;
+        return false;
+      }
+      int flushEventCount = buffer.getInFlushEventCount();
+      if (maxFlushEventCountPerBuffer > 0 && flushEventCount >= 
maxFlushEventCountPerBuffer) {
+        LOG.warn(
+            "Shuffle[{}] for app[{}] already has [{}] flush events, ignore 
it.",
+            shuffleId,
+            appId,
+            flushEventCount);
+        return false;
       }
       ShuffleDataFlushEvent event =
           buffer.toFlushEvent(
@@ -484,10 +495,12 @@ public class ShuffleBufferManager {
         }
         ShuffleServerMetrics.gaugeInFlushBufferSize.set(inFlushSize.get());
         shuffleFlushManager.addToFlushQueue(event);
+        return true;
       }
     } finally {
       readLock.unlock();
     }
+    return false;
   }
 
   public void removeBuffer(String appId) {
@@ -664,15 +677,19 @@ public class ShuffleBufferManager {
                   shuffleIdToBuffers.getValue().asMapOfRanges().entrySet()) {
                 Range<Integer> range = rangeEntry.getKey();
                 ShuffleBuffer shuffleBuffer = rangeEntry.getValue();
-                pickedFlushSize += shuffleBuffer.getEncodedLength();
-                flushBuffer(
-                    shuffleBuffer,
-                    appId,
-                    shuffleId,
-                    range.lowerEndpoint(),
-                    range.upperEndpoint(),
-                    HugePartitionUtils.isHugePartition(
-                        shuffleTaskManager, appId, shuffleId, 
range.lowerEndpoint()));
+                long bufferEncodedLength = shuffleBuffer.getEncodedLength();
+                boolean success =
+                    flushBuffer(
+                        shuffleBuffer,
+                        appId,
+                        shuffleId,
+                        range.lowerEndpoint(),
+                        range.upperEndpoint(),
+                        HugePartitionUtils.isHugePartition(
+                            shuffleTaskManager, appId, shuffleId, 
range.lowerEndpoint()));
+                if (success) {
+                  pickedFlushSize += bufferEncodedLength;
+                }
                 if (pickedFlushSize > expectedFlushSize) {
                   LOG.info("Already picked enough buffers to flush {} bytes", 
pickedFlushSize);
                   return;
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
index 2216e2907..6485eef55 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
@@ -151,6 +151,11 @@ public class ShuffleBufferWithLinkedList extends 
AbstractShuffleBuffer {
     return inFlushBlockMap.values().stream().mapToLong(Set::size).sum();
   }
 
+  @Override
+  public int getInFlushEventCount() {
+    return inFlushBlockMap.size();
+  }
+
   @Override
   public synchronized long release() {
     Throwable lastException = null;
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
index 33ba313ce..9c65eeee0 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
@@ -149,6 +149,11 @@ public class ShuffleBufferWithSkipList extends 
AbstractShuffleBuffer {
     return inFlushBlockMap.values().stream().mapToLong(Map::size).sum();
   }
 
+  @Override
+  public int getInFlushEventCount() {
+    return inFlushBlockMap.size();
+  }
+
   @Override
   public synchronized long release() {
     Throwable lastException = null;

Reply via email to