This is an automated email from the ASF dual-hosted git repository.
zhengchenyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new f9b4c0e01 [2171] improvement(remote merge): remove cached block for
partition. (#2172)
f9b4c0e01 is described below
commit f9b4c0e01584102a59000d8267ebd0df1f946b52
Author: zhengchenyu <[email protected]>
AuthorDate: Fri Oct 18 15:10:11 2024 +0800
[2171] improvement(remote merge): remove cached block for partition. (#2172)
### What changes were proposed in this pull request?
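Remove the per-partition block caches (`cachedblockMap` and `mergedBlockMap`) used by remote merge.
Blocks are now looked up directly in the shuffle buffer through `ShuffleBufferWithSkipList#getBlock`,
so the shuffle server throws an `RssException` if remote merge is enabled while the shuffle buffer
type is anything other than `SKIP_LIST`.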
### Why are the changes needed?
Fix: #2171
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests and integration tests; the remote-merge tests now set the shuffle buffer type to `SKIP_LIST`.
---
.../uniffle/common/merger/StreamedSegment.java | 22 ---------
.../test/RemoteMergeShuffleWithRssClientTest.java | 3 ++
...ShuffleWithRssClientTestWhenShuffleFlushed.java | 3 ++
.../org/apache/uniffle/test/RMWordCountTest.java | 2 +
.../uniffle/test/RMTezOrderedWordCountTest.java | 2 +
.../org/apache/uniffle/server/ShuffleServer.java | 5 +++
.../uniffle/server/ShuffleServerGrpcService.java | 4 --
.../server/buffer/ShuffleBufferManager.java | 4 ++
.../server/buffer/ShuffleBufferWithSkipList.java | 12 +++++
.../org/apache/uniffle/server/merge/Partition.java | 52 +++++++++++-----------
.../org/apache/uniffle/server/merge/Shuffle.java | 10 -----
.../uniffle/server/merge/ShuffleMergeManager.java | 8 ----
.../server/merge/ShuffleMergeManagerTest.java | 3 +-
13 files changed, 59 insertions(+), 71 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/merger/StreamedSegment.java
b/common/src/main/java/org/apache/uniffle/common/merger/StreamedSegment.java
index b4f0117f5..7276b140e 100644
--- a/common/src/main/java/org/apache/uniffle/common/merger/StreamedSegment.java
+++ b/common/src/main/java/org/apache/uniffle/common/merger/StreamedSegment.java
@@ -21,8 +21,6 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
-import io.netty.buffer.ByteBuf;
-
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.records.RecordsReader;
import org.apache.uniffle.common.serializer.PartialInputStream;
@@ -30,7 +28,6 @@ import
org.apache.uniffle.common.serializer.PartialInputStream;
public class StreamedSegment<K, V> extends Segment {
private RecordsReader<K, V> reader;
- ByteBuf byteBuf = null;
public StreamedSegment(
RssConf rssConf,
@@ -43,21 +40,6 @@ public class StreamedSegment<K, V> extends Segment {
this.reader = new RecordsReader<>(rssConf, inputStream, keyClass,
valueClass, raw);
}
- public StreamedSegment(
- RssConf rssConf, ByteBuf byteBuf, long blockId, Class keyClass, Class
valueClass, boolean raw)
- throws IOException {
- super(blockId);
- this.byteBuf = byteBuf;
- this.byteBuf.retain();
- this.reader =
- new RecordsReader<>(
- rssConf,
- PartialInputStream.newInputStream(byteBuf.nioBuffer()),
- keyClass,
- valueClass,
- raw);
- }
-
// The buffer must be sorted by key
public StreamedSegment(
RssConf rssConf,
@@ -110,10 +92,6 @@ public class StreamedSegment<K, V> extends Segment {
@Override
public void close() throws IOException {
- if (byteBuf != null) {
- this.byteBuf.release();
- this.byteBuf = null;
- }
if (this.reader != null) {
this.reader.close();
this.reader = null;
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTest.java
index dfc56d6c6..17ec0c212 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTest.java
@@ -65,6 +65,7 @@ import org.apache.uniffle.common.util.BlockIdLayout;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.server.buffer.ShuffleBufferType;
import org.apache.uniffle.storage.util.StorageType;
import static
org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_ENABLED;
@@ -86,6 +87,8 @@ public class RemoteMergeShuffleWithRssClientTest extends
ShuffleReadWriteBase {
ShuffleServerConf shuffleServerConf =
getShuffleServerConf(ServerType.GRPC);
shuffleServerConf.set(ShuffleServerConf.SERVER_MERGE_ENABLE, true);
shuffleServerConf.set(ShuffleServerConf.SERVER_MERGE_DEFAULT_MERGED_BLOCK_SIZE,
"1k");
+ shuffleServerConf.set(
+ ShuffleServerConf.SERVER_SHUFFLE_BUFFER_TYPE,
ShuffleBufferType.SKIP_LIST);
shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat",
10000000);
File dataDir1 = new File(tmpDir, "data1");
File dataDir2 = new File(tmpDir, "data2");
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTestWhenShuffleFlushed.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTestWhenShuffleFlushed.java
index 28d7cc10a..c0af5e1bf 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTestWhenShuffleFlushed.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTestWhenShuffleFlushed.java
@@ -65,6 +65,7 @@ import org.apache.uniffle.common.util.BlockIdLayout;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.server.buffer.ShuffleBufferType;
import org.apache.uniffle.storage.util.StorageType;
import static
org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_ENABLED;
@@ -88,6 +89,8 @@ public class
RemoteMergeShuffleWithRssClientTestWhenShuffleFlushed extends Shuff
ShuffleServerConf shuffleServerConf =
getShuffleServerConf(ServerType.GRPC);
shuffleServerConf.set(ShuffleServerConf.SERVER_MERGE_ENABLE, true);
shuffleServerConf.set(ShuffleServerConf.SERVER_MERGE_DEFAULT_MERGED_BLOCK_SIZE,
"1k");
+ shuffleServerConf.set(
+ ShuffleServerConf.SERVER_SHUFFLE_BUFFER_TYPE,
ShuffleBufferType.SKIP_LIST);
// Each shuffle data will be flushed!
shuffleServerConf.set(SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 0.0);
shuffleServerConf.set(SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0);
diff --git
a/integration-test/mr/src/test/java/org/apache/uniffle/test/RMWordCountTest.java
b/integration-test/mr/src/test/java/org/apache/uniffle/test/RMWordCountTest.java
index 206068eb6..4ac8fc589 100644
---
a/integration-test/mr/src/test/java/org/apache/uniffle/test/RMWordCountTest.java
+++
b/integration-test/mr/src/test/java/org/apache/uniffle/test/RMWordCountTest.java
@@ -32,6 +32,7 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.server.buffer.ShuffleBufferType;
import static org.apache.uniffle.server.ShuffleServerConf.SERVER_MERGE_ENABLE;
@@ -46,6 +47,7 @@ public class RMWordCountTest extends MRIntegrationTestBase {
public static void setupServers() throws Exception {
ShuffleServerConf serverConf = new ShuffleServerConf();
serverConf.set(SERVER_MERGE_ENABLE, true);
+ serverConf.set(ShuffleServerConf.SERVER_SHUFFLE_BUFFER_TYPE,
ShuffleBufferType.SKIP_LIST);
MRIntegrationTestBase.setupServers(MRIntegrationTestBase.getDynamicConf(),
serverConf);
}
diff --git
a/integration-test/tez/src/test/java/org/apache/uniffle/test/RMTezOrderedWordCountTest.java
b/integration-test/tez/src/test/java/org/apache/uniffle/test/RMTezOrderedWordCountTest.java
index eb5ee51dc..925ef02ab 100644
---
a/integration-test/tez/src/test/java/org/apache/uniffle/test/RMTezOrderedWordCountTest.java
+++
b/integration-test/tez/src/test/java/org/apache/uniffle/test/RMTezOrderedWordCountTest.java
@@ -36,6 +36,7 @@ import org.junit.jupiter.api.Test;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.server.buffer.ShuffleBufferType;
import static org.apache.uniffle.server.ShuffleServerConf.SERVER_MERGE_ENABLE;
@@ -51,6 +52,7 @@ public class RMTezOrderedWordCountTest extends
TezIntegrationTestBase {
public static void setupServers() throws Exception {
ShuffleServerConf serverConf = new ShuffleServerConf();
serverConf.set(SERVER_MERGE_ENABLE, true);
+ serverConf.set(ShuffleServerConf.SERVER_SHUFFLE_BUFFER_TYPE,
ShuffleBufferType.SKIP_LIST);
TezIntegrationTestBase.setupServers(serverConf);
}
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index 30ecacf3e..92fa6b36b 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -58,6 +58,7 @@ import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.common.web.CoalescedCollectorRegistry;
import org.apache.uniffle.common.web.JettyServer;
import org.apache.uniffle.server.buffer.ShuffleBufferManager;
+import org.apache.uniffle.server.buffer.ShuffleBufferType;
import org.apache.uniffle.server.merge.ShuffleMergeManager;
import org.apache.uniffle.server.netty.StreamServer;
import org.apache.uniffle.server.storage.StorageManager;
@@ -310,6 +311,10 @@ public class ShuffleServer {
new ShuffleBufferManager(shuffleServerConf, shuffleFlushManager,
nettyServerEnabled);
remoteMergeEnable =
shuffleServerConf.get(ShuffleServerConf.SERVER_MERGE_ENABLE);
if (remoteMergeEnable) {
+ if (shuffleBufferManager.getShuffleBufferType() !=
ShuffleBufferType.SKIP_LIST) {
+ throw new RssException(
+ "Shuffle buffer type must be SKIP_LIST when remote merge is
enable!");
+ }
shuffleMergeManager = new ShuffleMergeManager(shuffleServerConf, this);
}
shuffleTaskManager =
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 8ed1fae73..ee780b872 100644
---
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -491,10 +491,6 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
hasFailureOccurred = true;
break;
} else {
- if (shuffleServer.isRemoteMergeEnable()) {
- // TODO: Use ShuffleBufferWithSkipList to avoid caching block
here.
- shuffleServer.getShuffleMergeManager().cacheBlock(appId,
shuffleId, spd);
- }
long toReleasedSize = spd.getTotalBlockEncodedLength();
// after each cacheShuffleData call, the `preAllocatedSize` is
updated timely.
manager.releasePreAllocatedSize(toReleasedSize);
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index b7b66be37..b4c9b9ef3 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -807,4 +807,8 @@ public class ShuffleBufferManager {
public void setBufferFlushThreshold(long bufferFlushThreshold) {
this.bufferFlushThreshold = bufferFlushThreshold;
}
+
+ public ShuffleBufferType getShuffleBufferType() {
+ return shuffleBufferType;
+ }
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
index 4a92c919f..d6c47933a 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
@@ -282,4 +282,16 @@ public class ShuffleBufferWithSkipList extends
AbstractShuffleBuffer {
}
return hasLastBlockId;
}
+
+ public synchronized ShufflePartitionedBlock getBlock(long blockId) {
+ ShufflePartitionedBlock block = blocksMap.get(blockId);
+ if (block == null) {
+ for (ConcurrentSkipListMap<Long, ShufflePartitionedBlock> map :
inFlushBlockMap.values()) {
+ if (map.containsKey(blockId)) {
+ return map.get(blockId);
+ }
+ }
+ }
+ return block;
+ }
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/merge/Partition.java
b/server/src/main/java/org/apache/uniffle/server/merge/Partition.java
index 05e28ddaa..667de657d 100644
--- a/server/src/main/java/org/apache/uniffle/server/merge/Partition.java
+++ b/server/src/main/java/org/apache/uniffle/server/merge/Partition.java
@@ -29,7 +29,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import io.netty.buffer.ByteBuf;
+import com.google.common.collect.Range;
+import io.netty.buffer.ByteBufUtil;
import org.apache.hadoop.io.RawComparator;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
@@ -51,8 +52,9 @@ import
org.apache.uniffle.common.netty.buffer.FileSegmentManagedBuffer;
import org.apache.uniffle.common.netty.buffer.ManagedBuffer;
import org.apache.uniffle.common.netty.buffer.NettyManagedBuffer;
import org.apache.uniffle.common.rpc.StatusCode;
-import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.server.ShuffleDataReadEvent;
+import org.apache.uniffle.server.buffer.ShuffleBuffer;
+import org.apache.uniffle.server.buffer.ShuffleBufferWithSkipList;
import org.apache.uniffle.storage.common.Storage;
import org.apache.uniffle.storage.handler.impl.LocalFileServerReadHandler;
import org.apache.uniffle.storage.request.CreateShuffleReadHandlerRequest;
@@ -73,13 +75,6 @@ public class Partition<K, V> {
private final Shuffle shuffle;
private final int partitionId;
- // Inserting or deleting ShuffleBuffer::blocks while traversing blocks may
cause an
- // ConcurrentModificationException.
- // So cache the block here. When we use the cached block, we should check
refCnt so that we can
- // make sure the ByteBuf
- // is not released.
- Map<Long, ShufflePartitionedBlock> cachedblockMap =
JavaUtils.newConcurrentMap();
- Map<Long, ShufflePartitionedBlock> mergedBlockMap =
JavaUtils.newConcurrentMap();
private MergeState state = MergeState.INITED;
private MergedResult result;
@@ -133,6 +128,22 @@ public class Partition<K, V> {
}
}
+ private ShufflePartitionedBlock getShufflePartitionedBlock(long blockId,
boolean merged) {
+ Map.Entry<Range<Integer>, ShuffleBuffer> entry =
+ shuffle
+ .shuffleServer
+ .getShuffleBufferManager()
+ .getShuffleBufferEntry(
+ merged ? shuffle.appId + MERGE_APP_SUFFIX : shuffle.appId,
+ shuffle.shuffleId,
+ partitionId);
+ if (entry != null) {
+ ShuffleBuffer shuffleBuffer = entry.getValue();
+ return ((ShuffleBufferWithSkipList) shuffleBuffer).getBlock(blockId);
+ }
+ return null;
+ }
+
// getSegments is used to get segments from original shuffle blocks
public List<Segment> getSegments(
RssConf rssConf, Iterator<Long> blockIds, Class keyClass, Class
valueClass)
@@ -141,25 +152,21 @@ public class Partition<K, V> {
Set<Long> blocksFlushed = new HashSet<>();
while (blockIds.hasNext()) {
long blockId = blockIds.next();
- ByteBuf buf = null;
- if (cachedblockMap.containsKey(blockId)) {
- buf = cachedblockMap.get(blockId).getData();
- }
- if (buf != null && buf.refCnt() > 0) {
+ ShufflePartitionedBlock block = getShufflePartitionedBlock(blockId,
false);
+ if (block != null && ByteBufUtil.isAccessible(block.getData())) {
try {
StreamedSegment segment =
new StreamedSegment(
rssConf,
- buf,
+ block.getData().nioBuffer(0, block.getDataLength()),
blockId,
keyClass,
valueClass,
(shuffle.comparator instanceof RawComparator));
segments.add(segment);
} catch (Exception e) {
- // If ByteBuf is released by flush cleanup before we retain in
Segment,
- // will throw ConcurrentModificationException. So we need get block
buffer
- // from file
+ // If ByteBuf is released by flush cleanup will throw
ConcurrentModificationException.
+ // So we need get block buffer from file
LOG.warn("construct segment failed, caused by ", e);
blocksFlushed.add(blockId);
}
@@ -238,10 +245,6 @@ public class Partition<K, V> {
return new MergeStatus(currentState, size);
}
- public void cacheBlock(ShufflePartitionedBlock spb) {
- cachedblockMap.put(spb.getBlockId(), spb);
- }
-
// When we merge data, we will divide the merge results into blocks
according to the specified
// block size.
// The merged block in a new appId field (${appd} + MERGE_APP_SUFFIX). We
will process the merged
@@ -260,7 +263,6 @@ public class Partition<K, V> {
.getShuffleTaskManager()
.cacheShuffleData(appId, shuffle.shuffleId, false, spd);
if (ret == StatusCode.SUCCESS) {
- mergedBlockMap.put(blockId, spb);
shuffle
.shuffleServer
.getShuffleTaskManager()
@@ -317,7 +319,7 @@ public class Partition<K, V> {
private NettyManagedBuffer getMergedBlockBufferInMemory(long blockId) {
try {
- ShufflePartitionedBlock block = this.mergedBlockMap.get(blockId);
+ ShufflePartitionedBlock block = this.getShufflePartitionedBlock(blockId,
true);
// We must make sure refCnt > 0, it means the ByteBuf is not released by
flush cleanup
if (block != null && block.getData().refCnt() > 0) {
return new NettyManagedBuffer(block.getData().retain());
@@ -412,8 +414,6 @@ public class Partition<K, V> {
if (reader != null) {
reader.close();
}
- cachedblockMap.clear();
- mergedBlockMap.clear();
shuffleMeta.clear();
} catch (Exception e) {
LOG.warn("Partition {} clean up failed, caused by {}", this, e);
diff --git a/server/src/main/java/org/apache/uniffle/server/merge/Shuffle.java
b/server/src/main/java/org/apache/uniffle/server/merge/Shuffle.java
index 92097eff6..3aa7df6f8 100644
--- a/server/src/main/java/org/apache/uniffle/server/merge/Shuffle.java
+++ b/server/src/main/java/org/apache/uniffle/server/merge/Shuffle.java
@@ -24,8 +24,6 @@ import java.util.Map;
import com.google.common.annotations.VisibleForTesting;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
-import org.apache.uniffle.common.ShufflePartitionedBlock;
-import org.apache.uniffle.common.ShufflePartitionedData;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.server.ShuffleServer;
@@ -81,14 +79,6 @@ public class Shuffle<K, V> {
this.partitions.clear();
}
- public void cacheBlock(ShufflePartitionedData spd) throws IOException {
- int partitionId = spd.getPartitionId();
- this.partitions.putIfAbsent(partitionId, new Partition<K, V>(this,
partitionId));
- for (ShufflePartitionedBlock block : spd.getBlockList()) {
- this.partitions.get(partitionId).cacheBlock(block);
- }
- }
-
public ClassLoader getClassLoader() {
return classLoader;
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/merge/ShuffleMergeManager.java
b/server/src/main/java/org/apache/uniffle/server/merge/ShuffleMergeManager.java
index 2024459d5..f50bfd271 100644
---
a/server/src/main/java/org/apache/uniffle/server/merge/ShuffleMergeManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/merge/ShuffleMergeManager.java
@@ -39,7 +39,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.ShuffleDataResult;
-import org.apache.uniffle.common.ShufflePartitionedData;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.merger.Segment;
import org.apache.uniffle.common.rpc.StatusCode;
@@ -262,13 +261,6 @@ public class ShuffleMergeManager {
return this.getPartition(appId, shuffleId,
partitionId).getShuffleData(blockId);
}
- public void cacheBlock(String appId, int shuffleId, ShufflePartitionedData
spd)
- throws IOException {
- if (this.shuffles.containsKey(appId) &&
this.shuffles.get(appId).containsKey(shuffleId)) {
- this.getShuffle(appId, shuffleId).cacheBlock(spd);
- }
- }
-
public MergeStatus tryGetBlock(String appId, int shuffleId, int partitionId,
long blockId) {
return this.getPartition(appId, shuffleId,
partitionId).tryGetBlock(blockId);
}
diff --git
a/server/src/test/java/org/apache/uniffle/server/merge/ShuffleMergeManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/merge/ShuffleMergeManagerTest.java
index 69e2a5788..449881dc3 100644
---
a/server/src/test/java/org/apache/uniffle/server/merge/ShuffleMergeManagerTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/merge/ShuffleMergeManagerTest.java
@@ -49,6 +49,7 @@ import org.apache.uniffle.server.ShuffleServer;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.server.ShuffleServerMetrics;
import org.apache.uniffle.server.ShuffleTaskManager;
+import org.apache.uniffle.server.buffer.ShuffleBufferType;
import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -80,6 +81,7 @@ public class ShuffleMergeManagerTest {
tempDir1.getAbsolutePath() + "," + tempDir2.getAbsolutePath());
serverConf.setLong(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT,
60L * 1000L * 60L);
serverConf.set(ShuffleServerConf.SERVER_MERGE_ENABLE, true);
+ serverConf.set(ShuffleServerConf.SERVER_SHUFFLE_BUFFER_TYPE,
ShuffleBufferType.SKIP_LIST);
ShuffleServerMetrics.clear();
ShuffleServerMetrics.register();
assertTrue(this.tempDir1.isDirectory());
@@ -156,7 +158,6 @@ public class ShuffleMergeManagerTest {
}
ShufflePartitionedData spd = new ShufflePartitionedData(PARTITION_ID,
shufflePartitionedBlocks);
shuffleTaskManager.cacheShuffleData(APP_ID, SHUFFLE_ID, false, spd);
- mergeManager.cacheBlock(APP_ID, SHUFFLE_ID, spd);
// 4.2 report shuffle result
shuffleTaskManager.addFinishedBlockIds(
APP_ID, SHUFFLE_ID, ImmutableMap.of(PARTITION_ID, blocks), 1);