This is an automated email from the ASF dual-hosted git repository.
xianjin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 3aaac3e7 [Bug] Fix potential negative preAllocatedSize variable (#428)
3aaac3e7 is described below
commit 3aaac3e725b8e408b27be111b003abcdae7fb350
Author: advancedxy <[email protected]>
AuthorDate: Tue Dec 27 19:22:28 2022 +0800
[Bug] Fix potential negative preAllocatedSize variable (#428)
### What changes were proposed in this pull request?
- update `preAllocatedSize` atomically: the pre-allocated buffer entry is
fetched and removed in a single step before its size is released
- add a new configuration to trigger ShuffleBufferManager's
flushIfNecessary periodically
- tweak existing UTs
### Why are the changes needed?
`preAllocatedSize` could become negative in production environments, which
skews the memory pressure calculation.
This commit should fix #229 and #426.
### Does this PR introduce _any_ user-facing change?
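- a new ShuffleServer configuration,
`rss.server.shuffleBufferManager.trigger.flush.interval`, is introduced
(default 0; a value <= 0 disables the periodic flush check)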
### How was this patch tested?
Existing UTs.
---
.../uniffle/common/ShufflePartitionedData.java | 11 ++++++
.../apache/uniffle/test/IntegrationTestBase.java | 1 +
.../apache/uniffle/test/ShuffleServerGrpcTest.java | 2 -
.../test/ShuffleServerWithMemLocalHdfsTest.java | 1 +
.../uniffle/test/ShuffleServerWithMemoryTest.java | 1 +
.../apache/uniffle/server/ShuffleServerConf.java | 7 ++++
.../uniffle/server/ShuffleServerGrpcService.java | 26 ++++++++-----
.../apache/uniffle/server/ShuffleTaskManager.java | 44 ++++++++++++++++++----
.../server/buffer/PreAllocatedBufferInfo.java | 1 +
.../server/buffer/ShuffleBufferManager.java | 20 +++++-----
.../uniffle/server/ShuffleTaskManagerTest.java | 21 +++++++----
.../server/buffer/ShuffleBufferManagerTest.java | 8 +++-
server/src/test/resources/server.conf | 1 +
13 files changed, 106 insertions(+), 38 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedData.java
b/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedData.java
index 64a19140..3ee1b489 100644
--- a/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedData.java
+++ b/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedData.java
@@ -49,4 +49,15 @@ public class ShufflePartitionedData {
return blockList;
}
+ public long getTotalBlockSize() {
+ if (blockList == null) {
+ return 0L;
+ }
+ long size = 0;
+ for (ShufflePartitionedBlock block : blockList) {
+ size += block.getSize();
+ }
+ return size;
+ }
+
}
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
index 77eb25f6..d080fb2e 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
@@ -121,6 +121,7 @@ public abstract class IntegrationTestBase extends
HdfsTestBase {
serverConf.setLong("rss.server.disk.capacity", 10L * 1024L * 1024L *
1024L);
serverConf.setBoolean("rss.server.health.check.enable", false);
serverConf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
+ serverConf.set(ShuffleServerConf.SERVER_TRIGGER_FLUSH_CHECK_INTERVAL,
500L);
return serverConf;
}
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
index fdb83a9f..438dba20 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
@@ -399,8 +399,6 @@ public class ShuffleServerGrpcTest extends
IntegrationTestBase {
RssSendShuffleDataRequest rssdr = new RssSendShuffleDataRequest(
"sendDataWithoutRegisterTest", 3, 1000, shuffleToBlocks);
shuffleServerClient.sendShuffleData(rssdr);
- assertEquals(132, shuffleServers.get(0).getPreAllocatedMemory());
- Thread.sleep(10000);
assertEquals(0, shuffleServers.get(0).getPreAllocatedMemory());
}
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHdfsTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHdfsTest.java
index 4dc9c706..947612d4 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHdfsTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHdfsTest.java
@@ -75,6 +75,7 @@ public class ShuffleServerWithMemLocalHdfsTest extends
ShuffleReadWriteBase {
shuffleServerConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE,
20.0);
shuffleServerConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE,
40.0);
shuffleServerConf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 500L);
+
shuffleServerConf.set(ShuffleServerConf.SERVER_TRIGGER_FLUSH_CHECK_INTERVAL,
500L);
createShuffleServer(shuffleServerConf);
startServers();
}
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
index 81a7e31f..1ffc8540 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
@@ -71,6 +71,7 @@ public class ShuffleServerWithMemoryTest extends
ShuffleReadWriteBase {
shuffleServerConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE,
20.0);
shuffleServerConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE,
40.0);
shuffleServerConf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 500L);
+
shuffleServerConf.set(ShuffleServerConf.SERVER_TRIGGER_FLUSH_CHECK_INTERVAL,
500L);
createShuffleServer(shuffleServerConf);
startServers();
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index 38454598..88198616 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -321,6 +321,13 @@ public class ShuffleServerConf extends RssBaseConf {
.withDescription("The max concurrency of single partition writer, the
data partition file number is "
+ "equal to this value. Default value is 1.");
+ public static final ConfigOption<Long> SERVER_TRIGGER_FLUSH_CHECK_INTERVAL =
ConfigOptions
+ .key("rss.server.shuffleBufferManager.trigger.flush.interval")
+ .longType()
+ .defaultValue(0L)
+ .withDescription("The interval of trigger shuffle buffer manager to
flush data to persistent storage. If <= 0"
+ + ", then this flush check would be disabled.");
+
public ShuffleServerConf() {
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 054ffe55..97cd41f9 100644
---
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -77,6 +77,7 @@ import
org.apache.uniffle.proto.RssProtos.ShufflePartitionRange;
import org.apache.uniffle.proto.RssProtos.ShuffleRegisterRequest;
import org.apache.uniffle.proto.RssProtos.ShuffleRegisterResponse;
import org.apache.uniffle.proto.ShuffleServerGrpc.ShuffleServerImplBase;
+import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
import org.apache.uniffle.storage.common.Storage;
import org.apache.uniffle.storage.common.StorageReadMetrics;
import org.apache.uniffle.storage.util.ShuffleStorageUtils;
@@ -212,7 +213,9 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
String responseMessage = "OK";
if (req.getShuffleDataCount() > 0) {
ShuffleServerMetrics.counterTotalReceivedDataSize.inc(requireSize);
- boolean isPreAllocated =
shuffleServer.getShuffleTaskManager().isPreAllocated(requireBufferId);
+ ShuffleTaskManager manager = shuffleServer.getShuffleTaskManager();
+ PreAllocatedBufferInfo info =
manager.getAndRemovePreAllocatedBuffer(requireBufferId);
+ boolean isPreAllocated = info != null;
if (!isPreAllocated) {
String errorMsg = "Can't find requireBufferId[" + requireBufferId + "]
for appId[" + appId
+ "], shuffleId[" + shuffleId + "]";
@@ -229,13 +232,12 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
}
final long start = System.currentTimeMillis();
List<ShufflePartitionedData> shufflePartitionedData =
toPartitionedData(req);
+ long alreadyReleasedSize = 0;
for (ShufflePartitionedData spd : shufflePartitionedData) {
String shuffleDataInfo = "appId[" + appId + "], shuffleId[" + shuffleId
+ "], partitionId[" + spd.getPartitionId() + "]";
try {
- ret = shuffleServer
- .getShuffleTaskManager()
- .cacheShuffleData(appId, shuffleId, isPreAllocated, spd);
+ ret = manager.cacheShuffleData(appId, shuffleId, isPreAllocated,
spd);
if (ret != StatusCode.SUCCESS) {
String errorMsg = "Error happened when shuffleEngine.write for "
+ shuffleDataInfo + ", statusCode=" + ret;
@@ -243,11 +245,11 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
responseMessage = errorMsg;
break;
} else {
- // remove require bufferId, the memory should be updated already
- shuffleServer
-
.getShuffleTaskManager().removeRequireBufferId(requireBufferId);
- shuffleServer.getShuffleTaskManager().updateCachedBlockIds(
- appId, shuffleId, spd.getBlockList());
+ long toReleasedSize = spd.getTotalBlockSize();
+ // after each cacheShuffleData call, the `preAllocatedSize` is
updated timely.
+ manager.releasePreAllocatedSize(toReleasedSize);
+ alreadyReleasedSize += toReleasedSize;
+ manager.updateCachedBlockIds(appId, shuffleId, spd.getBlockList());
}
} catch (Exception e) {
String errorMsg = "Error happened when shuffleEngine.write for "
@@ -258,6 +260,12 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
break;
}
}
+ // since the required buffer id is only used once, the shuffle client
would try to require another buffer whether
+ // current connection succeeded or not. Therefore, the
preAllocatedBuffer is first get and removed, then after
+ // cacheShuffleData finishes, the preAllocatedSize should be updated
accordingly.
+ if (info.getRequireSize() > alreadyReleasedSize) {
+ manager.releasePreAllocatedSize(info.getRequireSize() -
alreadyReleasedSize);
+ }
reply =
SendShuffleDataResponse.newBuilder().setStatus(valueOf(ret)).setRetMsg(responseMessage).build();
long costTime = System.currentTimeMillis() - start;
shuffleServer.getGrpcMetrics().recordProcessTime(ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD,
costTime);
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 5e74eabf..ad93ac53 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -75,6 +75,7 @@ public class ShuffleTaskManager {
private final ScheduledExecutorService scheduledExecutorService;
private final ScheduledExecutorService expiredAppCleanupExecutorService;
private final ScheduledExecutorService leakShuffleDataCheckExecutorService;
+ private ScheduledExecutorService triggerFlushExecutorService;
private final StorageManager storageManager;
private AtomicLong requireBufferId = new AtomicLong(0);
private ShuffleServerConf conf;
@@ -82,6 +83,7 @@ public class ShuffleTaskManager {
private long preAllocationExpired;
private long commitCheckIntervalMax;
private long leakShuffleDataCheckInterval;
+ private long triggerFlushInterval;
// appId -> shuffleId -> blockIds to avoid too many appId
// store taskAttemptId info to filter speculation task
// Roaring64NavigableMap instance will cost much memory,
@@ -109,6 +111,7 @@ public class ShuffleTaskManager {
this.commitCheckIntervalMax =
conf.getLong(ShuffleServerConf.SERVER_COMMIT_CHECK_INTERVAL_MAX);
this.preAllocationExpired =
conf.getLong(ShuffleServerConf.SERVER_PRE_ALLOCATION_EXPIRED);
this.leakShuffleDataCheckInterval =
conf.getLong(ShuffleServerConf.SERVER_LEAK_SHUFFLE_DATA_CHECK_INTERVAL);
+ this.triggerFlushInterval =
conf.getLong(ShuffleServerConf.SERVER_TRIGGER_FLUSH_CHECK_INTERVAL);
// the thread for checking application status
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
ThreadUtils.getThreadFactory("checkResource-%d"));
@@ -125,6 +128,13 @@ public class ShuffleTaskManager {
leakShuffleDataCheckExecutorService.scheduleAtFixedRate(
() -> checkLeakShuffleData(), leakShuffleDataCheckInterval,
leakShuffleDataCheckInterval, TimeUnit.MILLISECONDS);
+ if (triggerFlushInterval > 0) {
+ triggerFlushExecutorService = Executors.newSingleThreadScheduledExecutor(
+ ThreadUtils.getThreadFactory("triggerShuffleBufferManagerFlush"));
+ triggerFlushExecutorService.scheduleWithFixedDelay(
+ this::triggerFlush, triggerFlushInterval / 2,
+ triggerFlushInterval, TimeUnit.MILLISECONDS);
+ }
// the thread for clear expired resources
clearResourceThread = () -> {
while (true) {
@@ -193,12 +203,20 @@ public class ShuffleTaskManager {
return shuffleBufferManager.cacheShuffleData(appId, shuffleId,
isPreAllocated, spd);
}
- public boolean isPreAllocated(long requireBufferId) {
- return requireBufferIds.containsKey(requireBufferId);
+ public PreAllocatedBufferInfo getAndRemovePreAllocatedBuffer(long
requireBufferId) {
+ return requireBufferIds.remove(requireBufferId);
+ }
+
+ public void releasePreAllocatedSize(long requireSize) {
+ shuffleBufferManager.releasePreAllocatedSize(requireSize);
}
- public void removeRequireBufferId(long requireId) {
- requireBufferIds.remove(requireId);
+ @VisibleForTesting
+ void removeAndReleasePreAllocatedBuffer(long requireBufferId) {
+ PreAllocatedBufferInfo info =
getAndRemovePreAllocatedBuffer(requireBufferId);
+ if (info != null) {
+ releasePreAllocatedSize(info.getRequireSize());
+ }
}
public StatusCode commitShuffle(String appId, int shuffleId) throws
Exception {
@@ -528,12 +546,17 @@ public class ShuffleTaskManager {
for (PreAllocatedBufferInfo info : requireBufferIds.values()) {
if (current - info.getTimestamp() > preAllocationExpired) {
removeIds.add(info.getRequireId());
- shuffleBufferManager.releaseMemory(info.getRequireSize(), false,
true);
}
}
for (Long requireId : removeIds) {
- requireBufferIds.remove(requireId);
- LOG.info("Remove expired requireId " + requireId);
+ PreAllocatedBufferInfo info = requireBufferIds.remove(requireId);
+ if (info != null) {
+ // move release memory code down to here as the requiredBuffer could
be consumed during removing processing.
+ shuffleBufferManager.releaseMemory(info.getRequireSize(), false,
true);
+ LOG.info("Remove expired preAllocatedBuffer " + requireId);
+ } else {
+ LOG.info("PreAllocatedBuffer[id={}] has already been removed",
requireId);
+ }
}
} catch (Exception e) {
LOG.warn("Error happened in preAllocatedBufferCheck", e);
@@ -579,4 +602,11 @@ public class ShuffleTaskManager {
public ShuffleDataDistributionType getDataDistributionType(String appId) {
return shuffleTaskInfos.get(appId).getDataDistType();
}
+
+ private void triggerFlush() {
+ synchronized (this.shuffleBufferManager) {
+ this.shuffleBufferManager.flushIfNecessary();
+ }
+ }
+
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/PreAllocatedBufferInfo.java
b/server/src/main/java/org/apache/uniffle/server/buffer/PreAllocatedBufferInfo.java
index 3eccafee..bd6f6a33 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/PreAllocatedBufferInfo.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/PreAllocatedBufferInfo.java
@@ -40,4 +40,5 @@ public class PreAllocatedBufferInfo {
public int getRequireSize() {
return requireSize;
}
+
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 500542a0..a8f0df9b 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -115,7 +115,9 @@ public class ShuffleBufferManager {
ShuffleBuffer buffer = entry.getValue();
long size = buffer.append(spd);
- updateSize(size, isPreAllocated);
+ if (!isPreAllocated) {
+ updateUsedMemory(size);
+ }
updateShuffleSize(appId, shuffleId, size);
synchronized (this) {
flushSingleBufferIfNecessary(buffer, appId, shuffleId,
@@ -187,7 +189,7 @@ public class ShuffleBufferManager {
}
}
- void flushIfNecessary() {
+ public void flushIfNecessary() {
// if data size in buffer > highWaterMark, do the flush
if (usedMemory.get() - preAllocatedSize.get() - inFlushSize.get() >
highWaterMark) {
LOG.info("Start to flush with usedMemory[{}], preAllocatedSize[{}],
inFlushSize[{}]",
@@ -340,14 +342,10 @@ public class ShuffleBufferManager {
}
}
- void updateSize(long delta, boolean isPreAllocated) {
- if (isPreAllocated) {
- releasePreAllocatedSize(delta);
- } else {
- // add size if not allocated
- usedMemory.addAndGet(delta);
- ShuffleServerMetrics.gaugeUsedBufferSize.set(usedMemory.get());
- }
+ public void updateUsedMemory(long delta) {
+ // add size if not allocated
+ usedMemory.addAndGet(delta);
+ ShuffleServerMetrics.gaugeUsedBufferSize.set(usedMemory.get());
}
void requirePreAllocatedSize(long delta) {
@@ -355,7 +353,7 @@ public class ShuffleBufferManager {
ShuffleServerMetrics.gaugeAllocatedBufferSize.set(preAllocatedSize.get());
}
- void releasePreAllocatedSize(long delta) {
+ public void releasePreAllocatedSize(long delta) {
preAllocatedSize.addAndGet(-delta);
ShuffleServerMetrics.gaugeAllocatedBufferSize.set(preAllocatedSize.get());
}
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index 661857d8..86a8dcdf 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -175,9 +175,9 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
// won't flush for partition 1-1
ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35);
expectedBlocks1.addAll(Lists.newArrayList(partitionedData0.getBlockList()));
- long requireId = shuffleTaskManager.requireBuffer(35);
+ long bufferId = shuffleTaskManager.requireBuffer(35);
assertEquals(1, bufferIds.size());
- PreAllocatedBufferInfo pabi = bufferIds.get(requireId);
+ PreAllocatedBufferInfo pabi = bufferIds.get(bufferId);
assertEquals(35, pabi.getRequireSize());
StatusCode sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId,
true, partitionedData0);
shuffleTaskManager.updateCachedBlockIds(appId, shuffleId,
partitionedData0.getBlockList());
@@ -185,6 +185,8 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
assertEquals(1, bufferIds.size());
assertEquals(StatusCode.SUCCESS, sc);
shuffleTaskManager.commitShuffle(appId, shuffleId);
+ // manually release the pre allocate buffer
+ shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
ShuffleFlushManager shuffleFlushManager =
shuffleServer.getShuffleFlushManager();
assertEquals(1, shuffleFlushManager.getCommittedBlockIds(appId,
shuffleId).getLongCardinality());
@@ -192,10 +194,11 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
// flush for partition 1-1
ShufflePartitionedData partitionedData1 = createPartitionedData(1, 2, 35);
expectedBlocks1.addAll(Lists.newArrayList(partitionedData1.getBlockList()));
- shuffleTaskManager.requireBuffer(70);
+ bufferId = shuffleTaskManager.requireBuffer(70);
sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true,
partitionedData1);
shuffleTaskManager.updateCachedBlockIds(appId, shuffleId,
partitionedData1.getBlockList());
assertEquals(StatusCode.SUCCESS, sc);
+ shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
waitForFlush(shuffleFlushManager, appId, shuffleId, 2 + 1);
// won't flush for partition 1-1
@@ -209,17 +212,19 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
// won't flush for partition 2-2
ShufflePartitionedData partitionedData3 = createPartitionedData(2, 1, 30);
expectedBlocks2.addAll(Lists.newArrayList(partitionedData3.getBlockList()));
- shuffleTaskManager.requireBuffer(30);
+ bufferId = shuffleTaskManager.requireBuffer(30);
sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true,
partitionedData3);
shuffleTaskManager.updateCachedBlockIds(appId, shuffleId,
partitionedData3.getBlockList());
+ shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
assertEquals(StatusCode.SUCCESS, sc);
// flush for partition 2-2
ShufflePartitionedData partitionedData4 = createPartitionedData(2, 1, 35);
expectedBlocks2.addAll(Lists.newArrayList(partitionedData4.getBlockList()));
- shuffleTaskManager.requireBuffer(35);
+ bufferId = shuffleTaskManager.requireBuffer(35);
sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true,
partitionedData4);
shuffleTaskManager.updateCachedBlockIds(appId, shuffleId,
partitionedData4.getBlockList());
+ shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
assertEquals(StatusCode.SUCCESS, sc);
shuffleTaskManager.commitShuffle(appId, shuffleId);
@@ -230,9 +235,10 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
ShufflePartitionedData partitionedData5 = createPartitionedData(1, 2, 35);
shuffleTaskManager.updateCachedBlockIds(appId, shuffleId,
partitionedData5.getBlockList());
expectedBlocks1.addAll(Lists.newArrayList(partitionedData5.getBlockList()));
- shuffleTaskManager.requireBuffer(70);
+ bufferId = shuffleTaskManager.requireBuffer(70);
sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true,
partitionedData5);
assertEquals(StatusCode.SUCCESS, sc);
+ shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
// 2 new blocks should be committed
waitForFlush(shuffleFlushManager, appId, shuffleId, 2 + 1 + 3 + 2);
@@ -245,9 +251,10 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
// flush for partition 0-1
ShufflePartitionedData partitionedData7 = createPartitionedData(1, 2, 35);
shuffleTaskManager.updateCachedBlockIds(appId, shuffleId,
partitionedData7.getBlockList());
- shuffleTaskManager.requireBuffer(70);
+ bufferId = shuffleTaskManager.requireBuffer(70);
sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true,
partitionedData7);
assertEquals(StatusCode.SUCCESS, sc);
+ shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
// 2 new blocks should be committed
waitForFlush(shuffleFlushManager, appId, shuffleId, 2 + 1 + 3 + 2 + 2);
diff --git
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
index 4b6c0a77..e00bf9d8 100644
---
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
@@ -336,9 +336,9 @@ public class ShuffleBufferManagerTest extends
BufferTestBase {
// receive data with preAllocation
shuffleBufferManager.cacheShuffleData(appId, shuffleId, true,
createData(0, 16));
assertEquals(48, shuffleBufferManager.getUsedMemory());
- assertEquals(0, shuffleBufferManager.getPreAllocatedSize());
+ assertEquals(48, shuffleBufferManager.getPreAllocatedSize());
// release memory
- shuffleBufferManager.releaseMemory(48, false, false);
+ shuffleBufferManager.releaseMemory(48, false, true);
assertEquals(0, shuffleBufferManager.getUsedMemory());
assertEquals(0, shuffleBufferManager.getPreAllocatedSize());
// receive data without preAllocation
@@ -367,12 +367,16 @@ public class ShuffleBufferManagerTest extends
BufferTestBase {
// actual data size < spillThreshold, won't flush
sc = shuffleBufferManager.cacheShuffleData(appId, shuffleId, true,
createData(1, 16));
+ shuffleBufferManager.releasePreAllocatedSize(48);
assertEquals(StatusCode.SUCCESS, sc);
assertEquals(500, shuffleBufferManager.getUsedMemory());
assertEquals(452, shuffleBufferManager.getPreAllocatedSize());
// actual data size > highWaterMark, flush
shuffleBufferManager.cacheShuffleData(appId, shuffleId, true,
createData(0, 400));
+ shuffleBufferManager.releasePreAllocatedSize(432);
+ // trigger flush manually
+ shuffleBufferManager.flushIfNecessary();
assertEquals(StatusCode.SUCCESS, sc);
assertEquals(500, shuffleBufferManager.getUsedMemory());
assertEquals(20, shuffleBufferManager.getPreAllocatedSize());
diff --git a/server/src/test/resources/server.conf
b/server/src/test/resources/server.conf
index 2838ba07..ca7a47bf 100644
--- a/server/src/test/resources/server.conf
+++ b/server/src/test/resources/server.conf
@@ -26,3 +26,4 @@ rss.jetty.http.port 12345
rss.jetty.corePool.size 64
rss.server.heartbeat.timeout 1
rss.server.write.timeout 2000
+rss.server.shuffleBufferManager.trigger.flush.interval 500