This is an automated email from the ASF dual-hosted git repository.

xianjingfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 887d04284 [#2496] improvement(server): Improve the performance of 
flushing single buffer (#2523)
887d04284 is described below

commit 887d04284eeee6a9d8886d9fe78edc7e309e6901
Author: xianjingfeng <[email protected]>
AuthorDate: Fri Jun 27 10:12:41 2025 +0800

    [#2496] improvement(server): Improve the performance of flushing single 
buffer (#2523)
    
    ### What changes were proposed in this pull request?
    Improve the performance of flushing single buffer
    
    ### Why are the changes needed?
    Fix: #2496
    
    ### Does this PR introduce any user-facing change?
    No.
    
    ### How was this patch tested?
    CI
---
 .../apache/uniffle/server/HugePartitionUtils.java  |  8 ++--
 .../org/apache/uniffle/server/ShuffleTaskInfo.java |  8 ++--
 .../server/buffer/ShuffleBufferManager.java        | 50 ++++++++++++----------
 3 files changed, 38 insertions(+), 28 deletions(-)

diff --git 
a/server/src/main/java/org/apache/uniffle/server/HugePartitionUtils.java 
b/server/src/main/java/org/apache/uniffle/server/HugePartitionUtils.java
index 59a010cff..dd5477561 100644
--- a/server/src/main/java/org/apache/uniffle/server/HugePartitionUtils.java
+++ b/server/src/main/java/org/apache/uniffle/server/HugePartitionUtils.java
@@ -52,9 +52,11 @@ public class HugePartitionUtils {
    */
   public static boolean isHugePartition(
       ShuffleTaskManager shuffleTaskManager, String appId, int shuffleId, int 
partitionId) {
-    return shuffleTaskManager != null
-        && shuffleTaskManager.getShuffleTaskInfo(appId) != null
-        && 
shuffleTaskManager.getShuffleTaskInfo(appId).isHugePartition(shuffleId, 
partitionId);
+    if (shuffleTaskManager == null) {
+      return false;
+    }
+    ShuffleTaskInfo taskInfo = shuffleTaskManager.getShuffleTaskInfo(appId);
+    return taskInfo != null && taskInfo.isHugePartition(shuffleId, 
partitionId);
   }
 
   /**
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
index 687622bb0..e748a892f 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
@@ -243,9 +243,11 @@ public class ShuffleTaskInfo {
   }
 
   public boolean isHugePartition(int shuffleId, int partitionId) {
-    return existHugePartition.get()
-        && hugePartitionTags.containsKey(shuffleId)
-        && hugePartitionTags.get(shuffleId).contains(partitionId);
+    if (!existHugePartition.get()) {
+      return false;
+    }
+    Set<Integer> partitions = hugePartitionTags.get(shuffleId);
+    return partitions != null && partitions.contains(partitionId);
   }
 
   public Set<Integer> getShuffleIds() {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 0b6b246b5..54dde2d9d 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -303,15 +303,13 @@ public class ShuffleBufferManager {
         shuffleId,
         spd.getPartitionId());
     updateShuffleSize(appId, shuffleId, size);
-    synchronized (this) {
-      flushSingleBufferIfNecessary(
-          buffer,
-          appId,
-          shuffleId,
-          spd.getPartitionId(),
-          entry.getKey().lowerEndpoint(),
-          entry.getKey().upperEndpoint());
-    }
+    flushSingleBufferIfNecessary(
+        buffer,
+        appId,
+        shuffleId,
+        spd.getPartitionId(),
+        entry.getKey().lowerEndpoint(),
+        entry.getKey().upperEndpoint());
     if (bufferFlushWhenCachingData && needToFlush()) {
       flushIfNecessary();
     }
@@ -374,24 +372,32 @@ public class ShuffleBufferManager {
       int partitionId,
       int startPartition,
       int endPartition) {
+    if (!(buffer.getEncodedLength() > this.bufferFlushThreshold
+        || buffer.getBlockCount() > bufferFlushBlocksNumThreshold)) {
+      return;
+    }
     boolean isHugePartition =
         HugePartitionUtils.isHugePartition(shuffleTaskManager, appId, 
shuffleId, partitionId);
+    if (!(isHugePartition || this.bufferFlushEnabled)) {
+      return;
+    }
     // When we use multi storage and trigger single buffer flush, the buffer 
size should be bigger
     // than rss.server.flush.cold.storage.threshold.size, otherwise cold 
storage will be useless.
-    if ((isHugePartition || this.bufferFlushEnabled)
-        && (buffer.getEncodedLength() > this.bufferFlushThreshold
-            || buffer.getBlockCount() > bufferFlushBlocksNumThreshold)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(
-            "Start to flush single buffer. Details - shuffleId:{}, 
startPartition:{}, endPartition:{}, isHugePartition:{}, bufferSize:{}, 
blocksNum:{}",
-            shuffleId,
-            startPartition,
-            endPartition,
-            isHugePartition,
-            buffer.getEncodedLength(),
-            buffer.getBlockCount());
+    synchronized (buffer) {
+      if (buffer.getEncodedLength() > this.bufferFlushThreshold
+          || buffer.getBlockCount() > bufferFlushBlocksNumThreshold) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+              "Start to flush single buffer. Details - shuffleId:{}, 
startPartition:{}, endPartition:{}, isHugePartition:{}, bufferSize:{}, 
blocksNum:{}",
+              shuffleId,
+              startPartition,
+              endPartition,
+              isHugePartition,
+              buffer.getEncodedLength(),
+              buffer.getBlockCount());
+        }
+        flushBuffer(buffer, appId, shuffleId, startPartition, endPartition, 
isHugePartition);
       }
-      flushBuffer(buffer, appId, shuffleId, startPartition, endPartition, 
isHugePartition);
     }
   }
 

Reply via email to