This is an automated email from the ASF dual-hosted git repository.

xianjin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 3aaac3e7 [Bug] Fix potential negative preAllocatedSize variable (#428)
3aaac3e7 is described below

commit 3aaac3e725b8e408b27be111b003abcdae7fb350
Author: advancedxy <[email protected]>
AuthorDate: Tue Dec 27 19:22:28 2022 +0800

    [Bug] Fix potential negative preAllocatedSize variable (#428)
    
    ### What changes were proposed in this pull request?
    - update `preAllocatedSize` atomically
    - add a new configuration to trigger ShuffleBufferManager's 
flushIfNecessary periodically
    - tweak existing unit tests (UTs)
    
    ### Why are the changes needed?
    `preAllocatedSize` could become negative in production environments, which 
skews the memory pressure calculation.
    This commit should fix #229 and #426.
    
    ### Does this PR introduce _any_ user-facing change?
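    - a new ShuffleServer configuration, 
`rss.server.shuffleBufferManager.trigger.flush.interval`, is introduced 
(default 0, which disables the periodic flush check)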
    
    ### How was this patch tested?
    Existing UTs.
---
 .../uniffle/common/ShufflePartitionedData.java     | 11 ++++++
 .../apache/uniffle/test/IntegrationTestBase.java   |  1 +
 .../apache/uniffle/test/ShuffleServerGrpcTest.java |  2 -
 .../test/ShuffleServerWithMemLocalHdfsTest.java    |  1 +
 .../uniffle/test/ShuffleServerWithMemoryTest.java  |  1 +
 .../apache/uniffle/server/ShuffleServerConf.java   |  7 ++++
 .../uniffle/server/ShuffleServerGrpcService.java   | 26 ++++++++-----
 .../apache/uniffle/server/ShuffleTaskManager.java  | 44 ++++++++++++++++++----
 .../server/buffer/PreAllocatedBufferInfo.java      |  1 +
 .../server/buffer/ShuffleBufferManager.java        | 20 +++++-----
 .../uniffle/server/ShuffleTaskManagerTest.java     | 21 +++++++----
 .../server/buffer/ShuffleBufferManagerTest.java    |  8 +++-
 server/src/test/resources/server.conf              |  1 +
 13 files changed, 106 insertions(+), 38 deletions(-)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedData.java 
b/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedData.java
index 64a19140..3ee1b489 100644
--- a/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedData.java
+++ b/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedData.java
@@ -49,4 +49,15 @@ public class ShufflePartitionedData {
     return blockList;
   }
 
+  public long getTotalBlockSize() {
+    if (blockList == null) {
+      return 0L;
+    }
+    long size = 0;
+    for (ShufflePartitionedBlock block : blockList) {
+      size += block.getSize();
+    }
+    return size;
+  }
+
 }
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
index 77eb25f6..d080fb2e 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
@@ -121,6 +121,7 @@ public abstract class IntegrationTestBase extends 
HdfsTestBase {
     serverConf.setLong("rss.server.disk.capacity", 10L * 1024L * 1024L * 
1024L);
     serverConf.setBoolean("rss.server.health.check.enable", false);
     serverConf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
+    serverConf.set(ShuffleServerConf.SERVER_TRIGGER_FLUSH_CHECK_INTERVAL, 
500L);
     return serverConf;
   }
 
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
index fdb83a9f..438dba20 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
@@ -399,8 +399,6 @@ public class ShuffleServerGrpcTest extends 
IntegrationTestBase {
     RssSendShuffleDataRequest rssdr = new RssSendShuffleDataRequest(
         "sendDataWithoutRegisterTest", 3, 1000, shuffleToBlocks);
     shuffleServerClient.sendShuffleData(rssdr);
-    assertEquals(132, shuffleServers.get(0).getPreAllocatedMemory());
-    Thread.sleep(10000);
     assertEquals(0, shuffleServers.get(0).getPreAllocatedMemory());
   }
 
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHdfsTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHdfsTest.java
index 4dc9c706..947612d4 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHdfsTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHdfsTest.java
@@ -75,6 +75,7 @@ public class ShuffleServerWithMemLocalHdfsTest extends 
ShuffleReadWriteBase {
     
shuffleServerConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE,
 20.0);
     
shuffleServerConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE,
 40.0);
     shuffleServerConf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 500L);
+    
shuffleServerConf.set(ShuffleServerConf.SERVER_TRIGGER_FLUSH_CHECK_INTERVAL, 
500L);
     createShuffleServer(shuffleServerConf);
     startServers();
   }
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
index 81a7e31f..1ffc8540 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
@@ -71,6 +71,7 @@ public class ShuffleServerWithMemoryTest extends 
ShuffleReadWriteBase {
     
shuffleServerConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE,
 20.0);
     
shuffleServerConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE,
 40.0);
     shuffleServerConf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 500L);
+    
shuffleServerConf.set(ShuffleServerConf.SERVER_TRIGGER_FLUSH_CHECK_INTERVAL, 
500L);
     createShuffleServer(shuffleServerConf);
     startServers();
   }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index 38454598..88198616 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -321,6 +321,13 @@ public class ShuffleServerConf extends RssBaseConf {
       .withDescription("The max concurrency of single partition writer, the 
data partition file number is "
           + "equal to this value. Default value is 1.");
 
+  public static final ConfigOption<Long> SERVER_TRIGGER_FLUSH_CHECK_INTERVAL = 
ConfigOptions
+          .key("rss.server.shuffleBufferManager.trigger.flush.interval")
+          .longType()
+          .defaultValue(0L)
+          .withDescription("The interval of trigger shuffle buffer manager to 
flush data to persistent storage. If <= 0"
+          + ", then this flush check would be disabled.");
+
   public ShuffleServerConf() {
   }
 
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 054ffe55..97cd41f9 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -77,6 +77,7 @@ import 
org.apache.uniffle.proto.RssProtos.ShufflePartitionRange;
 import org.apache.uniffle.proto.RssProtos.ShuffleRegisterRequest;
 import org.apache.uniffle.proto.RssProtos.ShuffleRegisterResponse;
 import org.apache.uniffle.proto.ShuffleServerGrpc.ShuffleServerImplBase;
+import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
 import org.apache.uniffle.storage.common.Storage;
 import org.apache.uniffle.storage.common.StorageReadMetrics;
 import org.apache.uniffle.storage.util.ShuffleStorageUtils;
@@ -212,7 +213,9 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
     String responseMessage = "OK";
     if (req.getShuffleDataCount() > 0) {
       ShuffleServerMetrics.counterTotalReceivedDataSize.inc(requireSize);
-      boolean isPreAllocated = 
shuffleServer.getShuffleTaskManager().isPreAllocated(requireBufferId);
+      ShuffleTaskManager manager = shuffleServer.getShuffleTaskManager();
+      PreAllocatedBufferInfo info = 
manager.getAndRemovePreAllocatedBuffer(requireBufferId);
+      boolean isPreAllocated = info != null;
       if (!isPreAllocated) {
         String errorMsg = "Can't find requireBufferId[" + requireBufferId + "] 
for appId[" + appId
             + "], shuffleId[" + shuffleId + "]";
@@ -229,13 +232,12 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
       }
       final long start = System.currentTimeMillis();
       List<ShufflePartitionedData> shufflePartitionedData = 
toPartitionedData(req);
+      long alreadyReleasedSize = 0;
       for (ShufflePartitionedData spd : shufflePartitionedData) {
         String shuffleDataInfo = "appId[" + appId + "], shuffleId[" + shuffleId
             + "], partitionId[" + spd.getPartitionId() + "]";
         try {
-          ret = shuffleServer
-              .getShuffleTaskManager()
-              .cacheShuffleData(appId, shuffleId, isPreAllocated, spd);
+          ret = manager.cacheShuffleData(appId, shuffleId, isPreAllocated, 
spd);
           if (ret != StatusCode.SUCCESS) {
             String errorMsg = "Error happened when shuffleEngine.write for "
                 + shuffleDataInfo + ", statusCode=" + ret;
@@ -243,11 +245,11 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
             responseMessage = errorMsg;
             break;
           } else {
-            // remove require bufferId, the memory should be updated already
-            shuffleServer
-                
.getShuffleTaskManager().removeRequireBufferId(requireBufferId);
-            shuffleServer.getShuffleTaskManager().updateCachedBlockIds(
-                appId, shuffleId, spd.getBlockList());
+            long toReleasedSize = spd.getTotalBlockSize();
+            // after each cacheShuffleData call, the `preAllocatedSize` is 
updated timely.
+            manager.releasePreAllocatedSize(toReleasedSize);
+            alreadyReleasedSize += toReleasedSize;
+            manager.updateCachedBlockIds(appId, shuffleId, spd.getBlockList());
           }
         } catch (Exception e) {
           String errorMsg = "Error happened when shuffleEngine.write for "
@@ -258,6 +260,12 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
           break;
         }
       }
+      // since the required buffer id is only used once, the shuffle client 
would try to require another buffer whether
+      // current connection succeeded or not. Therefore, the 
preAllocatedBuffer is first get and removed, then after
+      // cacheShuffleData finishes, the preAllocatedSize should be updated 
accordingly.
+      if (info.getRequireSize() > alreadyReleasedSize) {
+        manager.releasePreAllocatedSize(info.getRequireSize() - 
alreadyReleasedSize);
+      }
       reply = 
SendShuffleDataResponse.newBuilder().setStatus(valueOf(ret)).setRetMsg(responseMessage).build();
       long costTime = System.currentTimeMillis() - start;
       
shuffleServer.getGrpcMetrics().recordProcessTime(ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD,
 costTime);
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 5e74eabf..ad93ac53 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -75,6 +75,7 @@ public class ShuffleTaskManager {
   private final ScheduledExecutorService scheduledExecutorService;
   private final ScheduledExecutorService expiredAppCleanupExecutorService;
   private final ScheduledExecutorService leakShuffleDataCheckExecutorService;
+  private ScheduledExecutorService triggerFlushExecutorService;
   private final StorageManager storageManager;
   private AtomicLong requireBufferId = new AtomicLong(0);
   private ShuffleServerConf conf;
@@ -82,6 +83,7 @@ public class ShuffleTaskManager {
   private long preAllocationExpired;
   private long commitCheckIntervalMax;
   private long leakShuffleDataCheckInterval;
+  private long triggerFlushInterval;
   // appId -> shuffleId -> blockIds to avoid too many appId
   // store taskAttemptId info to filter speculation task
   // Roaring64NavigableMap instance will cost much memory,
@@ -109,6 +111,7 @@ public class ShuffleTaskManager {
     this.commitCheckIntervalMax = 
conf.getLong(ShuffleServerConf.SERVER_COMMIT_CHECK_INTERVAL_MAX);
     this.preAllocationExpired = 
conf.getLong(ShuffleServerConf.SERVER_PRE_ALLOCATION_EXPIRED);
     this.leakShuffleDataCheckInterval = 
conf.getLong(ShuffleServerConf.SERVER_LEAK_SHUFFLE_DATA_CHECK_INTERVAL);
+    this.triggerFlushInterval = 
conf.getLong(ShuffleServerConf.SERVER_TRIGGER_FLUSH_CHECK_INTERVAL);
     // the thread for checking application status
     this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
         ThreadUtils.getThreadFactory("checkResource-%d"));
@@ -125,6 +128,13 @@ public class ShuffleTaskManager {
     leakShuffleDataCheckExecutorService.scheduleAtFixedRate(
         () -> checkLeakShuffleData(), leakShuffleDataCheckInterval,
             leakShuffleDataCheckInterval, TimeUnit.MILLISECONDS);
+    if (triggerFlushInterval > 0) {
+      triggerFlushExecutorService = Executors.newSingleThreadScheduledExecutor(
+          ThreadUtils.getThreadFactory("triggerShuffleBufferManagerFlush"));
+      triggerFlushExecutorService.scheduleWithFixedDelay(
+              this::triggerFlush, triggerFlushInterval / 2,
+          triggerFlushInterval, TimeUnit.MILLISECONDS);
+    }
     // the thread for clear expired resources
     clearResourceThread = () -> {
       while (true) {
@@ -193,12 +203,20 @@ public class ShuffleTaskManager {
     return shuffleBufferManager.cacheShuffleData(appId, shuffleId, 
isPreAllocated, spd);
   }
 
-  public boolean isPreAllocated(long requireBufferId) {
-    return requireBufferIds.containsKey(requireBufferId);
+  public PreAllocatedBufferInfo getAndRemovePreAllocatedBuffer(long 
requireBufferId) {
+    return requireBufferIds.remove(requireBufferId);
+  }
+
+  public void releasePreAllocatedSize(long requireSize) {
+    shuffleBufferManager.releasePreAllocatedSize(requireSize);
   }
 
-  public void removeRequireBufferId(long requireId) {
-    requireBufferIds.remove(requireId);
+  @VisibleForTesting
+  void removeAndReleasePreAllocatedBuffer(long requireBufferId) {
+    PreAllocatedBufferInfo info = 
getAndRemovePreAllocatedBuffer(requireBufferId);
+    if (info != null) {
+      releasePreAllocatedSize(info.getRequireSize());
+    }
   }
 
   public StatusCode commitShuffle(String appId, int shuffleId) throws 
Exception {
@@ -528,12 +546,17 @@ public class ShuffleTaskManager {
       for (PreAllocatedBufferInfo info : requireBufferIds.values()) {
         if (current - info.getTimestamp() > preAllocationExpired) {
           removeIds.add(info.getRequireId());
-          shuffleBufferManager.releaseMemory(info.getRequireSize(), false, 
true);
         }
       }
       for (Long requireId : removeIds) {
-        requireBufferIds.remove(requireId);
-        LOG.info("Remove expired requireId " + requireId);
+        PreAllocatedBufferInfo info = requireBufferIds.remove(requireId);
+        if (info != null) {
+          // move release memory code down to here as the requiredBuffer could 
be consumed during removing processing.
+          shuffleBufferManager.releaseMemory(info.getRequireSize(), false, 
true);
+          LOG.info("Remove expired preAllocatedBuffer " + requireId);
+        } else {
+          LOG.info("PreAllocatedBuffer[id={}] has already been removed", 
requireId);
+        }
       }
     } catch (Exception e) {
       LOG.warn("Error happened in preAllocatedBufferCheck", e);
@@ -579,4 +602,11 @@ public class ShuffleTaskManager {
   public ShuffleDataDistributionType getDataDistributionType(String appId) {
     return shuffleTaskInfos.get(appId).getDataDistType();
   }
+
+  private void triggerFlush() {
+    synchronized (this.shuffleBufferManager) {
+      this.shuffleBufferManager.flushIfNecessary();
+    }
+  }
+
 }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/PreAllocatedBufferInfo.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/PreAllocatedBufferInfo.java
index 3eccafee..bd6f6a33 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/PreAllocatedBufferInfo.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/PreAllocatedBufferInfo.java
@@ -40,4 +40,5 @@ public class PreAllocatedBufferInfo {
   public int getRequireSize() {
     return requireSize;
   }
+
 }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 500542a0..a8f0df9b 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -115,7 +115,9 @@ public class ShuffleBufferManager {
 
     ShuffleBuffer buffer = entry.getValue();
     long size = buffer.append(spd);
-    updateSize(size, isPreAllocated);
+    if (!isPreAllocated) {
+      updateUsedMemory(size);
+    }
     updateShuffleSize(appId, shuffleId, size);
     synchronized (this) {
       flushSingleBufferIfNecessary(buffer, appId, shuffleId,
@@ -187,7 +189,7 @@ public class ShuffleBufferManager {
     }
   }
 
-  void flushIfNecessary() {
+  public void flushIfNecessary() {
     // if data size in buffer > highWaterMark, do the flush
     if (usedMemory.get() - preAllocatedSize.get() - inFlushSize.get() > 
highWaterMark) {
       LOG.info("Start to flush with usedMemory[{}], preAllocatedSize[{}], 
inFlushSize[{}]",
@@ -340,14 +342,10 @@ public class ShuffleBufferManager {
     }
   }
 
-  void updateSize(long delta, boolean isPreAllocated) {
-    if (isPreAllocated) {
-      releasePreAllocatedSize(delta);
-    } else {
-      // add size if not allocated
-      usedMemory.addAndGet(delta);
-      ShuffleServerMetrics.gaugeUsedBufferSize.set(usedMemory.get());
-    }
+  public void updateUsedMemory(long delta) {
+    // add size if not allocated
+    usedMemory.addAndGet(delta);
+    ShuffleServerMetrics.gaugeUsedBufferSize.set(usedMemory.get());
   }
 
   void requirePreAllocatedSize(long delta) {
@@ -355,7 +353,7 @@ public class ShuffleBufferManager {
     ShuffleServerMetrics.gaugeAllocatedBufferSize.set(preAllocatedSize.get());
   }
 
-  void releasePreAllocatedSize(long delta) {
+  public void releasePreAllocatedSize(long delta) {
     preAllocatedSize.addAndGet(-delta);
     ShuffleServerMetrics.gaugeAllocatedBufferSize.set(preAllocatedSize.get());
   }
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java 
b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index 661857d8..86a8dcdf 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -175,9 +175,9 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
     // won't flush for partition 1-1
     ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35);
     
expectedBlocks1.addAll(Lists.newArrayList(partitionedData0.getBlockList()));
-    long requireId = shuffleTaskManager.requireBuffer(35);
+    long bufferId = shuffleTaskManager.requireBuffer(35);
     assertEquals(1, bufferIds.size());
-    PreAllocatedBufferInfo pabi = bufferIds.get(requireId);
+    PreAllocatedBufferInfo pabi = bufferIds.get(bufferId);
     assertEquals(35, pabi.getRequireSize());
     StatusCode sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, 
true, partitionedData0);
     shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 
partitionedData0.getBlockList());
@@ -185,6 +185,8 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
     assertEquals(1, bufferIds.size());
     assertEquals(StatusCode.SUCCESS, sc);
     shuffleTaskManager.commitShuffle(appId, shuffleId);
+    // manually release the pre allocate buffer
+    shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
 
     ShuffleFlushManager shuffleFlushManager = 
shuffleServer.getShuffleFlushManager();
     assertEquals(1, shuffleFlushManager.getCommittedBlockIds(appId, 
shuffleId).getLongCardinality());
@@ -192,10 +194,11 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
     // flush for partition 1-1
     ShufflePartitionedData partitionedData1 = createPartitionedData(1, 2, 35);
     
expectedBlocks1.addAll(Lists.newArrayList(partitionedData1.getBlockList()));
-    shuffleTaskManager.requireBuffer(70);
+    bufferId = shuffleTaskManager.requireBuffer(70);
     sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, 
partitionedData1);
     shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 
partitionedData1.getBlockList());
     assertEquals(StatusCode.SUCCESS, sc);
+    shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
     waitForFlush(shuffleFlushManager, appId, shuffleId, 2 + 1);
 
     // won't flush for partition 1-1
@@ -209,17 +212,19 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
     // won't flush for partition 2-2
     ShufflePartitionedData partitionedData3 = createPartitionedData(2, 1, 30);
     
expectedBlocks2.addAll(Lists.newArrayList(partitionedData3.getBlockList()));
-    shuffleTaskManager.requireBuffer(30);
+    bufferId = shuffleTaskManager.requireBuffer(30);
     sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, 
partitionedData3);
     shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 
partitionedData3.getBlockList());
+    shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
     assertEquals(StatusCode.SUCCESS, sc);
 
     // flush for partition 2-2
     ShufflePartitionedData partitionedData4 = createPartitionedData(2, 1, 35);
     
expectedBlocks2.addAll(Lists.newArrayList(partitionedData4.getBlockList()));
-    shuffleTaskManager.requireBuffer(35);
+    bufferId = shuffleTaskManager.requireBuffer(35);
     sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, 
partitionedData4);
     shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 
partitionedData4.getBlockList());
+    shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
     assertEquals(StatusCode.SUCCESS, sc);
 
     shuffleTaskManager.commitShuffle(appId, shuffleId);
@@ -230,9 +235,10 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
     ShufflePartitionedData partitionedData5 = createPartitionedData(1, 2, 35);
     shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 
partitionedData5.getBlockList());
     
expectedBlocks1.addAll(Lists.newArrayList(partitionedData5.getBlockList()));
-    shuffleTaskManager.requireBuffer(70);
+    bufferId = shuffleTaskManager.requireBuffer(70);
     sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, 
partitionedData5);
     assertEquals(StatusCode.SUCCESS, sc);
+    shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
 
     // 2 new blocks should be committed
     waitForFlush(shuffleFlushManager, appId, shuffleId, 2 + 1 + 3 + 2);
@@ -245,9 +251,10 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
     // flush for partition 0-1
     ShufflePartitionedData partitionedData7 = createPartitionedData(1, 2, 35);
     shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 
partitionedData7.getBlockList());
-    shuffleTaskManager.requireBuffer(70);
+    bufferId = shuffleTaskManager.requireBuffer(70);
     sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, 
partitionedData7);
     assertEquals(StatusCode.SUCCESS, sc);
+    shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
 
     // 2 new blocks should be committed
     waitForFlush(shuffleFlushManager, appId, shuffleId, 2 + 1 + 3 + 2 + 2);
diff --git 
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
 
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
index 4b6c0a77..e00bf9d8 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
@@ -336,9 +336,9 @@ public class ShuffleBufferManagerTest extends 
BufferTestBase {
     // receive data with preAllocation
     shuffleBufferManager.cacheShuffleData(appId, shuffleId, true, 
createData(0, 16));
     assertEquals(48, shuffleBufferManager.getUsedMemory());
-    assertEquals(0, shuffleBufferManager.getPreAllocatedSize());
+    assertEquals(48, shuffleBufferManager.getPreAllocatedSize());
     // release memory
-    shuffleBufferManager.releaseMemory(48, false, false);
+    shuffleBufferManager.releaseMemory(48, false, true);
     assertEquals(0, shuffleBufferManager.getUsedMemory());
     assertEquals(0, shuffleBufferManager.getPreAllocatedSize());
     // receive data without preAllocation
@@ -367,12 +367,16 @@ public class ShuffleBufferManagerTest extends 
BufferTestBase {
 
     // actual data size < spillThreshold, won't flush
     sc = shuffleBufferManager.cacheShuffleData(appId, shuffleId, true, 
createData(1, 16));
+    shuffleBufferManager.releasePreAllocatedSize(48);
     assertEquals(StatusCode.SUCCESS, sc);
     assertEquals(500, shuffleBufferManager.getUsedMemory());
     assertEquals(452, shuffleBufferManager.getPreAllocatedSize());
 
     // actual data size > highWaterMark, flush
     shuffleBufferManager.cacheShuffleData(appId, shuffleId, true, 
createData(0, 400));
+    shuffleBufferManager.releasePreAllocatedSize(432);
+    // trigger flush manually
+    shuffleBufferManager.flushIfNecessary();
     assertEquals(StatusCode.SUCCESS, sc);
     assertEquals(500, shuffleBufferManager.getUsedMemory());
     assertEquals(20, shuffleBufferManager.getPreAllocatedSize());
diff --git a/server/src/test/resources/server.conf 
b/server/src/test/resources/server.conf
index 2838ba07..ca7a47bf 100644
--- a/server/src/test/resources/server.conf
+++ b/server/src/test/resources/server.conf
@@ -26,3 +26,4 @@ rss.jetty.http.port 12345
 rss.jetty.corePool.size 64
 rss.server.heartbeat.timeout 1
 rss.server.write.timeout 2000
+rss.server.shuffleBufferManager.trigger.flush.interval 500

Reply via email to