This is an automated email from the ASF dual-hosted git repository.

zhengchenyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new f9b4c0e01 [2171] improvement(remote merge): remove cached block for partition. (#2172)
f9b4c0e01 is described below

commit f9b4c0e01584102a59000d8267ebd0df1f946b52
Author: zhengchenyu <[email protected]>
AuthorDate: Fri Oct 18 15:10:11 2024 +0800

    [2171] improvement(remote merge): remove cached block for partition. (#2172)
    
    ### What changes were proposed in this pull request?
    
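    Remove the per-partition cached block maps (`cachedblockMap` and `mergedBlockMap`) from the merge `Partition`. Blocks are now looked up directly in the shuffle buffer via `ShuffleBufferWithSkipList.getBlock`.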
    
    ### Why are the changes needed?
    
    
    Fix: #2171
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    unit test, integration test.
---
 .../uniffle/common/merger/StreamedSegment.java     | 22 ---------
 .../test/RemoteMergeShuffleWithRssClientTest.java  |  3 ++
 ...ShuffleWithRssClientTestWhenShuffleFlushed.java |  3 ++
 .../org/apache/uniffle/test/RMWordCountTest.java   |  2 +
 .../uniffle/test/RMTezOrderedWordCountTest.java    |  2 +
 .../org/apache/uniffle/server/ShuffleServer.java   |  5 +++
 .../uniffle/server/ShuffleServerGrpcService.java   |  4 --
 .../server/buffer/ShuffleBufferManager.java        |  4 ++
 .../server/buffer/ShuffleBufferWithSkipList.java   | 12 +++++
 .../org/apache/uniffle/server/merge/Partition.java | 52 +++++++++++-----------
 .../org/apache/uniffle/server/merge/Shuffle.java   | 10 -----
 .../uniffle/server/merge/ShuffleMergeManager.java  |  8 ----
 .../server/merge/ShuffleMergeManagerTest.java      |  3 +-
 13 files changed, 59 insertions(+), 71 deletions(-)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/merger/StreamedSegment.java 
b/common/src/main/java/org/apache/uniffle/common/merger/StreamedSegment.java
index b4f0117f5..7276b140e 100644
--- a/common/src/main/java/org/apache/uniffle/common/merger/StreamedSegment.java
+++ b/common/src/main/java/org/apache/uniffle/common/merger/StreamedSegment.java
@@ -21,8 +21,6 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-import io.netty.buffer.ByteBuf;
-
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.records.RecordsReader;
 import org.apache.uniffle.common.serializer.PartialInputStream;
@@ -30,7 +28,6 @@ import 
org.apache.uniffle.common.serializer.PartialInputStream;
 public class StreamedSegment<K, V> extends Segment {
 
   private RecordsReader<K, V> reader;
-  ByteBuf byteBuf = null;
 
   public StreamedSegment(
       RssConf rssConf,
@@ -43,21 +40,6 @@ public class StreamedSegment<K, V> extends Segment {
     this.reader = new RecordsReader<>(rssConf, inputStream, keyClass, 
valueClass, raw);
   }
 
-  public StreamedSegment(
-      RssConf rssConf, ByteBuf byteBuf, long blockId, Class keyClass, Class 
valueClass, boolean raw)
-      throws IOException {
-    super(blockId);
-    this.byteBuf = byteBuf;
-    this.byteBuf.retain();
-    this.reader =
-        new RecordsReader<>(
-            rssConf,
-            PartialInputStream.newInputStream(byteBuf.nioBuffer()),
-            keyClass,
-            valueClass,
-            raw);
-  }
-
   // The buffer must be sorted by key
   public StreamedSegment(
       RssConf rssConf,
@@ -110,10 +92,6 @@ public class StreamedSegment<K, V> extends Segment {
 
   @Override
   public void close() throws IOException {
-    if (byteBuf != null) {
-      this.byteBuf.release();
-      this.byteBuf = null;
-    }
     if (this.reader != null) {
       this.reader.close();
       this.reader = null;
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTest.java
index dfc56d6c6..17ec0c212 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTest.java
@@ -65,6 +65,7 @@ import org.apache.uniffle.common.util.BlockIdLayout;
 import org.apache.uniffle.common.util.ChecksumUtils;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.server.buffer.ShuffleBufferType;
 import org.apache.uniffle.storage.util.StorageType;
 
 import static 
org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_ENABLED;
@@ -86,6 +87,8 @@ public class RemoteMergeShuffleWithRssClientTest extends 
ShuffleReadWriteBase {
     ShuffleServerConf shuffleServerConf = 
getShuffleServerConf(ServerType.GRPC);
     shuffleServerConf.set(ShuffleServerConf.SERVER_MERGE_ENABLE, true);
     
shuffleServerConf.set(ShuffleServerConf.SERVER_MERGE_DEFAULT_MERGED_BLOCK_SIZE, 
"1k");
+    shuffleServerConf.set(
+        ShuffleServerConf.SERVER_SHUFFLE_BUFFER_TYPE, 
ShuffleBufferType.SKIP_LIST);
     shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 
10000000);
     File dataDir1 = new File(tmpDir, "data1");
     File dataDir2 = new File(tmpDir, "data2");
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTestWhenShuffleFlushed.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTestWhenShuffleFlushed.java
index 28d7cc10a..c0af5e1bf 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTestWhenShuffleFlushed.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTestWhenShuffleFlushed.java
@@ -65,6 +65,7 @@ import org.apache.uniffle.common.util.BlockIdLayout;
 import org.apache.uniffle.common.util.ChecksumUtils;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.server.buffer.ShuffleBufferType;
 import org.apache.uniffle.storage.util.StorageType;
 
 import static 
org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_ENABLED;
@@ -88,6 +89,8 @@ public class 
RemoteMergeShuffleWithRssClientTestWhenShuffleFlushed extends Shuff
     ShuffleServerConf shuffleServerConf = 
getShuffleServerConf(ServerType.GRPC);
     shuffleServerConf.set(ShuffleServerConf.SERVER_MERGE_ENABLE, true);
     
shuffleServerConf.set(ShuffleServerConf.SERVER_MERGE_DEFAULT_MERGED_BLOCK_SIZE, 
"1k");
+    shuffleServerConf.set(
+        ShuffleServerConf.SERVER_SHUFFLE_BUFFER_TYPE, 
ShuffleBufferType.SKIP_LIST);
     // Each shuffle data will be flushed!
     shuffleServerConf.set(SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 0.0);
     shuffleServerConf.set(SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0);
diff --git 
a/integration-test/mr/src/test/java/org/apache/uniffle/test/RMWordCountTest.java
 
b/integration-test/mr/src/test/java/org/apache/uniffle/test/RMWordCountTest.java
index 206068eb6..4ac8fc589 100644
--- 
a/integration-test/mr/src/test/java/org/apache/uniffle/test/RMWordCountTest.java
+++ 
b/integration-test/mr/src/test/java/org/apache/uniffle/test/RMWordCountTest.java
@@ -32,6 +32,7 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
 import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.server.buffer.ShuffleBufferType;
 
 import static org.apache.uniffle.server.ShuffleServerConf.SERVER_MERGE_ENABLE;
 
@@ -46,6 +47,7 @@ public class RMWordCountTest extends MRIntegrationTestBase {
   public static void setupServers() throws Exception {
     ShuffleServerConf serverConf = new ShuffleServerConf();
     serverConf.set(SERVER_MERGE_ENABLE, true);
+    serverConf.set(ShuffleServerConf.SERVER_SHUFFLE_BUFFER_TYPE, 
ShuffleBufferType.SKIP_LIST);
     MRIntegrationTestBase.setupServers(MRIntegrationTestBase.getDynamicConf(), 
serverConf);
   }
 
diff --git 
a/integration-test/tez/src/test/java/org/apache/uniffle/test/RMTezOrderedWordCountTest.java
 
b/integration-test/tez/src/test/java/org/apache/uniffle/test/RMTezOrderedWordCountTest.java
index eb5ee51dc..925ef02ab 100644
--- 
a/integration-test/tez/src/test/java/org/apache/uniffle/test/RMTezOrderedWordCountTest.java
+++ 
b/integration-test/tez/src/test/java/org/apache/uniffle/test/RMTezOrderedWordCountTest.java
@@ -36,6 +36,7 @@ import org.junit.jupiter.api.Test;
 
 import org.apache.uniffle.common.ClientType;
 import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.server.buffer.ShuffleBufferType;
 
 import static org.apache.uniffle.server.ShuffleServerConf.SERVER_MERGE_ENABLE;
 
@@ -51,6 +52,7 @@ public class RMTezOrderedWordCountTest extends 
TezIntegrationTestBase {
   public static void setupServers() throws Exception {
     ShuffleServerConf serverConf = new ShuffleServerConf();
     serverConf.set(SERVER_MERGE_ENABLE, true);
+    serverConf.set(ShuffleServerConf.SERVER_SHUFFLE_BUFFER_TYPE, 
ShuffleBufferType.SKIP_LIST);
     TezIntegrationTestBase.setupServers(serverConf);
   }
 
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index 30ecacf3e..92fa6b36b 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -58,6 +58,7 @@ import org.apache.uniffle.common.util.ThreadUtils;
 import org.apache.uniffle.common.web.CoalescedCollectorRegistry;
 import org.apache.uniffle.common.web.JettyServer;
 import org.apache.uniffle.server.buffer.ShuffleBufferManager;
+import org.apache.uniffle.server.buffer.ShuffleBufferType;
 import org.apache.uniffle.server.merge.ShuffleMergeManager;
 import org.apache.uniffle.server.netty.StreamServer;
 import org.apache.uniffle.server.storage.StorageManager;
@@ -310,6 +311,10 @@ public class ShuffleServer {
         new ShuffleBufferManager(shuffleServerConf, shuffleFlushManager, 
nettyServerEnabled);
     remoteMergeEnable = 
shuffleServerConf.get(ShuffleServerConf.SERVER_MERGE_ENABLE);
     if (remoteMergeEnable) {
+      if (shuffleBufferManager.getShuffleBufferType() != 
ShuffleBufferType.SKIP_LIST) {
+        throw new RssException(
+            "Shuffle buffer type must be SKIP_LIST when remote merge is 
enable!");
+      }
       shuffleMergeManager = new ShuffleMergeManager(shuffleServerConf, this);
     }
     shuffleTaskManager =
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 8ed1fae73..ee780b872 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -491,10 +491,6 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
               hasFailureOccurred = true;
               break;
             } else {
-              if (shuffleServer.isRemoteMergeEnable()) {
-                // TODO: Use ShuffleBufferWithSkipList to avoid caching block 
here.
-                shuffleServer.getShuffleMergeManager().cacheBlock(appId, 
shuffleId, spd);
-              }
               long toReleasedSize = spd.getTotalBlockEncodedLength();
               // after each cacheShuffleData call, the `preAllocatedSize` is 
updated timely.
               manager.releasePreAllocatedSize(toReleasedSize);
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index b7b66be37..b4c9b9ef3 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -807,4 +807,8 @@ public class ShuffleBufferManager {
   public void setBufferFlushThreshold(long bufferFlushThreshold) {
     this.bufferFlushThreshold = bufferFlushThreshold;
   }
+
+  public ShuffleBufferType getShuffleBufferType() {
+    return shuffleBufferType;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
index 4a92c919f..d6c47933a 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
@@ -282,4 +282,16 @@ public class ShuffleBufferWithSkipList extends 
AbstractShuffleBuffer {
     }
     return hasLastBlockId;
   }
+
+  public synchronized ShufflePartitionedBlock getBlock(long blockId) {
+    ShufflePartitionedBlock block = blocksMap.get(blockId);
+    if (block == null) {
+      for (ConcurrentSkipListMap<Long, ShufflePartitionedBlock> map : 
inFlushBlockMap.values()) {
+        if (map.containsKey(blockId)) {
+          return map.get(blockId);
+        }
+      }
+    }
+    return block;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/merge/Partition.java 
b/server/src/main/java/org/apache/uniffle/server/merge/Partition.java
index 05e28ddaa..667de657d 100644
--- a/server/src/main/java/org/apache/uniffle/server/merge/Partition.java
+++ b/server/src/main/java/org/apache/uniffle/server/merge/Partition.java
@@ -29,7 +29,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import io.netty.buffer.ByteBuf;
+import com.google.common.collect.Range;
+import io.netty.buffer.ByteBufUtil;
 import org.apache.hadoop.io.RawComparator;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
@@ -51,8 +52,9 @@ import 
org.apache.uniffle.common.netty.buffer.FileSegmentManagedBuffer;
 import org.apache.uniffle.common.netty.buffer.ManagedBuffer;
 import org.apache.uniffle.common.netty.buffer.NettyManagedBuffer;
 import org.apache.uniffle.common.rpc.StatusCode;
-import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.server.ShuffleDataReadEvent;
+import org.apache.uniffle.server.buffer.ShuffleBuffer;
+import org.apache.uniffle.server.buffer.ShuffleBufferWithSkipList;
 import org.apache.uniffle.storage.common.Storage;
 import org.apache.uniffle.storage.handler.impl.LocalFileServerReadHandler;
 import org.apache.uniffle.storage.request.CreateShuffleReadHandlerRequest;
@@ -73,13 +75,6 @@ public class Partition<K, V> {
 
   private final Shuffle shuffle;
   private final int partitionId;
-  // Inserting or deleting ShuffleBuffer::blocks while traversing blocks may 
cause an
-  // ConcurrentModificationException.
-  // So cache the block here. When we use the cached block, we should check 
refCnt so that we can
-  // make sure the ByteBuf
-  // is not released.
-  Map<Long, ShufflePartitionedBlock> cachedblockMap = 
JavaUtils.newConcurrentMap();
-  Map<Long, ShufflePartitionedBlock> mergedBlockMap = 
JavaUtils.newConcurrentMap();
 
   private MergeState state = MergeState.INITED;
   private MergedResult result;
@@ -133,6 +128,22 @@ public class Partition<K, V> {
     }
   }
 
+  private ShufflePartitionedBlock getShufflePartitionedBlock(long blockId, 
boolean merged) {
+    Map.Entry<Range<Integer>, ShuffleBuffer> entry =
+        shuffle
+            .shuffleServer
+            .getShuffleBufferManager()
+            .getShuffleBufferEntry(
+                merged ? shuffle.appId + MERGE_APP_SUFFIX : shuffle.appId,
+                shuffle.shuffleId,
+                partitionId);
+    if (entry != null) {
+      ShuffleBuffer shuffleBuffer = entry.getValue();
+      return ((ShuffleBufferWithSkipList) shuffleBuffer).getBlock(blockId);
+    }
+    return null;
+  }
+
   // getSegments is used to get segments from original shuffle blocks
   public List<Segment> getSegments(
       RssConf rssConf, Iterator<Long> blockIds, Class keyClass, Class 
valueClass)
@@ -141,25 +152,21 @@ public class Partition<K, V> {
     Set<Long> blocksFlushed = new HashSet<>();
     while (blockIds.hasNext()) {
       long blockId = blockIds.next();
-      ByteBuf buf = null;
-      if (cachedblockMap.containsKey(blockId)) {
-        buf = cachedblockMap.get(blockId).getData();
-      }
-      if (buf != null && buf.refCnt() > 0) {
+      ShufflePartitionedBlock block = getShufflePartitionedBlock(blockId, 
false);
+      if (block != null && ByteBufUtil.isAccessible(block.getData())) {
         try {
           StreamedSegment segment =
               new StreamedSegment(
                   rssConf,
-                  buf,
+                  block.getData().nioBuffer(0, block.getDataLength()),
                   blockId,
                   keyClass,
                   valueClass,
                   (shuffle.comparator instanceof RawComparator));
           segments.add(segment);
         } catch (Exception e) {
-          // If ByteBuf is released by flush cleanup before we retain in 
Segment,
-          // will throw ConcurrentModificationException. So we need get block 
buffer
-          // from file
+          // If ByteBuf is released by flush cleanup will throw 
ConcurrentModificationException.
+          // So we need get block buffer from file
           LOG.warn("construct segment failed, caused by ", e);
           blocksFlushed.add(blockId);
         }
@@ -238,10 +245,6 @@ public class Partition<K, V> {
     return new MergeStatus(currentState, size);
   }
 
-  public void cacheBlock(ShufflePartitionedBlock spb) {
-    cachedblockMap.put(spb.getBlockId(), spb);
-  }
-
   // When we merge data, we will divide the merge results into blocks 
according to the specified
   // block size.
   // The merged block in a new appId field (${appd} + MERGE_APP_SUFFIX). We 
will process the merged
@@ -260,7 +263,6 @@ public class Partition<K, V> {
               .getShuffleTaskManager()
               .cacheShuffleData(appId, shuffle.shuffleId, false, spd);
       if (ret == StatusCode.SUCCESS) {
-        mergedBlockMap.put(blockId, spb);
         shuffle
             .shuffleServer
             .getShuffleTaskManager()
@@ -317,7 +319,7 @@ public class Partition<K, V> {
 
   private NettyManagedBuffer getMergedBlockBufferInMemory(long blockId) {
     try {
-      ShufflePartitionedBlock block = this.mergedBlockMap.get(blockId);
+      ShufflePartitionedBlock block = this.getShufflePartitionedBlock(blockId, 
true);
       // We must make sure refCnt > 0, it means the ByteBuf is not released by 
flush cleanup
       if (block != null && block.getData().refCnt() > 0) {
         return new NettyManagedBuffer(block.getData().retain());
@@ -412,8 +414,6 @@ public class Partition<K, V> {
       if (reader != null) {
         reader.close();
       }
-      cachedblockMap.clear();
-      mergedBlockMap.clear();
       shuffleMeta.clear();
     } catch (Exception e) {
       LOG.warn("Partition {} clean up failed, caused by {}", this, e);
diff --git a/server/src/main/java/org/apache/uniffle/server/merge/Shuffle.java 
b/server/src/main/java/org/apache/uniffle/server/merge/Shuffle.java
index 92097eff6..3aa7df6f8 100644
--- a/server/src/main/java/org/apache/uniffle/server/merge/Shuffle.java
+++ b/server/src/main/java/org/apache/uniffle/server/merge/Shuffle.java
@@ -24,8 +24,6 @@ import java.util.Map;
 import com.google.common.annotations.VisibleForTesting;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
-import org.apache.uniffle.common.ShufflePartitionedBlock;
-import org.apache.uniffle.common.ShufflePartitionedData;
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.server.ShuffleServer;
@@ -81,14 +79,6 @@ public class Shuffle<K, V> {
     this.partitions.clear();
   }
 
-  public void cacheBlock(ShufflePartitionedData spd) throws IOException {
-    int partitionId = spd.getPartitionId();
-    this.partitions.putIfAbsent(partitionId, new Partition<K, V>(this, 
partitionId));
-    for (ShufflePartitionedBlock block : spd.getBlockList()) {
-      this.partitions.get(partitionId).cacheBlock(block);
-    }
-  }
-
   public ClassLoader getClassLoader() {
     return classLoader;
   }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/merge/ShuffleMergeManager.java 
b/server/src/main/java/org/apache/uniffle/server/merge/ShuffleMergeManager.java
index 2024459d5..f50bfd271 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/merge/ShuffleMergeManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/merge/ShuffleMergeManager.java
@@ -39,7 +39,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.ShuffleDataResult;
-import org.apache.uniffle.common.ShufflePartitionedData;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.merger.Segment;
 import org.apache.uniffle.common.rpc.StatusCode;
@@ -262,13 +261,6 @@ public class ShuffleMergeManager {
     return this.getPartition(appId, shuffleId, 
partitionId).getShuffleData(blockId);
   }
 
-  public void cacheBlock(String appId, int shuffleId, ShufflePartitionedData 
spd)
-      throws IOException {
-    if (this.shuffles.containsKey(appId) && 
this.shuffles.get(appId).containsKey(shuffleId)) {
-      this.getShuffle(appId, shuffleId).cacheBlock(spd);
-    }
-  }
-
   public MergeStatus tryGetBlock(String appId, int shuffleId, int partitionId, 
long blockId) {
     return this.getPartition(appId, shuffleId, 
partitionId).tryGetBlock(blockId);
   }
diff --git 
a/server/src/test/java/org/apache/uniffle/server/merge/ShuffleMergeManagerTest.java
 
b/server/src/test/java/org/apache/uniffle/server/merge/ShuffleMergeManagerTest.java
index 69e2a5788..449881dc3 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/merge/ShuffleMergeManagerTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/merge/ShuffleMergeManagerTest.java
@@ -49,6 +49,7 @@ import org.apache.uniffle.server.ShuffleServer;
 import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.server.ShuffleServerMetrics;
 import org.apache.uniffle.server.ShuffleTaskManager;
+import org.apache.uniffle.server.buffer.ShuffleBufferType;
 import org.apache.uniffle.storage.util.StorageType;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -80,6 +81,7 @@ public class ShuffleMergeManagerTest {
         tempDir1.getAbsolutePath() + "," + tempDir2.getAbsolutePath());
     serverConf.setLong(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 
60L * 1000L * 60L);
     serverConf.set(ShuffleServerConf.SERVER_MERGE_ENABLE, true);
+    serverConf.set(ShuffleServerConf.SERVER_SHUFFLE_BUFFER_TYPE, 
ShuffleBufferType.SKIP_LIST);
     ShuffleServerMetrics.clear();
     ShuffleServerMetrics.register();
     assertTrue(this.tempDir1.isDirectory());
@@ -156,7 +158,6 @@ public class ShuffleMergeManagerTest {
     }
     ShufflePartitionedData spd = new ShufflePartitionedData(PARTITION_ID, 
shufflePartitionedBlocks);
     shuffleTaskManager.cacheShuffleData(APP_ID, SHUFFLE_ID, false, spd);
-    mergeManager.cacheBlock(APP_ID, SHUFFLE_ID, spd);
     // 4.2 report shuffle result
     shuffleTaskManager.addFinishedBlockIds(
         APP_ID, SHUFFLE_ID, ImmutableMap.of(PARTITION_ID, blocks), 1);

Reply via email to