This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new e802b93d [ISSUE-378][HugePartition][Part-2] Introduce memory usage
limit and data flush (#471)
e802b93d is described below
commit e802b93d7050dc48a62b1abe2693dc96c706151f
Author: Junfan Zhang <[email protected]>
AuthorDate: Tue Jan 17 10:02:36 2023 +0800
[ISSUE-378][HugePartition][Part-2] Introduce memory usage limit and data
flush (#471)
### What changes were proposed in this pull request?
1. Introduce memory usage limit for huge partition to keep the regular
partition writing stable
2. Once a partition is marked as a huge partition, when its buffer size is
greater than the `rss.server.single.buffer.flush.threshold` value, single-buffer
flush will be triggered whether or not single buffer flush is enabled
### Why are the changes needed?
1. To solve the problems mentioned in #378
### Does this PR introduce _any_ user-facing change?
Yes.
### How was this patch tested?
1. UTs
---
.../apache/uniffle/common/util/TripleFunction.java | 25 ++++++++
docs/server_guide.md | 2 +
.../client/impl/grpc/ShuffleServerGrpcClient.java | 45 ++++++++++++--
proto/src/main/proto/Rss.proto | 3 +
.../apache/uniffle/server/ShuffleServerConf.java | 14 +++++
.../uniffle/server/ShuffleServerGrpcService.java | 18 +++++-
.../apache/uniffle/server/ShuffleTaskManager.java | 31 +++++++++-
.../server/buffer/ShuffleBufferManager.java | 72 ++++++++++++++++++++--
.../uniffle/server/ShuffleTaskManagerTest.java | 43 +++++++++++++
.../server/buffer/ShuffleBufferManagerTest.java | 67 ++++++++++++++++++++
10 files changed, 306 insertions(+), 14 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/util/TripleFunction.java
b/common/src/main/java/org/apache/uniffle/common/util/TripleFunction.java
new file mode 100644
index 00000000..56ece5f6
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/util/TripleFunction.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.util;
+
+@FunctionalInterface
+public interface TripleFunction<T, U, E, R> {
+
+ R accept(T t, U u, E e);
+
+}
diff --git a/docs/server_guide.md b/docs/server_guide.md
index 0e0b08cd..09decaae 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -86,6 +86,8 @@ This document will introduce how to deploy Uniffle shuffle
servers.
|rss.server.leak.shuffledata.check.interval|3600000|The interval of leak
shuffle data check (ms)|
|rss.server.max.concurrency.of.single.partition.writer|1|The max concurrency
of single partition writer, the data partition file number is equal to this
value. Default value is 1. This config could improve the writing speed,
especially for huge partition.|
|rss.metrics.reporter.class|-|The class of metrics reporter.|
+|rss.server.huge-partition.size.threshold|20g|Threshold of huge partition
size, once exceeding threshold, memory usage limitation and huge partition
buffer flushing will be triggered.|
+|rss.server.huge-partition.memory.limit.ratio|0.2|The memory usage limit ratio
for huge partition, it will only triggered when partition's size exceeds the
threshold of 'rss.server.huge-partition.size.threshold'|
### Advanced Configurations
|Property Name|Default| Description
|
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index 74fe2767..57dc8ed5 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -17,6 +17,8 @@
package org.apache.uniffle.client.impl.grpc;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -179,8 +181,33 @@ public class ShuffleServerGrpcClient extends GrpcClient
implements ShuffleServer
return blockingStub.withDeadlineAfter(timeout,
TimeUnit.MILLISECONDS).appHeartbeat(request);
}
- public long requirePreAllocation(int requireSize, int retryMax, long
retryIntervalMax) {
- RequireBufferRequest rpcRequest =
RequireBufferRequest.newBuilder().setRequireSize(requireSize).build();
+ // Only for tests
+ @VisibleForTesting
+ public long requirePreAllocation(int requireSize, int retryMax, long
retryIntervalMax) throws Exception {
+ return requirePreAllocation(
+ "EMPTY",
+ 0,
+ Collections.emptyList(),
+ requireSize,
+ retryMax,
+ retryIntervalMax
+ );
+ }
+
+ public long requirePreAllocation(
+ String appId,
+ int shuffleId,
+ List<Integer> partitionIds,
+ int requireSize,
+ int retryMax,
+ long retryIntervalMax) {
+ RequireBufferRequest rpcRequest = RequireBufferRequest.newBuilder()
+ .setShuffleId(shuffleId)
+ .addAllPartitionIds(partitionIds)
+ .setAppId(appId)
+ .setRequireSize(requireSize)
+ .build();
+
long start = System.currentTimeMillis();
RequireBufferResponse rpcResponse =
getBlockingStub().requireBuffer(rpcRequest);
int retry = 0;
@@ -282,6 +309,9 @@ public class ShuffleServerGrpcClient extends GrpcClient
implements ShuffleServer
List<ShuffleData> shuffleData = Lists.newArrayList();
int size = 0;
int blockNum = 0;
+ int shuffleId = stb.getKey();
+ List<Integer> partitionIds = new ArrayList<>();
+
for (Map.Entry<Integer, List<ShuffleBlockInfo>> ptb :
stb.getValue().entrySet()) {
List<ShuffleBlock> shuffleBlocks = Lists.newArrayList();
for (ShuffleBlockInfo sbi : ptb.getValue()) {
@@ -298,14 +328,21 @@ public class ShuffleServerGrpcClient extends GrpcClient
implements ShuffleServer
shuffleData.add(ShuffleData.newBuilder().setPartitionId(ptb.getKey())
.addAllBlock(shuffleBlocks)
.build());
+ partitionIds.add(ptb.getKey());
}
final int allocateSize = size;
final int finalBlockNum = blockNum;
try {
RetryUtils.retry(() -> {
- long requireId = requirePreAllocation(allocateSize,
request.getRetryMax() / maxRetryAttempts,
- request.getRetryIntervalMax());
+ long requireId = requirePreAllocation(
+ appId,
+ shuffleId,
+ partitionIds,
+ allocateSize,
+ request.getRetryMax() / maxRetryAttempts,
+ request.getRetryIntervalMax()
+ );
if (requireId == FAILED_REQUIRE_ID) {
throw new RssException(String.format(
"requirePreAllocation failed! size[%s], host[%s], port[%s]",
allocateSize, host, port));
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 3308cdcb..0bff956e 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -51,6 +51,9 @@ message FinishShuffleResponse {
message RequireBufferRequest {
int32 requireSize = 1;
+ string appId = 2;
+ int32 shuffleId = 3;
+ repeated int32 partitionIds = 4;
}
message RequireBufferResponse {
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index b40b19db..5b181075 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -342,6 +342,20 @@ public class ShuffleServerConf extends RssBaseConf {
.noDefaultValue()
.withDescription("The env key to get json source of local storage media
provider");
+ public static final ConfigOption<Long> HUGE_PARTITION_SIZE_THRESHOLD =
ConfigOptions
+ .key("rss.server.huge-partition.size.threshold")
+ .longType()
+ .defaultValue(20 * 1024 * 1024 * 1024L)
+ .withDescription("Threshold of huge partition size, once exceeding
threshold, memory usage limitation and "
+ + "huge partition buffer flushing will be triggered.");
+
+ public static final ConfigOption<Double>
HUGE_PARTITION_MEMORY_USAGE_LIMITATION_RATIO = ConfigOptions
+ .key("rss.server.huge-partition.memory.limit.ratio")
+ .doubleType()
+ .defaultValue(0.2)
+ .withDescription("The memory usage limit ratio for huge partition, it
will only triggered when partition's "
+ + "size exceeds the threshold of '" +
HUGE_PARTITION_SIZE_THRESHOLD.key() + "'");
+
public ShuffleServerConf() {
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 1e6999e3..774a79b9 100644
---
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -30,6 +30,7 @@ import com.google.protobuf.UnsafeByteOperations;
import io.grpc.Context;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
+import org.apache.commons.lang3.StringUtils;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -353,7 +354,22 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
@Override
public void requireBuffer(RequireBufferRequest request,
StreamObserver<RequireBufferResponse> responseObserver) {
- long requireBufferId =
shuffleServer.getShuffleTaskManager().requireBuffer(request.getRequireSize());
+ String appId = request.getAppId();
+ long requireBufferId;
+ if (StringUtils.isEmpty(appId)) {
+ // To be compatible with older client version
+ requireBufferId = shuffleServer.getShuffleTaskManager().requireBuffer(
+ request.getRequireSize()
+ );
+ } else {
+ requireBufferId = shuffleServer.getShuffleTaskManager().requireBuffer(
+ appId,
+ request.getShuffleId(),
+ request.getPartitionIdsList(),
+ request.getRequireSize()
+ );
+ }
+
StatusCode status = StatusCode.SUCCESS;
if (requireBufferId == -1) {
status = StatusCode.NO_BUFFER;
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index d103f506..53f22c46 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -95,7 +95,6 @@ public class ShuffleTaskManager {
private Map<Long, PreAllocatedBufferInfo> requireBufferIds =
Maps.newConcurrentMap();
private Runnable clearResourceThread;
private BlockingQueue<PurgeEvent> expiredAppIdQueue =
Queues.newLinkedBlockingQueue();
- // appId -> shuffleId -> serverReadHandler
public ShuffleTaskManager(
ShuffleServerConf conf,
@@ -200,7 +199,13 @@ public class ShuffleTaskManager {
public StatusCode cacheShuffleData(
String appId, int shuffleId, boolean isPreAllocated,
ShufflePartitionedData spd) {
refreshAppId(appId);
- return shuffleBufferManager.cacheShuffleData(appId, shuffleId,
isPreAllocated, spd);
+ return shuffleBufferManager.cacheShuffleData(
+ appId,
+ shuffleId,
+ isPreAllocated,
+ spd,
+ this::getPartitionDataSize
+ );
}
public PreAllocatedBufferInfo getAndRemovePreAllocatedBuffer(long
requireBufferId) {
@@ -335,6 +340,27 @@ public class ShuffleTaskManager {
return blockIds;
}
+ public long getPartitionDataSize(String appId, int shuffleId, int
partitionId) {
+ ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.get(appId);
+ if (shuffleTaskInfo == null) {
+ return 0L;
+ }
+ return shuffleTaskInfo.getPartitionDataSize(shuffleId, partitionId);
+ }
+
+ public long requireBuffer(String appId, int shuffleId, List<Integer>
partitionIds, int requireSize) {
+ ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.get(appId);
+ if (shuffleTaskInfo != null) {
+ for (int partitionId : partitionIds) {
+ long partitionUsedDataSize = getPartitionDataSize(appId, shuffleId,
partitionId);
+ if (shuffleBufferManager.limitHugePartition(appId, shuffleId,
partitionId, partitionUsedDataSize)) {
+ return -1;
+ }
+ }
+ }
+ return requireBuffer(requireSize);
+ }
+
public long requireBuffer(int requireSize) {
long requireId = -1;
if (shuffleBufferManager.requireMemory(requireSize, true)) {
@@ -630,5 +656,4 @@ public class ShuffleTaskManager {
this.shuffleBufferManager.flushIfNecessary();
}
}
-
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index eca6610f..b4c7c2d4 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -41,6 +41,7 @@ import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShufflePartitionedData;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.RssUtils;
+import org.apache.uniffle.common.util.TripleFunction;
import org.apache.uniffle.server.ShuffleDataFlushEvent;
import org.apache.uniffle.server.ShuffleFlushManager;
import org.apache.uniffle.server.ShuffleServerConf;
@@ -62,6 +63,9 @@ public class ShuffleBufferManager {
// when shuffle buffer manager flushes data, shuffles with data size <
shuffleFlushThreshold is kept in memory to
// reduce small I/Os to persistent storage, especially for local HDDs.
private long shuffleFlushThreshold;
+ // Huge partition vars
+ private long hugePartitionSizeThreshold;
+ private long hugePartitionMemoryLimitSize;
protected long bufferSize = 0;
protected AtomicLong preAllocatedSize = new AtomicLong(0L);
@@ -86,6 +90,10 @@ public class ShuffleBufferManager {
this.bufferFlushEnabled =
conf.getBoolean(ShuffleServerConf.SINGLE_BUFFER_FLUSH_ENABLED);
this.bufferFlushThreshold =
conf.getLong(ShuffleServerConf.SINGLE_BUFFER_FLUSH_THRESHOLD);
this.shuffleFlushThreshold =
conf.getLong(ShuffleServerConf.SERVER_SHUFFLE_FLUSH_THRESHOLD);
+ this.hugePartitionSizeThreshold =
conf.getSizeAsBytes(ShuffleServerConf.HUGE_PARTITION_SIZE_THRESHOLD);
+ this.hugePartitionMemoryLimitSize = Math.round(
+ capacity *
conf.get(ShuffleServerConf.HUGE_PARTITION_MEMORY_USAGE_LIMITATION_RATIO)
+ );
}
public StatusCode registerBuffer(String appId, int shuffleId, int
startPartition, int endPartition) {
@@ -104,8 +112,27 @@ public class ShuffleBufferManager {
return StatusCode.SUCCESS;
}
- public StatusCode cacheShuffleData(String appId, int shuffleId,
- boolean isPreAllocated, ShufflePartitionedData spd) {
+ // Only for tests
+ public StatusCode cacheShuffleData(
+ String appId,
+ int shuffleId,
+ boolean isPreAllocated,
+ ShufflePartitionedData spd) {
+ return cacheShuffleData(
+ appId,
+ shuffleId,
+ isPreAllocated,
+ spd,
+ null
+ );
+ }
+
+ public StatusCode cacheShuffleData(
+ String appId,
+ int shuffleId,
+ boolean isPreAllocated,
+ ShufflePartitionedData spd,
+ TripleFunction<String, Integer, Integer, Long> getPartitionDataSizeFunc)
{
if (!isPreAllocated && isFull()) {
LOG.warn("Got unexpected data, can't cache it because the space is
full");
return StatusCode.NO_BUFFER;
@@ -124,8 +151,15 @@ public class ShuffleBufferManager {
}
updateShuffleSize(appId, shuffleId, size);
synchronized (this) {
- flushSingleBufferIfNecessary(buffer, appId, shuffleId,
- entry.getKey().lowerEndpoint(), entry.getKey().upperEndpoint());
+ flushSingleBufferIfNecessary(
+ buffer,
+ appId,
+ shuffleId,
+ spd.getPartitionId(),
+ entry.getKey().lowerEndpoint(),
+ entry.getKey().upperEndpoint(),
+ getPartitionDataSizeFunc
+ );
flushIfNecessary();
}
return StatusCode.SUCCESS;
@@ -184,12 +218,26 @@ public class ShuffleBufferManager {
return buffer.getShuffleData(blockId, readBufferSize, expectedTaskIds);
}
- void flushSingleBufferIfNecessary(ShuffleBuffer buffer, String appId,
- int shuffleId, int startPartition, int endPartition) {
+ void flushSingleBufferIfNecessary(
+ ShuffleBuffer buffer,
+ String appId,
+ int shuffleId,
+ int partitionId,
+ int startPartition,
+ int endPartition,
+ TripleFunction<String, Integer, Integer, Long> getPartitionDataSizeFunc)
{
// When we use multi storage and trigger single buffer flush, the buffer
size should be bigger
// than rss.server.flush.cold.storage.threshold.size, otherwise cold
storage will be useless.
if (this.bufferFlushEnabled && buffer.getSize() >
this.bufferFlushThreshold) {
flushBuffer(buffer, appId, shuffleId, startPartition, endPartition);
+ return;
+ }
+
+ if (getPartitionDataSizeFunc != null
+ && getPartitionDataSizeFunc.accept(appId, shuffleId, partitionId) >
hugePartitionSizeThreshold
+ && buffer.getSize() > this.bufferFlushThreshold) {
+ flushBuffer(buffer, appId, shuffleId, startPartition, endPartition);
+ return;
}
}
@@ -515,4 +563,16 @@ public class ShuffleBufferManager {
shuffleIdToBuffers.remove(shuffleId);
}
}
+
+ public boolean limitHugePartition(String appId, int shuffleId, int
partitionId, long usedPartitionDataSize) {
+ if (usedPartitionDataSize > hugePartitionSizeThreshold) {
+ long memoryUsed = getShuffleBufferEntry(appId, shuffleId,
partitionId).getValue().getSize();
+ if (memoryUsed > hugePartitionMemoryLimitSize) {
+ LOG.warn("AppId: {}, shuffleId: {}, partitionId: {}, memory used: {}, "
+ + "huge partition triggered memory limitation.", appId, shuffleId,
partitionId, memoryUsed);
+ return true;
+ }
+ }
+ return false;
+ }
}
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index 1dc6eb58..43cd5ac1 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -64,6 +64,7 @@ import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -78,6 +79,48 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
ShuffleServerMetrics.clear();
}
+ @Test
+ public void hugePartitionMemoryUsageLimitTest() throws Exception {
+ String confFile = ClassLoader.getSystemResource("server.conf").getFile();
+ ShuffleServerConf conf = new ShuffleServerConf(confFile);
+ conf.set(ShuffleServerConf.RSS_STORAGE_TYPE,
StorageType.MEMORY_LOCALFILE.name());
+ conf.setString(ShuffleServerConf.HUGE_PARTITION_SIZE_THRESHOLD.key(),
"1K");
+ conf.setString("rss.server.buffer.capacity", "10K");
+ conf.set(ShuffleServerConf.HUGE_PARTITION_MEMORY_USAGE_LIMITATION_RATIO,
0.1);
+
+ ShuffleServer shuffleServer = new ShuffleServer(conf);
+ ShuffleTaskManager shuffleTaskManager =
shuffleServer.getShuffleTaskManager();
+
+ String appId = "hugePartitionMemoryUsageLimitTest_appId";
+ int shuffleId = 1;
+
+ shuffleTaskManager.registerShuffle(
+ appId,
+ shuffleId,
+ Lists.newArrayList(new PartitionRange(1, 1)),
+ RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
+ StringUtils.EMPTY
+ );
+
+ // case1
+ long requiredId = shuffleTaskManager.requireBuffer(appId, 1,
Arrays.asList(1), 500);
+ assertNotEquals(-1, requiredId);
+
+ // case2
+ ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 500);
+ shuffleTaskManager.cacheShuffleData(appId, shuffleId, true,
partitionedData0);
+ shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1,
partitionedData0.getBlockList());
+ requiredId = shuffleTaskManager.requireBuffer(appId, 1, Arrays.asList(1),
500);
+ assertNotEquals(-1, requiredId);
+
+ // case3
+ partitionedData0 = createPartitionedData(1, 1, 500);
+ shuffleTaskManager.cacheShuffleData(appId, shuffleId, true,
partitionedData0);
+ shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1,
partitionedData0.getBlockList());
+ requiredId = shuffleTaskManager.requireBuffer(appId, 1, Arrays.asList(1),
500);
+ assertEquals(-1, requiredId);
+ }
+
@Test
public void partitionDataSizeSummaryTest() throws Exception {
String confFile = ClassLoader.getSystemResource("server.conf").getFile();
diff --git
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
index 5ca675cc..fcfe8306 100644
---
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
@@ -43,8 +43,10 @@ import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
@@ -442,6 +444,71 @@ public class ShuffleBufferManagerTest extends
BufferTestBase {
assertEquals(1, shuffleBufferManager.getBufferPool().keySet().size());
}
+ @Test
+ public void flushSingleBufferForHugePartitionTest(@TempDir File tmpDir)
throws Exception {
+ ShuffleServerConf shuffleConf = new ShuffleServerConf();
+ File dataDir = new File(tmpDir, "data");
+ shuffleConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE,
StorageType.LOCALFILE.name());
+ shuffleConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH,
Arrays.asList(dataDir.getAbsolutePath()));
+
shuffleConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE,
20.0);
+
shuffleConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE,
80.0);
+ shuffleConf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L *
1024L);
+ shuffleConf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 200L);
+
shuffleConf.set(ShuffleServerConf.HUGE_PARTITION_MEMORY_USAGE_LIMITATION_RATIO,
0.1);
+ shuffleConf.set(ShuffleServerConf.HUGE_PARTITION_SIZE_THRESHOLD, 100L);
+
shuffleConf.setSizeAsBytes(ShuffleServerConf.SINGLE_BUFFER_FLUSH_THRESHOLD,
64L);
+
+ ShuffleServer mockShuffleServer = mock(ShuffleServer.class);
+ StorageManager storageManager =
StorageManagerFactory.getInstance().createStorageManager(shuffleConf);
+ ShuffleFlushManager shuffleFlushManager =
+ new ShuffleFlushManager(shuffleConf, "serverId", mockShuffleServer,
storageManager);
+ shuffleBufferManager = new ShuffleBufferManager(shuffleConf,
shuffleFlushManager);
+ ShuffleTaskManager shuffleTaskManager =
+ new ShuffleTaskManager(shuffleConf, shuffleFlushManager,
shuffleBufferManager, storageManager);
+
+ when(mockShuffleServer
+ .getShuffleFlushManager())
+ .thenReturn(shuffleFlushManager);
+ when(mockShuffleServer
+ .getShuffleBufferManager())
+ .thenReturn(shuffleBufferManager);
+ when(mockShuffleServer
+ .getShuffleTaskManager())
+ .thenReturn(shuffleTaskManager);
+
+ String appId = "flushSingleBufferForHugePartitionTest_appId";
+ int shuffleId = 1;
+
+ // case1: its partition is not huge partition
+ shuffleBufferManager.registerBuffer(appId, shuffleId, 0, 0);
+ ShufflePartitionedData partitionedData = createData(0, 1);
+ shuffleTaskManager.cacheShuffleData(appId, shuffleId, false,
partitionedData);
+ shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 0,
partitionedData.getBlockList());
+ assertEquals(1 + 32, shuffleBufferManager.getUsedMemory());
+ long usedSize = shuffleTaskManager.getPartitionDataSize(appId, shuffleId,
0);
+ assertEquals(1 + 32, usedSize);
+ assertFalse(
+ shuffleBufferManager.limitHugePartition(appId, shuffleId, 0,
+ shuffleTaskManager.getPartitionDataSize(appId, shuffleId, 0)
+ )
+ );
+
+ // case2: its partition is huge partition, its buffer will be flushed to
DISK directly
+ partitionedData = createData(0, 36);
+ shuffleTaskManager.cacheShuffleData(appId, shuffleId, false,
partitionedData);
+ shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 0,
partitionedData.getBlockList());
+ assertEquals(33 + 36 + 32, shuffleBufferManager.getUsedMemory());
+ assertTrue(
+ shuffleBufferManager.limitHugePartition(appId, shuffleId, 0,
+ shuffleTaskManager.getPartitionDataSize(appId, shuffleId, 0)
+ )
+ );
+ partitionedData = createData(0, 1);
+ shuffleTaskManager.cacheShuffleData(appId, shuffleId, false,
partitionedData);
+ shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 0,
partitionedData.getBlockList());
+ waitForFlush(shuffleFlushManager, appId, shuffleId, 3);
+ }
+
@Test
public void flushSingleBufferTest(@TempDir File tmpDir) throws Exception {
ShuffleServerConf shuffleConf = new ShuffleServerConf();