This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new e802b93d [ISSUE-378][HugePartition][Part-2] Introduce memory usage 
limit and data flush (#471)
e802b93d is described below

commit e802b93d7050dc48a62b1abe2693dc96c706151f
Author: Junfan Zhang <[email protected]>
AuthorDate: Tue Jan 17 10:02:36 2023 +0800

    [ISSUE-378][HugePartition][Part-2] Introduce memory usage limit and data 
flush (#471)
    
    
    ### What changes were proposed in this pull request?
    1. Introduce memory usage limit for huge partition to keep the regular 
partition writing stable
    2. Once a partition is marked as a huge partition, when its buffer size is 
greater than the `rss.server.single.buffer.flush.threshold` value, single-buffer 
flush will be triggered regardless of whether single-buffer flush is enabled
    
    ### Why are the changes needed?
    1. To solve the problems mentioned in #378
    
    ### Does this PR introduce _any_ user-facing change?
    Yes.
    
    ### How was this patch tested?
    1. UTs
---
 .../apache/uniffle/common/util/TripleFunction.java | 25 ++++++++
 docs/server_guide.md                               |  2 +
 .../client/impl/grpc/ShuffleServerGrpcClient.java  | 45 ++++++++++++--
 proto/src/main/proto/Rss.proto                     |  3 +
 .../apache/uniffle/server/ShuffleServerConf.java   | 14 +++++
 .../uniffle/server/ShuffleServerGrpcService.java   | 18 +++++-
 .../apache/uniffle/server/ShuffleTaskManager.java  | 31 +++++++++-
 .../server/buffer/ShuffleBufferManager.java        | 72 ++++++++++++++++++++--
 .../uniffle/server/ShuffleTaskManagerTest.java     | 43 +++++++++++++
 .../server/buffer/ShuffleBufferManagerTest.java    | 67 ++++++++++++++++++++
 10 files changed, 306 insertions(+), 14 deletions(-)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/util/TripleFunction.java 
b/common/src/main/java/org/apache/uniffle/common/util/TripleFunction.java
new file mode 100644
index 00000000..56ece5f6
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/util/TripleFunction.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.util;
+
+@FunctionalInterface
+public interface TripleFunction<T, U, E, R> {
+
+  R accept(T t, U u, E e);
+
+}
diff --git a/docs/server_guide.md b/docs/server_guide.md
index 0e0b08cd..09decaae 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -86,6 +86,8 @@ This document will introduce how to deploy Uniffle shuffle 
servers.
 |rss.server.leak.shuffledata.check.interval|3600000|The interval of leak 
shuffle data check (ms)|
 |rss.server.max.concurrency.of.single.partition.writer|1|The max concurrency 
of single partition writer, the data partition file number is equal to this 
value. Default value is 1. This config could improve the writing speed, 
especially for huge partition.|
 |rss.metrics.reporter.class|-|The class of metrics reporter.|
+|rss.server.huge-partition.size.threshold|20g|Threshold of huge partition 
size, once exceeding threshold, memory usage limitation and huge partition 
buffer flushing will be triggered.|
+|rss.server.huge-partition.memory.limit.ratio|0.2|The memory usage limit ratio 
for a huge partition; it will only be triggered when the partition's size exceeds 
the threshold of 'rss.server.huge-partition.size.threshold'|
 
 ### Advanced Configurations
 |Property Name|Default| Description                                            
                                                                                
                                                     |
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index 74fe2767..57dc8ed5 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -17,6 +17,8 @@
 
 package org.apache.uniffle.client.impl.grpc;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -179,8 +181,33 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
     return blockingStub.withDeadlineAfter(timeout, 
TimeUnit.MILLISECONDS).appHeartbeat(request);
   }
 
-  public long requirePreAllocation(int requireSize, int retryMax, long 
retryIntervalMax) {
-    RequireBufferRequest rpcRequest = 
RequireBufferRequest.newBuilder().setRequireSize(requireSize).build();
+  // Only for tests
+  @VisibleForTesting
+  public long requirePreAllocation(int requireSize, int retryMax, long 
retryIntervalMax) throws Exception {
+    return requirePreAllocation(
+        "EMPTY",
+        0,
+        Collections.emptyList(),
+        requireSize,
+        retryMax,
+        retryIntervalMax
+    );
+  }
+
+  public long requirePreAllocation(
+      String appId,
+      int shuffleId,
+      List<Integer> partitionIds,
+      int requireSize,
+      int retryMax,
+      long retryIntervalMax) {
+    RequireBufferRequest rpcRequest = RequireBufferRequest.newBuilder()
+        .setShuffleId(shuffleId)
+        .addAllPartitionIds(partitionIds)
+        .setAppId(appId)
+        .setRequireSize(requireSize)
+        .build();
+
     long start = System.currentTimeMillis();
     RequireBufferResponse rpcResponse = 
getBlockingStub().requireBuffer(rpcRequest);
     int retry = 0;
@@ -282,6 +309,9 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
       List<ShuffleData> shuffleData = Lists.newArrayList();
       int size = 0;
       int blockNum = 0;
+      int shuffleId = stb.getKey();
+      List<Integer> partitionIds = new ArrayList<>();
+
       for (Map.Entry<Integer, List<ShuffleBlockInfo>> ptb : 
stb.getValue().entrySet()) {
         List<ShuffleBlock> shuffleBlocks = Lists.newArrayList();
         for (ShuffleBlockInfo sbi : ptb.getValue()) {
@@ -298,14 +328,21 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
         shuffleData.add(ShuffleData.newBuilder().setPartitionId(ptb.getKey())
             .addAllBlock(shuffleBlocks)
             .build());
+        partitionIds.add(ptb.getKey());
       }
 
       final int allocateSize = size;
       final int finalBlockNum = blockNum;
       try {
         RetryUtils.retry(() -> {
-          long requireId = requirePreAllocation(allocateSize, 
request.getRetryMax() / maxRetryAttempts,
-              request.getRetryIntervalMax());
+          long requireId = requirePreAllocation(
+              appId,
+              shuffleId,
+              partitionIds,
+              allocateSize,
+              request.getRetryMax() / maxRetryAttempts,
+              request.getRetryIntervalMax()
+          );
           if (requireId == FAILED_REQUIRE_ID) {
             throw new RssException(String.format(
                 "requirePreAllocation failed! size[%s], host[%s], port[%s]", 
allocateSize, host, port));
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 3308cdcb..0bff956e 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -51,6 +51,9 @@ message FinishShuffleResponse {
 
 message RequireBufferRequest {
   int32 requireSize = 1;
+  string appId = 2;
+  int32 shuffleId = 3;
+  repeated int32 partitionIds = 4;
 }
 
 message RequireBufferResponse {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index b40b19db..5b181075 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -342,6 +342,20 @@ public class ShuffleServerConf extends RssBaseConf {
       .noDefaultValue()
       .withDescription("The env key to get json source of local storage media 
provider");
 
+  public static final ConfigOption<Long> HUGE_PARTITION_SIZE_THRESHOLD = 
ConfigOptions
+      .key("rss.server.huge-partition.size.threshold")
+      .longType()
+      .defaultValue(20 * 1024 * 1024 * 1024L)
+      .withDescription("Threshold of huge partition size, once exceeding 
threshold, memory usage limitation and "
+          + "huge partition buffer flushing will be triggered.");
+
+  public static final ConfigOption<Double> 
HUGE_PARTITION_MEMORY_USAGE_LIMITATION_RATIO = ConfigOptions
+      .key("rss.server.huge-partition.memory.limit.ratio")
+      .doubleType()
+      .defaultValue(0.2)
+      .withDescription("The memory usage limit ratio for huge partition, it 
will only triggered when partition's "
+          + "size exceeds the threshold of '" + 
HUGE_PARTITION_SIZE_THRESHOLD.key() + "'");
+
   public ShuffleServerConf() {
   }
 
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 1e6999e3..774a79b9 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -30,6 +30,7 @@ import com.google.protobuf.UnsafeByteOperations;
 import io.grpc.Context;
 import io.grpc.Status;
 import io.grpc.stub.StreamObserver;
+import org.apache.commons.lang3.StringUtils;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -353,7 +354,22 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
   @Override
   public void requireBuffer(RequireBufferRequest request,
       StreamObserver<RequireBufferResponse> responseObserver) {
-    long requireBufferId = 
shuffleServer.getShuffleTaskManager().requireBuffer(request.getRequireSize());
+    String appId = request.getAppId();
+    long requireBufferId;
+    if (StringUtils.isEmpty(appId)) {
+      // To be compatible with older client version
+      requireBufferId = shuffleServer.getShuffleTaskManager().requireBuffer(
+          request.getRequireSize()
+      );
+    } else {
+      requireBufferId = shuffleServer.getShuffleTaskManager().requireBuffer(
+          appId,
+          request.getShuffleId(),
+          request.getPartitionIdsList(),
+          request.getRequireSize()
+      );
+    }
+
     StatusCode status = StatusCode.SUCCESS;
     if (requireBufferId == -1) {
       status = StatusCode.NO_BUFFER;
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index d103f506..53f22c46 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -95,7 +95,6 @@ public class ShuffleTaskManager {
   private Map<Long, PreAllocatedBufferInfo> requireBufferIds = 
Maps.newConcurrentMap();
   private Runnable clearResourceThread;
   private BlockingQueue<PurgeEvent> expiredAppIdQueue = 
Queues.newLinkedBlockingQueue();
-  // appId -> shuffleId -> serverReadHandler
 
   public ShuffleTaskManager(
       ShuffleServerConf conf,
@@ -200,7 +199,13 @@ public class ShuffleTaskManager {
   public StatusCode cacheShuffleData(
       String appId, int shuffleId, boolean isPreAllocated, 
ShufflePartitionedData spd) {
     refreshAppId(appId);
-    return shuffleBufferManager.cacheShuffleData(appId, shuffleId, 
isPreAllocated, spd);
+    return shuffleBufferManager.cacheShuffleData(
+        appId,
+        shuffleId,
+        isPreAllocated,
+        spd,
+        this::getPartitionDataSize
+    );
   }
 
   public PreAllocatedBufferInfo getAndRemovePreAllocatedBuffer(long 
requireBufferId) {
@@ -335,6 +340,27 @@ public class ShuffleTaskManager {
     return blockIds;
   }
 
+  public long getPartitionDataSize(String appId, int shuffleId, int 
partitionId) {
+    ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.get(appId);
+    if (shuffleTaskInfo == null) {
+      return 0L;
+    }
+    return shuffleTaskInfo.getPartitionDataSize(shuffleId, partitionId);
+  }
+
+  public long requireBuffer(String appId, int shuffleId, List<Integer> 
partitionIds, int requireSize) {
+    ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.get(appId);
+    if (shuffleTaskInfo != null) {
+      for (int partitionId : partitionIds) {
+        long partitionUsedDataSize = getPartitionDataSize(appId, shuffleId, 
partitionId);
+        if (shuffleBufferManager.limitHugePartition(appId, shuffleId, 
partitionId, partitionUsedDataSize)) {
+          return -1;
+        }
+      }
+    }
+    return requireBuffer(requireSize);
+  }
+
   public long requireBuffer(int requireSize) {
     long requireId = -1;
     if (shuffleBufferManager.requireMemory(requireSize, true)) {
@@ -630,5 +656,4 @@ public class ShuffleTaskManager {
       this.shuffleBufferManager.flushIfNecessary();
     }
   }
-
 }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index eca6610f..b4c7c2d4 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -41,6 +41,7 @@ import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShufflePartitionedData;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.common.util.RssUtils;
+import org.apache.uniffle.common.util.TripleFunction;
 import org.apache.uniffle.server.ShuffleDataFlushEvent;
 import org.apache.uniffle.server.ShuffleFlushManager;
 import org.apache.uniffle.server.ShuffleServerConf;
@@ -62,6 +63,9 @@ public class ShuffleBufferManager {
   // when shuffle buffer manager flushes data, shuffles with data size < 
shuffleFlushThreshold is kept in memory to
   // reduce small I/Os to persistent storage, especially for local HDDs.
   private long shuffleFlushThreshold;
+  // Huge partition vars
+  private long hugePartitionSizeThreshold;
+  private long hugePartitionMemoryLimitSize;
 
   protected long bufferSize = 0;
   protected AtomicLong preAllocatedSize = new AtomicLong(0L);
@@ -86,6 +90,10 @@ public class ShuffleBufferManager {
     this.bufferFlushEnabled = 
conf.getBoolean(ShuffleServerConf.SINGLE_BUFFER_FLUSH_ENABLED);
     this.bufferFlushThreshold = 
conf.getLong(ShuffleServerConf.SINGLE_BUFFER_FLUSH_THRESHOLD);
     this.shuffleFlushThreshold = 
conf.getLong(ShuffleServerConf.SERVER_SHUFFLE_FLUSH_THRESHOLD);
+    this.hugePartitionSizeThreshold = 
conf.getSizeAsBytes(ShuffleServerConf.HUGE_PARTITION_SIZE_THRESHOLD);
+    this.hugePartitionMemoryLimitSize = Math.round(
+        capacity * 
conf.get(ShuffleServerConf.HUGE_PARTITION_MEMORY_USAGE_LIMITATION_RATIO)
+    );
   }
 
   public StatusCode registerBuffer(String appId, int shuffleId, int 
startPartition, int endPartition) {
@@ -104,8 +112,27 @@ public class ShuffleBufferManager {
     return StatusCode.SUCCESS;
   }
 
-  public StatusCode cacheShuffleData(String appId, int shuffleId,
-      boolean isPreAllocated, ShufflePartitionedData spd) {
+  // Only for tests
+  public StatusCode cacheShuffleData(
+      String appId,
+      int shuffleId,
+      boolean isPreAllocated,
+      ShufflePartitionedData spd) {
+    return cacheShuffleData(
+        appId,
+        shuffleId,
+        isPreAllocated,
+        spd,
+        null
+    );
+  }
+
+  public StatusCode cacheShuffleData(
+      String appId,
+      int shuffleId,
+      boolean isPreAllocated,
+      ShufflePartitionedData spd,
+      TripleFunction<String, Integer, Integer, Long> getPartitionDataSizeFunc) 
{
     if (!isPreAllocated && isFull()) {
       LOG.warn("Got unexpected data, can't cache it because the space is 
full");
       return StatusCode.NO_BUFFER;
@@ -124,8 +151,15 @@ public class ShuffleBufferManager {
     }
     updateShuffleSize(appId, shuffleId, size);
     synchronized (this) {
-      flushSingleBufferIfNecessary(buffer, appId, shuffleId,
-          entry.getKey().lowerEndpoint(), entry.getKey().upperEndpoint());
+      flushSingleBufferIfNecessary(
+          buffer,
+          appId,
+          shuffleId,
+          spd.getPartitionId(),
+          entry.getKey().lowerEndpoint(),
+          entry.getKey().upperEndpoint(),
+          getPartitionDataSizeFunc
+      );
       flushIfNecessary();
     }
     return StatusCode.SUCCESS;
@@ -184,12 +218,26 @@ public class ShuffleBufferManager {
     return buffer.getShuffleData(blockId, readBufferSize, expectedTaskIds);
   }
 
-  void flushSingleBufferIfNecessary(ShuffleBuffer buffer, String appId,
-      int shuffleId, int startPartition, int endPartition) {
+  void flushSingleBufferIfNecessary(
+      ShuffleBuffer buffer,
+      String appId,
+      int shuffleId,
+      int partitionId,
+      int startPartition,
+      int endPartition,
+      TripleFunction<String, Integer, Integer, Long> getPartitionDataSizeFunc) 
{
     // When we use multi storage and trigger single buffer flush, the buffer 
size should be bigger
     // than rss.server.flush.cold.storage.threshold.size, otherwise cold 
storage will be useless.
     if (this.bufferFlushEnabled && buffer.getSize() > 
this.bufferFlushThreshold) {
       flushBuffer(buffer, appId, shuffleId, startPartition, endPartition);
+      return;
+    }
+
+    if (getPartitionDataSizeFunc != null
+        && getPartitionDataSizeFunc.accept(appId, shuffleId, partitionId) > 
hugePartitionSizeThreshold
+        && buffer.getSize() > this.bufferFlushThreshold) {
+      flushBuffer(buffer, appId, shuffleId, startPartition, endPartition);
+      return;
     }
   }
 
@@ -515,4 +563,16 @@ public class ShuffleBufferManager {
       shuffleIdToBuffers.remove(shuffleId);
     }
   }
+
+  public boolean limitHugePartition(String appId, int shuffleId, int 
partitionId, long usedPartitionDataSize) {
+    if (usedPartitionDataSize > hugePartitionSizeThreshold) {
+      long memoryUsed = getShuffleBufferEntry(appId, shuffleId, 
partitionId).getValue().getSize();
+      if (memoryUsed > hugePartitionMemoryLimitSize) {
+        LOG.warn("AppId: {}, shuffleId: {}, partitionId: {}, memory used: {}, "
+            + "huge partition triggered memory limitation.", appId, shuffleId, 
partitionId, memoryUsed);
+        return true;
+      }
+    }
+    return false;
+  }
 }
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java 
b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index 1dc6eb58..43cd5ac1 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -64,6 +64,7 @@ import org.apache.uniffle.storage.util.StorageType;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -78,6 +79,48 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
     ShuffleServerMetrics.clear();
   }
 
+  @Test
+  public void hugePartitionMemoryUsageLimitTest() throws Exception {
+    String confFile = ClassLoader.getSystemResource("server.conf").getFile();
+    ShuffleServerConf conf = new ShuffleServerConf(confFile);
+    conf.set(ShuffleServerConf.RSS_STORAGE_TYPE, 
StorageType.MEMORY_LOCALFILE.name());
+    conf.setString(ShuffleServerConf.HUGE_PARTITION_SIZE_THRESHOLD.key(), 
"1K");
+    conf.setString("rss.server.buffer.capacity", "10K");
+    conf.set(ShuffleServerConf.HUGE_PARTITION_MEMORY_USAGE_LIMITATION_RATIO, 
0.1);
+
+    ShuffleServer shuffleServer = new ShuffleServer(conf);
+    ShuffleTaskManager shuffleTaskManager = 
shuffleServer.getShuffleTaskManager();
+
+    String appId = "hugePartitionMemoryUsageLimitTest_appId";
+    int shuffleId = 1;
+
+    shuffleTaskManager.registerShuffle(
+        appId,
+        shuffleId,
+        Lists.newArrayList(new PartitionRange(1, 1)),
+        RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
+        StringUtils.EMPTY
+    );
+
+    // case1
+    long requiredId = shuffleTaskManager.requireBuffer(appId, 1, 
Arrays.asList(1), 500);
+    assertNotEquals(-1, requiredId);
+
+    // case2
+    ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 500);
+    shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, 
partitionedData0);
+    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1, 
partitionedData0.getBlockList());
+    requiredId = shuffleTaskManager.requireBuffer(appId, 1, Arrays.asList(1), 
500);
+    assertNotEquals(-1, requiredId);
+
+    // case3
+    partitionedData0 = createPartitionedData(1, 1, 500);
+    shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, 
partitionedData0);
+    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1, 
partitionedData0.getBlockList());
+    requiredId = shuffleTaskManager.requireBuffer(appId, 1, Arrays.asList(1), 
500);
+    assertEquals(-1, requiredId);
+  }
+
   @Test
   public void partitionDataSizeSummaryTest() throws Exception {
     String confFile = ClassLoader.getSystemResource("server.conf").getFile();
diff --git 
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
 
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
index 5ca675cc..fcfe8306 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
@@ -43,8 +43,10 @@ import org.apache.uniffle.storage.util.StorageType;
 
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
@@ -442,6 +444,71 @@ public class ShuffleBufferManagerTest extends 
BufferTestBase {
     assertEquals(1, shuffleBufferManager.getBufferPool().keySet().size());
   }
 
+  @Test
+  public void flushSingleBufferForHugePartitionTest(@TempDir File tmpDir) 
throws Exception {
+    ShuffleServerConf shuffleConf = new ShuffleServerConf();
+    File dataDir = new File(tmpDir, "data");
+    shuffleConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, 
StorageType.LOCALFILE.name());
+    shuffleConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, 
Arrays.asList(dataDir.getAbsolutePath()));
+    
shuffleConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE,
 20.0);
+    
shuffleConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE,
 80.0);
+    shuffleConf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 
1024L);
+    shuffleConf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 200L);
+    
shuffleConf.set(ShuffleServerConf.HUGE_PARTITION_MEMORY_USAGE_LIMITATION_RATIO, 
0.1);
+    shuffleConf.set(ShuffleServerConf.HUGE_PARTITION_SIZE_THRESHOLD, 100L);
+    
shuffleConf.setSizeAsBytes(ShuffleServerConf.SINGLE_BUFFER_FLUSH_THRESHOLD, 
64L);
+
+    ShuffleServer mockShuffleServer = mock(ShuffleServer.class);
+    StorageManager storageManager = 
StorageManagerFactory.getInstance().createStorageManager(shuffleConf);
+    ShuffleFlushManager shuffleFlushManager =
+        new ShuffleFlushManager(shuffleConf, "serverId", mockShuffleServer, 
storageManager);
+    shuffleBufferManager = new ShuffleBufferManager(shuffleConf, 
shuffleFlushManager);
+    ShuffleTaskManager shuffleTaskManager =
+        new ShuffleTaskManager(shuffleConf, shuffleFlushManager, 
shuffleBufferManager, storageManager);
+
+    when(mockShuffleServer
+        .getShuffleFlushManager())
+        .thenReturn(shuffleFlushManager);
+    when(mockShuffleServer
+        .getShuffleBufferManager())
+        .thenReturn(shuffleBufferManager);
+    when(mockShuffleServer
+        .getShuffleTaskManager())
+        .thenReturn(shuffleTaskManager);
+
+    String appId = "flushSingleBufferForHugePartitionTest_appId";
+    int shuffleId = 1;
+
+    // case1: its partition is not huge partition
+    shuffleBufferManager.registerBuffer(appId, shuffleId, 0, 0);
+    ShufflePartitionedData partitionedData = createData(0, 1);
+    shuffleTaskManager.cacheShuffleData(appId, shuffleId, false, 
partitionedData);
+    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 0, 
partitionedData.getBlockList());
+    assertEquals(1 + 32, shuffleBufferManager.getUsedMemory());
+    long usedSize = shuffleTaskManager.getPartitionDataSize(appId, shuffleId, 
0);
+    assertEquals(1 + 32, usedSize);
+    assertFalse(
+        shuffleBufferManager.limitHugePartition(appId, shuffleId, 0,
+            shuffleTaskManager.getPartitionDataSize(appId, shuffleId, 0)
+        )
+    );
+
+    // case2: its partition is huge partition, its buffer will be flushed to 
DISK directly
+    partitionedData = createData(0, 36);
+    shuffleTaskManager.cacheShuffleData(appId, shuffleId, false, 
partitionedData);
+    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 0, 
partitionedData.getBlockList());
+    assertEquals(33 + 36 + 32, shuffleBufferManager.getUsedMemory());
+    assertTrue(
+        shuffleBufferManager.limitHugePartition(appId, shuffleId, 0,
+            shuffleTaskManager.getPartitionDataSize(appId, shuffleId, 0)
+        )
+    );
+    partitionedData = createData(0, 1);
+    shuffleTaskManager.cacheShuffleData(appId, shuffleId, false, 
partitionedData);
+    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 0, 
partitionedData.getBlockList());
+    waitForFlush(shuffleFlushManager, appId, shuffleId, 3);
+  }
+
   @Test
   public void flushSingleBufferTest(@TempDir File tmpDir) throws Exception {
     ShuffleServerConf shuffleConf = new ShuffleServerConf();

Reply via email to