This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new c83a1b5eb [#1727] feat(server): Introduce local allocation buffer to 
store blocks in memory (#2492)
c83a1b5eb is described below

commit c83a1b5ebc02d92152810b2855fec90232b07d7e
Author: xianjingfeng <[email protected]>
AuthorDate: Thu Jun 12 14:10:10 2025 +0800

    [#1727] feat(server): Introduce local allocation buffer to store blocks in 
memory (#2492)
    
    ### What changes were proposed in this pull request?
    
    Introduce local allocation buffer to store blocks in memory.
    
    ### Why are the changes needed?
    
    Fix: #1727
    
    ### Does this PR introduce _any_ user-facing change?
    
    set `rss.server.buffer.lab.enable` to `true` in server.conf
    
    ### How was this patch tested?
    
    CI and verify in production environment
---
 .../uniffle/common/ShufflePartitionedBlock.java    |  24 +-
 .../apache/uniffle/common/config/ConfigUtils.java  |   3 +
 .../apache/uniffle/common/util/ByteBufUtils.java   |   7 +
 .../common/ShufflePartitionedBlockTest.java        |  11 -
 .../apache/uniffle/server/ShuffleServerConf.java   |  40 +++
 .../uniffle/server/ShuffleServerGrpcService.java   |   2 +-
 .../server/buffer/ShuffleBufferManager.java        |  20 +-
 .../server/buffer/ShuffleBufferWithLinkedList.java |  30 +-
 .../server/buffer/ShuffleBufferWithSkipList.java   |  29 +-
 .../apache/uniffle/server/buffer/lab/Chunk.java    |  94 +++++++
 .../uniffle/server/buffer/lab/ChunkCreator.java    | 313 +++++++++++++++++++++
 .../org/apache/uniffle/server/buffer/lab/LAB.java  | 108 +++++++
 .../buffer/lab/LABShuffleBufferWithLinkedList.java |  84 ++++++
 .../buffer/lab/LABShuffleBufferWithSkipList.java   |  82 ++++++
 .../buffer/lab/LABShufflePartitionedBlock.java     |  40 +++
 .../uniffle/server/buffer/lab/OffheapChunk.java    |  40 +++
 .../uniffle/server/buffer/lab/SupportsLAB.java     |  20 ++
 .../org/apache/uniffle/server/merge/Partition.java |  10 +-
 .../server/netty/ShuffleServerNettyHandler.java    |   2 +-
 .../uniffle/server/buffer/BufferTestBase.java      |   2 +
 .../buffer/LABShuffleBufferWithLinkedListTest.java |  35 +++
 .../buffer/LABShuffleBufferWithSkipListTest.java   |  35 +++
 .../server/buffer/ShuffleBufferManagerTest.java    |  25 +-
 .../buffer/ShuffleBufferWithLinkedListTest.java    | 187 ++++++------
 .../buffer/ShuffleBufferWithSkipListTest.java      |  19 +-
 25 files changed, 1114 insertions(+), 148 deletions(-)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java 
b/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java
index e476e37f3..0a04d9be7 100644
--- 
a/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java
+++ 
b/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java
@@ -38,12 +38,13 @@ public class ShufflePartitionedBlock {
       long blockId,
       long taskAttemptId,
       byte[] data) {
-    this.dataLength = dataLength;
-    this.crc = crc;
-    this.blockId = blockId;
-    this.uncompressLength = uncompressLength;
-    this.taskAttemptId = taskAttemptId;
-    this.data = data == null ? Unpooled.EMPTY_BUFFER : 
Unpooled.wrappedBuffer(data);
+    this(
+        dataLength,
+        uncompressLength,
+        crc,
+        blockId,
+        taskAttemptId,
+        data == null ? Unpooled.EMPTY_BUFFER : Unpooled.wrappedBuffer(data));
   }
 
   public ShufflePartitionedBlock(
@@ -61,6 +62,10 @@ public class ShufflePartitionedBlock {
     this.data = data;
   }
 
+  public boolean isOnLAB() {
+    return false;
+  }
+
   // calculate the data size for this block in memory including metadata which 
are
   // blockId, crc, taskAttemptId, length, uncompressLength
   public long getEncodedLength() {
@@ -76,15 +81,12 @@ public class ShufflePartitionedBlock {
       return false;
     }
     ShufflePartitionedBlock that = (ShufflePartitionedBlock) o;
-    return dataLength == that.dataLength
-        && crc == that.crc
-        && blockId == that.blockId
-        && data.equals(that.data);
+    return blockId == that.blockId;
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(dataLength, crc, blockId, data);
+    return Objects.hash(blockId);
   }
 
   public int getDataLength() {
diff --git 
a/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java 
b/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
index 64f0ee1d5..b8f5ac096 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
@@ -226,4 +226,7 @@ public class ConfigUtils {
 
   public static final Function<Double, Boolean> PERCENTAGE_DOUBLE_VALIDATOR =
       value -> Double.compare(value, 100.0) <= 0 && Double.compare(value, 0.0) 
>= 0;
+
+  public static final Function<Double, Boolean> DOUBLE_VALIDATOR_ZERO_TO_ONE =
+      value -> Double.compare(value, 1.0) <= 0 && Double.compare(value, 0.0) 
>= 0;
 }
diff --git 
a/common/src/main/java/org/apache/uniffle/common/util/ByteBufUtils.java 
b/common/src/main/java/org/apache/uniffle/common/util/ByteBufUtils.java
index 24f60e20e..c5f2a5e58 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/ByteBufUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/ByteBufUtils.java
@@ -96,4 +96,11 @@ public class ByteBufUtils {
     final ByteBuffer byteBuffer = bytes.asReadOnlyByteBuffer();
     return Unpooled.wrappedBuffer(byteBuffer);
   }
+
+  public static ByteBuf copy(ByteBuf from) {
+    ByteBuf newByteBuf = Unpooled.directBuffer(from.readableBytes());
+    newByteBuf.writeBytes(from);
+    from.resetReaderIndex();
+    return newByteBuf;
+  }
 }
diff --git 
a/common/src/test/java/org/apache/uniffle/common/ShufflePartitionedBlockTest.java
 
b/common/src/test/java/org/apache/uniffle/common/ShufflePartitionedBlockTest.java
index eb79ede63..d8c0ddbc5 100644
--- 
a/common/src/test/java/org/apache/uniffle/common/ShufflePartitionedBlockTest.java
+++ 
b/common/src/test/java/org/apache/uniffle/common/ShufflePartitionedBlockTest.java
@@ -20,8 +20,6 @@ package org.apache.uniffle.common;
 import java.util.Random;
 
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.CsvSource;
 
 import org.apache.uniffle.common.util.ByteBufUtils;
 
@@ -57,15 +55,6 @@ public class ShufflePartitionedBlockTest {
     assertNotEquals(b1, new Object());
   }
 
-  @ParameterizedTest
-  @CsvSource({"5, 2, 3, 4", "1, 5, 3, 4", "1, 2, 5, 4", "1, 2, 3, 5"})
-  public void testNotEquals(int length, long crc, long blockId, int dataSize) {
-    ShufflePartitionedBlock b1 = new ShufflePartitionedBlock(1, 0, 2, 3, 0, 
new byte[4]);
-    ShufflePartitionedBlock b2 =
-        new ShufflePartitionedBlock(length, 0, crc, blockId, 0, new 
byte[dataSize]);
-    assertNotEquals(b1, b2);
-  }
-
   @Test
   public void testToString() {
     ShufflePartitionedBlock b1 = new ShufflePartitionedBlock(1, 2, 3, 4, 5, 
new byte[6]);
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index edf7d8684..7490f97a7 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -505,6 +505,46 @@ public class ShuffleServerConf extends RssBaseConf {
               "Threshold when flushing shuffle data to persistent storage, 
recommend value would be 256K, "
                   + "512K, or even 1M");
 
+  public static final ConfigOption<Boolean> SERVER_SHUFFLE_BUFFER_LAB_ENABLE =
+      ConfigOptions.key("rss.server.buffer.lab.enabled")
+          .booleanType()
+          .defaultValue(false)
+          .withDescription("Whether enable LAB(Local allocation buffer) for 
shuffle buffers.");
+
+  public static final ConfigOption<Integer> 
SERVER_SHUFFLE_BUFFER_LAB_CHUNK_SIZE =
+      ConfigOptions.key("rss.server.buffer.lab.chunkSize")
+          .intType()
+          .defaultValue(1024 * 100) // 100K
+          .withDescription(
+              "Defines the pre-allocated chunk size per partition for LAB. "
+                  + "Each partition reserves one chunk of this size. "
+                  + "Larger values may cause memory waste when processing many 
partitions, "
+                  + "while smaller values generate excessive small chunks, 
increasing GC frequency and overhead. "
+                  + "Configure based on expected data size, available memory, 
and GC tolerance to balance efficiency.");
+
+  public static final ConfigOption<Double> 
SERVER_SHUFFLE_BUFFER_LAB_MAX_ALLOC_RATIO =
+      ConfigOptions.key("rss.server.buffer.lab.maxAllocRatio")
+          .doubleType()
+          .checkValue(
+              ConfigUtils.DOUBLE_VALIDATOR_ZERO_TO_ONE,
+              "The lab max alloc ratio must be between 0.0 and 1.0")
+          .defaultValue(0.2)
+          .withDescription(
+              "If the block size is not small, we don't need to put it in the 
chunk."
+                  + " If the ratio is 0.2, it means the blocks which size is 
less or equal than "
+                  + SERVER_SHUFFLE_BUFFER_LAB_CHUNK_SIZE.key()
+                  + " * 0.2 will be put in the chunk.");
+
+  public static final ConfigOption<Double> 
SERVER_SHUFFLE_BUFFER_LAB_CHUNK_POOL_CAPACITY_RATIO =
+      ConfigOptions.key("rss.server.buffer.lab.chunkPoolCapacityRatio")
+          .doubleType()
+          .defaultValue(1.0)
+          .withDescription(
+              "Controls the maximum memory capacity ratio between LAB's chunk 
pool and the configured buffer capacity. "
+                  + "The ratio represents (total memory of chunk pool) / "
+                  + SERVER_BUFFER_CAPACITY.key()
+                  + ".");
+
   public static final ConfigOption<String> STORAGE_MEDIA_PROVIDER_ENV_KEY =
       ConfigOptions.key("rss.server.storageMediaProvider.from.env.key")
           .stringType()
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 3977f5fdc..979761f9e 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -478,7 +478,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
                     + e.getMessage();
             ret = StatusCode.INTERNAL_ERROR;
             responseMessage = errorMsg;
-            LOG.error(errorMsg);
+            LOG.error(errorMsg, e);
             hasFailureOccurred = true;
             break;
           } finally {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index c4ed44d29..0b6b246b5 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -55,6 +55,9 @@ import org.apache.uniffle.server.ShuffleFlushManager;
 import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.server.ShuffleServerMetrics;
 import org.apache.uniffle.server.ShuffleTaskManager;
+import org.apache.uniffle.server.buffer.lab.ChunkCreator;
+import org.apache.uniffle.server.buffer.lab.LABShuffleBufferWithLinkedList;
+import org.apache.uniffle.server.buffer.lab.LABShuffleBufferWithSkipList;
 
 import static 
org.apache.uniffle.server.ShuffleServerMetrics.BLOCK_COUNT_IN_BUFFER_POOL;
 import static 
org.apache.uniffle.server.ShuffleServerMetrics.BUFFER_COUNT_IN_BUFFER_POOL;
@@ -66,6 +69,7 @@ public class ShuffleBufferManager {
   private static final Logger LOG = 
LoggerFactory.getLogger(ShuffleBufferManager.class);
 
   private final ShuffleBufferType shuffleBufferType;
+  private final Boolean isLABEnabled;
   private final int flushTryLockTimeout;
   private ShuffleTaskManager shuffleTaskManager;
   private final ShuffleFlushManager shuffleFlushManager;
@@ -212,6 +216,16 @@ public class ShuffleBufferManager {
                             
ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE));
           }
         });
+
+    isLABEnabled = 
conf.get(ShuffleServerConf.SERVER_SHUFFLE_BUFFER_LAB_ENABLE);
+    if (isLABEnabled) {
+      int chunkSize = 
conf.get(ShuffleServerConf.SERVER_SHUFFLE_BUFFER_LAB_CHUNK_SIZE);
+      double chunkPoolCapacityRatio =
+          
conf.get(ShuffleServerConf.SERVER_SHUFFLE_BUFFER_LAB_CHUNK_POOL_CAPACITY_RATIO);
+      double maxAllocRatio = 
conf.get(ShuffleServerConf.SERVER_SHUFFLE_BUFFER_LAB_MAX_ALLOC_RATIO);
+      int maxAlloc = (int) (chunkSize * maxAllocRatio);
+      ChunkCreator.initialize(chunkSize, (long) (capacity * 
chunkPoolCapacityRatio), maxAlloc);
+    }
   }
 
   public void setShuffleTaskManager(ShuffleTaskManager taskManager) {
@@ -229,9 +243,11 @@ public class ShuffleBufferManager {
       ShuffleServerMetrics.gaugeTotalPartitionNum.inc();
       ShuffleBuffer shuffleBuffer;
       if (shuffleBufferType == ShuffleBufferType.SKIP_LIST) {
-        shuffleBuffer = new ShuffleBufferWithSkipList();
+        shuffleBuffer =
+            isLABEnabled ? new LABShuffleBufferWithSkipList() : new 
ShuffleBufferWithSkipList();
       } else {
-        shuffleBuffer = new ShuffleBufferWithLinkedList();
+        shuffleBuffer =
+            isLABEnabled ? new LABShuffleBufferWithLinkedList() : new 
ShuffleBufferWithLinkedList();
       }
       bufferRangeMap.put(Range.closed(startPartition, endPartition), 
shuffleBuffer);
     } else {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
index e35c42575..2216e2907 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
@@ -61,11 +61,11 @@ public class ShuffleBufferWithLinkedList extends 
AbstractShuffleBuffer {
     for (ShufflePartitionedBlock block : data.getBlockList()) {
       // If sendShuffleData retried, we may receive duplicate block. The 
duplicate
       // block would gc without release. Here we must release the duplicated 
block.
-      if (blocks.add(block)) {
+      if (addBlock(block)) {
         currentEncodedLength += block.getEncodedLength();
         currentDataLength += block.getDataLength();
       } else {
-        block.getData().release();
+        releaseBlock(block);
       }
     }
     this.encodedLength += currentEncodedLength;
@@ -74,6 +74,14 @@ public class ShuffleBufferWithLinkedList extends 
AbstractShuffleBuffer {
     return currentEncodedLength;
   }
 
+  protected boolean addBlock(ShufflePartitionedBlock block) {
+    return blocks.add(block);
+  }
+
+  protected void releaseBlock(ShufflePartitionedBlock block) {
+    block.getData().release();
+  }
+
   @Override
   public synchronized ShuffleDataFlushEvent toFlushEvent(
       String appId,
@@ -110,12 +118,7 @@ public class ShuffleBufferWithLinkedList extends 
AbstractShuffleBuffer {
             spBlocks,
             isValid,
             this);
-    event.addCleanupCallback(
-        () -> {
-          this.clearInFlushBuffer(event.getEventId());
-          inFlushedQueueBlocks.forEach(spb -> spb.getData().release());
-          inFlushSize.addAndGet(-event.getEncodedLength());
-        });
+    event.addCleanupCallback(createCallbackForFlush(event));
     inFlushBlockMap.put(eventId, inFlushedQueueBlocks);
     blocks = new LinkedHashSet<>();
     inFlushSize.addAndGet(encodedLength);
@@ -124,6 +127,15 @@ public class ShuffleBufferWithLinkedList extends 
AbstractShuffleBuffer {
     return event;
   }
 
+  protected Runnable createCallbackForFlush(ShuffleDataFlushEvent event) {
+    Set<ShufflePartitionedBlock> inFlushedQueueBlocks = blocks;
+    return () -> {
+      this.clearInFlushBuffer(event.getEventId());
+      inFlushedQueueBlocks.forEach(this::releaseBlock);
+      inFlushSize.addAndGet(-event.getEncodedLength());
+    };
+  }
+
   @Override
   public Set<ShufflePartitionedBlock> getBlocks() {
     return blocks;
@@ -147,7 +159,7 @@ public class ShuffleBufferWithLinkedList extends 
AbstractShuffleBuffer {
     evicted = true;
     for (ShufflePartitionedBlock spb : blocks) {
       try {
-        spb.getData().release();
+        releaseBlock(spb);
         releasedSize += spb.getEncodedLength();
       } catch (Throwable t) {
         lastException = t;
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
index d6c47933a..33ba313ce 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
@@ -69,12 +69,12 @@ public class ShuffleBufferWithSkipList extends 
AbstractShuffleBuffer {
       // If sendShuffleData retried, we may receive duplicate block. The 
duplicate
       // block would gc without release. Here we must release the duplicated 
block.
       if (!blocksMap.containsKey(block.getBlockId())) {
-        blocksMap.put(block.getBlockId(), block);
+        addBlock(block);
         blockCount++;
         currentEncodedLength += block.getEncodedLength();
         currentDataLength += block.getDataLength();
       } else {
-        block.getData().release();
+        releaseBlock(block);
       }
     }
     this.encodedLength += currentEncodedLength;
@@ -83,6 +83,14 @@ public class ShuffleBufferWithSkipList extends 
AbstractShuffleBuffer {
     return currentEncodedLength;
   }
 
+  protected void addBlock(ShufflePartitionedBlock block) {
+    blocksMap.put(block.getBlockId(), block);
+  }
+
+  protected void releaseBlock(ShufflePartitionedBlock block) {
+    block.getData().release();
+  }
+
   @Override
   public synchronized ShuffleDataFlushEvent toFlushEvent(
       String appId,
@@ -108,12 +116,7 @@ public class ShuffleBufferWithSkipList extends 
AbstractShuffleBuffer {
             spBlocks,
             isValid,
             this);
-    event.addCleanupCallback(
-        () -> {
-          this.clearInFlushBuffer(event.getEventId());
-          spBlocks.forEach(spb -> spb.getData().release());
-          inFlushSize.addAndGet(-event.getEncodedLength());
-        });
+    event.addCleanupCallback(createCallbackForFlush(event));
     inFlushBlockMap.put(eventId, blocksMap);
     blocksMap = newConcurrentSkipListMap();
     blockCount = 0;
@@ -123,6 +126,14 @@ public class ShuffleBufferWithSkipList extends 
AbstractShuffleBuffer {
     return event;
   }
 
+  protected Runnable createCallbackForFlush(ShuffleDataFlushEvent event) {
+    return () -> {
+      this.clearInFlushBuffer(event.getEventId());
+      event.getShuffleBlocks().forEach(this::releaseBlock);
+      inFlushSize.addAndGet(-event.getEncodedLength());
+    };
+  }
+
   @Override
   public Set<ShufflePartitionedBlock> getBlocks() {
     return new LinkedHashSet<>(blocksMap.values());
@@ -146,7 +157,7 @@ public class ShuffleBufferWithSkipList extends 
AbstractShuffleBuffer {
     evicted = true;
     for (ShufflePartitionedBlock spb : blocksMap.values()) {
       try {
-        spb.getData().release();
+        releaseBlock(spb);
         releasedSize += spb.getEncodedLength();
       } catch (Throwable t) {
         lastException = t;
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/lab/Chunk.java 
b/server/src/main/java/org/apache/uniffle/server/buffer/lab/Chunk.java
new file mode 100644
index 000000000..d0aacf40c
--- /dev/null
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/lab/Chunk.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.buffer.lab;
+
+import io.netty.buffer.ByteBuf;
+
+/** A chunk of memory out of which allocations are sliced. */
+public abstract class Chunk {
+  /** Actual underlying data */
+  protected ByteBuf data;
+  /** Size of chunk in bytes */
+  protected final int size;
+  // The unique id associated with the chunk.
+  private final int id;
+  private final boolean fromPool;
+
+  /**
+   * Create an uninitialized chunk. Note that memory is not allocated yet, so 
this is cheap.
+   *
+   * @param size in bytes
+   * @param id the chunk id
+   */
+  public Chunk(int size, int id) {
+    this(size, id, false);
+  }
+
+  /**
+   * Create an uninitialized chunk. Note that memory is not allocated yet, so 
this is cheap.
+   *
+   * @param size in bytes
+   * @param id the chunk id
+   * @param fromPool if the chunk is formed by pool
+   */
+  public Chunk(int size, int id, boolean fromPool) {
+    this.size = size;
+    this.id = id;
+    this.fromPool = fromPool;
+  }
+
+  int getId() {
+    return this.id;
+  }
+
+  boolean isFromPool() {
+    return this.fromPool;
+  }
+
+  /**
+   * Actually claim the memory for this chunk. This should only be called from 
the thread that
+   * constructed the chunk. It is thread-safe against other threads calling 
alloc(), who will block
+   * until the allocation is complete.
+   */
+  public void init() {
+    allocateDataBuffer();
+  }
+
+  abstract void allocateDataBuffer();
+
+  /**
+   * Try to get the allocated offset from the chunk.
+   *
+   * @return the offset of the successful allocation, or -1 to indicate 
not-enough-space
+   */
+  public int getAllocOffset(int size) {
+    if (data.writerIndex() + size > data.capacity()) {
+      return -1;
+    }
+    return data.writerIndex();
+  }
+
+  void reset() {
+    data.clear();
+  }
+
+  /** @return This chunk's backing data. */
+  ByteBuf getData() {
+    return this.data;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/lab/ChunkCreator.java 
b/server/src/main/java/org/apache/uniffle/server/buffer/lab/ChunkCreator.java
new file mode 100644
index 000000000..cfab30b87
--- /dev/null
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/lab/ChunkCreator.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.buffer.lab;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Does the management of LAB chunk creations. A monotonically incrementing id 
is associated with
+ * every chunk
+ */
+public class ChunkCreator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ChunkCreator.class);
+  // monotonically increasing chunkid. Starts at 1.
+  private final AtomicInteger chunkID = new AtomicInteger(1);
+
+  // mapping from chunk IDs to chunks
+  private final Map<Integer, Chunk> chunkIdMap = new 
ConcurrentHashMap<Integer, Chunk>();
+  static ChunkCreator instance;
+  private final int maxAlloc;
+  private final ChunkPool chunksPool;
+  private final int chunkSize;
+
+  ChunkCreator(int chunkSize, long bufferCapacity, int maxAlloc) {
+    this.chunkSize = chunkSize;
+    this.maxAlloc = maxAlloc;
+    this.chunksPool = initializePool(bufferCapacity, chunkSize);
+  }
+
+  /**
+   * Initializes the instance of ChunkCreator
+   *
+   * @param chunkSize the chunkSize
+   * @param bufferCapacity the buffer capacity
+   * @return singleton ChunkCreator
+   */
+  public static synchronized void initialize(int chunkSize, long 
bufferCapacity, int maxAlloc) {
+    if (instance != null) {
+      LOG.warn("ChunkCreator instance is already initialized.");
+    }
+    instance = new ChunkCreator(chunkSize, bufferCapacity, maxAlloc);
+  }
+
+  public static ChunkCreator getInstance() {
+    return instance;
+  }
+
+  /**
+   * Creates and inits a chunk with specific index type and type.
+   *
+   * @return the chunk that was initialized
+   */
+  Chunk getChunk() {
+    return getChunk(chunksPool.getChunkSize());
+  }
+
+  /**
+   * Creates and inits a chunk.
+   *
+   * @return the chunk that was initialized
+   * @param size the size of the chunk to be allocated, in bytes
+   */
+  Chunk getChunk(int size) {
+    Chunk chunk = null;
+    ChunkPool pool = null;
+
+    // if the size is suitable for one of the pools
+    if (chunksPool != null && size == chunksPool.getChunkSize()) {
+      pool = chunksPool;
+    }
+
+    if (pool != null) {
+      chunk = pool.getChunk();
+      if (chunk == null) {
+        LOG.warn(
+            "The chunk pool is full. Reached maxCount= "
+                + pool.getMaxCount()
+                + ". Creating chunk outside of the pool.");
+      }
+    }
+
+    if (chunk == null) {
+      chunk = createChunk(false, size);
+    }
+    chunk.init();
+    return chunk;
+  }
+
+  /**
+   * Creates the chunk
+   *
+   * @param pool indicates if the chunks have to be created which will be used 
by the Pool
+   * @param size the size of the chunk to be allocated, in bytes
+   * @return the chunk
+   */
+  private Chunk createChunk(boolean pool, int size) {
+    Chunk chunk;
+    int id = chunkID.getAndIncrement();
+    Preconditions.checkArgument(id > 0, "chunkId should be positive.");
+    chunk = new OffheapChunk(size, id, pool);
+    this.chunkIdMap.put(chunk.getId(), chunk);
+    return chunk;
+  }
+
+  // Chunks from pool are created covered with strong references anyway
+  private Chunk createChunkForPool(int chunkSize) {
+    if (chunkSize != chunksPool.getChunkSize()) {
+      return null;
+    }
+    return createChunk(true, chunkSize);
+  }
+
+  private void removeChunks(List<Integer> chunkIDs) {
+    chunkIDs.forEach(this::removeChunk);
+  }
+
+  Chunk removeChunk(int chunkId) {
+    Chunk c = this.chunkIdMap.remove(chunkId);
+    c.getData().release();
+    return c;
+  }
+
+  /**
+   * A pool of {@link Chunk} instances.
+   *
+   * <p>ChunkPool caches a number of retired chunks for reusing, it could 
decrease allocating bytes
+   * when writing, thereby optimizing the garbage collection on JVM.
+   */
+  private class ChunkPool {
+    private final int chunkSize;
+    private final int maxCount;
+
+    // A queue of reclaimed chunks
+    private final BlockingQueue<Chunk> reclaimedChunks;
+
+    /** Statistics thread schedule pool */
+    private final ScheduledExecutorService scheduleThreadPool;
+    /** Statistics thread */
+    private static final int statThreadPeriod = 60 * 5;
+
+    private final AtomicLong chunkCount = new AtomicLong();
+    private final LongAdder reusedChunkCount = new LongAdder();
+
+    ChunkPool(int chunkSize, int maxCount) {
+      this.chunkSize = chunkSize;
+      this.maxCount = maxCount;
+      this.reclaimedChunks = new LinkedBlockingQueue<>();
+      final String n = Thread.currentThread().getName();
+      scheduleThreadPool =
+          Executors.newScheduledThreadPool(
+              1,
+              new ThreadFactoryBuilder()
+                  .setNameFormat(n + "-ChunkPool Statistics")
+                  .setDaemon(true)
+                  .build());
+      this.scheduleThreadPool.scheduleAtFixedRate(
+          new StatisticsThread(), statThreadPeriod, statThreadPeriod, 
TimeUnit.SECONDS);
+    }
+
+    /**
+     * Poll a chunk from the pool, reset it if not null, else create a new 
chunk to return if we
+     * have not yet created max allowed chunks count. When we have already 
created max allowed
+     * chunks and no free chunks as of now, return null. It is the 
responsibility of the caller to
+     * make a chunk then. Note: Chunks returned by this pool must be put back 
to the pool after its
+     * use.
+     *
+     * @return a chunk
+     * @see #putbackChunks(Chunk)
+     */
+    Chunk getChunk() {
+      Chunk chunk = reclaimedChunks.poll();
+      if (chunk != null) {
+        chunk.reset();
+        reusedChunkCount.increment();
+      } else {
+        // Make a chunk if we have not yet created the maxCount chunks
+        while (true) {
+          long created = this.chunkCount.get();
+          if (created < this.maxCount) {
+            if (this.chunkCount.compareAndSet(created, created + 1)) {
+              chunk = createChunkForPool(chunkSize);
+              break;
+            }
+          } else {
+            break;
+          }
+        }
+      }
+      return chunk;
+    }
+
+    int getChunkSize() {
+      return chunkSize;
+    }
+
+    /**
+     * Add the chunks to the pool, when the pool achieves the max size, it 
will skip the remaining
+     * chunks
+     */
+    private void putbackChunks(Chunk c) {
+      int toAdd = this.maxCount - reclaimedChunks.size();
+      if (c.isFromPool() && c.size == chunkSize && toAdd > 0) {
+        reclaimedChunks.add(c);
+      } else {
+        // remove the chunk (that is not going to pool)
+        ChunkCreator.this.removeChunk(c.getId());
+      }
+    }
+
+    private class StatisticsThread extends Thread {
+      StatisticsThread() {
+        super("MemStoreChunkPool.StatisticsThread");
+        setDaemon(true);
+      }
+
+      @Override
+      public void run() {
+        logStats();
+      }
+
+      private void logStats() {
+        long created = chunkCount.get();
+        long reused = reusedChunkCount.sum();
+        long total = created + reused;
+        LOG.info(
+            "ChunkPool stats (chunk size={}): current pool size={}, created 
chunk count={}, "
+                + "reused chunk count={}, reuseRatio={}",
+            chunkSize,
+            reclaimedChunks.size(),
+            created,
+            reused,
+            (total == 0 ? "0" : StringUtils.formatPercent((float) reused / 
(float) total, 2)));
+      }
+    }
+
+    private int getMaxCount() {
+      return this.maxCount;
+    }
+  }
+
+  private ChunkPool initializePool(long bufferCapacity, int chunkSize) {
+    int maxCount = Math.max((int) (bufferCapacity / chunkSize), 1);
+    LOG.info(
+        "Allocating ChunkPool with chunk size {}, max count {}",
+        StringUtils.byteDesc(chunkSize),
+        maxCount);
+    return new ChunkPool(chunkSize, maxCount);
+  }
+
+  int getChunkSize() {
+    return chunkSize;
+  }
+
+  int getMaxAlloc() {
+    return maxAlloc;
+  }
+
+  synchronized void putBackChunks(List<Integer> chunks) {
+    // if there is no pool just try to clear the chunkIdMap in case there is 
something
+    if (chunksPool == null) {
+      this.removeChunks(chunks);
+      return;
+    }
+
+    // if there is a pool, go over all chunk IDs that came back, the chunks 
may be from pool or not
+    for (int chunkID : chunks) {
+      // translate chunk ID to chunk, if chunk initially wasn't in pool
+      // this translation will (most likely) return null
+      Chunk chunk = chunkIdMap.get(chunkID);
+      if (chunk != null) {
+        if (chunk.isFromPool()) {
+          chunksPool.putbackChunks(chunk);
+        } else {
+          // chunks which are not from one of the pools
+          // should be released without going to the pools.
+          this.removeChunk(chunkID);
+        }
+      } else {
+        LOG.warn("Chunk {} can not be found in chunkIdMap, ignore it", 
chunkID);
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/lab/LAB.java 
b/server/src/main/java/org/apache/uniffle/server/buffer/lab/LAB.java
new file mode 100644
index 000000000..5d3b7000a
--- /dev/null
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/lab/LAB.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.buffer.lab;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.uniffle.common.ShufflePartitionedBlock;
+
+/**
+ * Local allocation buffer.
+ *
+ * <p>The LAB is basically a bump-the-pointer allocator that allocates big 
(100K) chunks from and
+ * then doles it out to threads that request slices into the array. These 
chunks can get pooled as
+ * well. See {@link ChunkCreator}.
+ *
+ * <p>The purpose of this is to combat heap fragmentation in the shuffle 
server. By ensuring that
+ * all blocks in a given partition refer only to large chunks of contiguous 
memory, we ensure that
+ * large blocks get freed up when the partition is flushed.
+ *
+ * <p>Without the LAB, the byte array allocated during insertion end up 
interleaved throughout the
+ * heap, and the old generation gets progressively more fragmented until a 
stop-the-world compacting
+ * collection occurs.
+ *
+ * <p>This manages the large sized chunks. When blocks are to be added to 
partition, LAB's {@link
+ * #tryCopyBlockToChunk(ShufflePartitionedBlock)} gets called. This allocates 
enough size in the
+ * chunk to hold this block's data and copies into this area and then recreate 
a
+ * LABShufflePartitionedBlock over this copied data.
+ *
+ * <p>
+ *
+ * @see ChunkCreator
+ */
+public class LAB {
+  private Chunk currChunk;
+
+  List<Integer> chunks = new LinkedList<>();
+  private final int maxAlloc;
+  private final ChunkCreator chunkCreator;
+
+  public LAB() {
+    this.chunkCreator = ChunkCreator.getInstance();
+    maxAlloc = chunkCreator.getMaxAlloc();
+  }
+
+  public ShufflePartitionedBlock tryCopyBlockToChunk(ShufflePartitionedBlock 
block) {
+    int size = block.getDataLength();
+    if (size > maxAlloc) {
+      return block;
+    }
+    Chunk c;
+    int allocOffset;
+    while (true) {
+      // Try to get the chunk
+      c = getOrMakeChunk();
+      // Try to allocate from this chunk
+      allocOffset = c.getAllocOffset(size);
+      if (allocOffset != -1) {
+        break;
+      }
+      // not enough space!
+      currChunk = null;
+    }
+    c.getData().writeBytes(block.getData());
+    return new LABShufflePartitionedBlock(
+        block.getDataLength(),
+        block.getUncompressLength(),
+        block.getCrc(),
+        block.getBlockId(),
+        block.getTaskAttemptId(),
+        c.getData().slice(allocOffset, size));
+  }
+
+  public void close() {
+    recycleChunks();
+  }
+
+  private void recycleChunks() {
+    chunkCreator.putBackChunks(chunks);
+  }
+
+  /** Get the current chunk, or, if there is no current chunk, allocate a new 
one. */
+  private Chunk getOrMakeChunk() {
+    Chunk c = currChunk;
+    if (c != null) {
+      return c;
+    }
+    c = this.chunkCreator.getChunk();
+    currChunk = c;
+    chunks.add(c.getId());
+    return c;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/lab/LABShuffleBufferWithLinkedList.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/lab/LABShuffleBufferWithLinkedList.java
new file mode 100644
index 000000000..eb6fe0cd5
--- /dev/null
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/lab/LABShuffleBufferWithLinkedList.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.buffer.lab;
+
+import java.util.function.Supplier;
+
+import org.apache.uniffle.common.ShuffleDataDistributionType;
+import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.server.ShuffleDataFlushEvent;
+import org.apache.uniffle.server.buffer.ShuffleBufferWithLinkedList;
+
+public class LABShuffleBufferWithLinkedList extends ShuffleBufferWithLinkedList
+    implements SupportsLAB {
+  private LAB lab;
+
+  public LABShuffleBufferWithLinkedList() {
+    super();
+    lab = new LAB();
+  }
+
+  @Override
+  protected boolean addBlock(ShufflePartitionedBlock block) {
+    ShufflePartitionedBlock newBlock = lab.tryCopyBlockToChunk(block);
+    boolean addSuccess = super.addBlock(newBlock);
+    if (addSuccess && newBlock.isOnLAB()) {
+      super.releaseBlock(block);
+    }
+    return addSuccess;
+  }
+
+  @Override
+  protected void releaseBlock(ShufflePartitionedBlock block) {
+    if (!block.isOnLAB()) {
+      super.releaseBlock(block);
+    }
+  }
+
+  @Override
+  public synchronized ShuffleDataFlushEvent toFlushEvent(
+      String appId,
+      int shuffleId,
+      int startPartition,
+      int endPartition,
+      Supplier<Boolean> isValid,
+      ShuffleDataDistributionType dataDistributionType) {
+    ShuffleDataFlushEvent event =
+        super.toFlushEvent(
+            appId, shuffleId, startPartition, endPartition, isValid, 
dataDistributionType);
+    lab = new LAB();
+    return event;
+  }
+
+  @Override
+  protected Runnable createCallbackForFlush(ShuffleDataFlushEvent event) {
+    Runnable runnable = super.createCallbackForFlush(event);
+    final LAB labRef = lab;
+    return () -> {
+      runnable.run();
+      labRef.close();
+    };
+  }
+
+  @Override
+  public synchronized long release() {
+    long size = super.release();
+    lab.close();
+    return size;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/lab/LABShuffleBufferWithSkipList.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/lab/LABShuffleBufferWithSkipList.java
new file mode 100644
index 000000000..8609be3cf
--- /dev/null
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/lab/LABShuffleBufferWithSkipList.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.buffer.lab;
+
+import java.util.function.Supplier;
+
+import org.apache.uniffle.common.ShuffleDataDistributionType;
+import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.server.ShuffleDataFlushEvent;
+import org.apache.uniffle.server.buffer.ShuffleBufferWithSkipList;
+
+public class LABShuffleBufferWithSkipList extends ShuffleBufferWithSkipList 
implements SupportsLAB {
+  private LAB lab;
+
+  public LABShuffleBufferWithSkipList() {
+    super();
+    this.lab = new LAB();
+  }
+
+  @Override
+  protected void addBlock(ShufflePartitionedBlock block) {
+    ShufflePartitionedBlock newBlock = lab.tryCopyBlockToChunk(block);
+    if (newBlock.isOnLAB()) {
+      super.releaseBlock(block);
+    }
+    super.addBlock(newBlock);
+  }
+
+  @Override
+  protected void releaseBlock(ShufflePartitionedBlock block) {
+    if (!block.isOnLAB()) {
+      super.releaseBlock(block);
+    }
+  }
+
+  @Override
+  public synchronized ShuffleDataFlushEvent toFlushEvent(
+      String appId,
+      int shuffleId,
+      int startPartition,
+      int endPartition,
+      Supplier<Boolean> isValid,
+      ShuffleDataDistributionType dataDistributionType) {
+    ShuffleDataFlushEvent event =
+        super.toFlushEvent(
+            appId, shuffleId, startPartition, endPartition, isValid, 
dataDistributionType);
+    lab = new LAB();
+    return event;
+  }
+
+  @Override
+  protected Runnable createCallbackForFlush(ShuffleDataFlushEvent event) {
+    Runnable runnable = super.createCallbackForFlush(event);
+    final LAB labRef = lab;
+    return () -> {
+      runnable.run();
+      labRef.close();
+    };
+  }
+
+  @Override
+  public synchronized long release() {
+    long size = super.release();
+    lab.close();
+    return size;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/lab/LABShufflePartitionedBlock.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/lab/LABShufflePartitionedBlock.java
new file mode 100644
index 000000000..ed3618120
--- /dev/null
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/lab/LABShufflePartitionedBlock.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.buffer.lab;
+
+import io.netty.buffer.ByteBuf;
+
+import org.apache.uniffle.common.ShufflePartitionedBlock;
+
+public class LABShufflePartitionedBlock extends ShufflePartitionedBlock {
+
+  public LABShufflePartitionedBlock(
+      int dataLength,
+      int uncompressLength,
+      long crc,
+      long blockId,
+      long taskAttemptId,
+      ByteBuf slice) {
+    super(dataLength, uncompressLength, crc, blockId, taskAttemptId, slice);
+  }
+
+  @Override
+  public boolean isOnLAB() {
+    return true;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/lab/OffheapChunk.java 
b/server/src/main/java/org/apache/uniffle/server/buffer/lab/OffheapChunk.java
new file mode 100644
index 000000000..24c13e27b
--- /dev/null
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/lab/OffheapChunk.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.buffer.lab;
+
+import io.netty.buffer.Unpooled;
+
+/** An off heap chunk implementation. */
+public class OffheapChunk extends Chunk {
+
+  OffheapChunk(int size, int id) {
+    super(size, id);
+  }
+
+  OffheapChunk(int size, int id, boolean fromPool) {
+    super(size, id, fromPool);
+  }
+
+  @Override
+  void allocateDataBuffer() {
+    if (data == null) {
+      data = Unpooled.directBuffer(this.size);
+      data.writeInt(this.getId());
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/lab/SupportsLAB.java 
b/server/src/main/java/org/apache/uniffle/server/buffer/lab/SupportsLAB.java
new file mode 100644
index 000000000..cb5dc4cee
--- /dev/null
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/lab/SupportsLAB.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.buffer.lab;
+
+public interface SupportsLAB {}
diff --git 
a/server/src/main/java/org/apache/uniffle/server/merge/Partition.java 
b/server/src/main/java/org/apache/uniffle/server/merge/Partition.java
index 48a9e0fb6..e4000bc39 100644
--- a/server/src/main/java/org/apache/uniffle/server/merge/Partition.java
+++ b/server/src/main/java/org/apache/uniffle/server/merge/Partition.java
@@ -51,6 +51,7 @@ import 
org.apache.uniffle.common.netty.buffer.NettyManagedBuffer;
 import org.apache.uniffle.common.rpc.StatusCode;
 import org.apache.uniffle.common.serializer.SerInputStream;
 import org.apache.uniffle.common.serializer.SerOutputStream;
+import org.apache.uniffle.common.util.ByteBufUtils;
 import org.apache.uniffle.server.ShuffleDataReadEvent;
 import org.apache.uniffle.server.buffer.ShuffleBuffer;
 import org.apache.uniffle.server.buffer.ShuffleBufferWithSkipList;
@@ -158,8 +159,13 @@ public class Partition<K, V> {
       try {
         // If ByteBuf is released by flush cleanup will throw 
IllegalReferenceCountException.
         // Then we need get block buffer from file
-        ByteBuf byteBuf = block.getData().retain().duplicate();
-        cachedBlocks.put(blockId, byteBuf.slice(0, block.getDataLength()));
+        if (block.isOnLAB()) {
+          ByteBuf byteBuf = ByteBufUtils.copy(block.getData());
+          cachedBlocks.put(blockId, byteBuf);
+        } else {
+          ByteBuf byteBuf = block.getData().retain().duplicate();
+          cachedBlocks.put(blockId, byteBuf.slice(0, block.getDataLength()));
+        }
       } catch (IllegalReferenceCountException irce) {
         allCached = false;
         LOG.warn("Can't read bytes from block in memory, maybe already been 
flushed!");
diff --git 
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
 
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
index ab6ce2834..c7f438d34 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
@@ -337,7 +337,7 @@ public class ShuffleServerNettyHandler implements 
BaseMessageHandler {
                     + e.getMessage();
             ret = StatusCode.INTERNAL_ERROR;
             responseMessage = errorMsg;
-            LOG.error(errorMsg);
+            LOG.error(errorMsg, e);
             hasFailureOccurred = true;
           } finally {
             // Once the cache failure occurs, we should explicitly release 
data held by byteBuf
diff --git 
a/server/src/test/java/org/apache/uniffle/server/buffer/BufferTestBase.java 
b/server/src/test/java/org/apache/uniffle/server/buffer/BufferTestBase.java
index 7aef68c15..dd2070d02 100644
--- a/server/src/test/java/org/apache/uniffle/server/buffer/BufferTestBase.java
+++ b/server/src/test/java/org/apache/uniffle/server/buffer/BufferTestBase.java
@@ -68,4 +68,6 @@ public abstract class BufferTestBase {
   protected AtomicInteger getAtomSequenceNo() {
     return atomSequenceNo;
   }
+
+  protected abstract ShuffleBuffer createShuffleBuffer();
 }
diff --git 
a/server/src/test/java/org/apache/uniffle/server/buffer/LABShuffleBufferWithLinkedListTest.java
 
b/server/src/test/java/org/apache/uniffle/server/buffer/LABShuffleBufferWithLinkedListTest.java
new file mode 100644
index 000000000..dbb23e8e5
--- /dev/null
+++ 
b/server/src/test/java/org/apache/uniffle/server/buffer/LABShuffleBufferWithLinkedListTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.buffer;
+
+import org.junit.jupiter.api.BeforeAll;
+
+import org.apache.uniffle.server.buffer.lab.ChunkCreator;
+import org.apache.uniffle.server.buffer.lab.LABShuffleBufferWithLinkedList;
+
+public class LABShuffleBufferWithLinkedListTest extends 
ShuffleBufferWithLinkedListTest {
+  @BeforeAll
+  public static void setUp() {
+    ChunkCreator.initialize(1024 * 3, 1024 * 1024, 512);
+  }
+
+  @Override
+  protected ShuffleBuffer createShuffleBuffer() {
+    return new LABShuffleBufferWithLinkedList();
+  }
+}
diff --git 
a/server/src/test/java/org/apache/uniffle/server/buffer/LABShuffleBufferWithSkipListTest.java
 
b/server/src/test/java/org/apache/uniffle/server/buffer/LABShuffleBufferWithSkipListTest.java
new file mode 100644
index 000000000..36f96d6da
--- /dev/null
+++ 
b/server/src/test/java/org/apache/uniffle/server/buffer/LABShuffleBufferWithSkipListTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.buffer;
+
+import org.junit.jupiter.api.BeforeAll;
+
+import org.apache.uniffle.server.buffer.lab.ChunkCreator;
+import org.apache.uniffle.server.buffer.lab.LABShuffleBufferWithSkipList;
+
+public class LABShuffleBufferWithSkipListTest extends 
ShuffleBufferWithSkipListTest {
+  @BeforeAll
+  public static void setUp() {
+    ChunkCreator.initialize(1024 * 3, 1024 * 1024, 512);
+  }
+
+  @Override
+  protected ShuffleBuffer createShuffleBuffer() {
+    return new LABShuffleBufferWithSkipList();
+  }
+}
diff --git 
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
 
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
index 4dcb4bb60..7b81926e0 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
@@ -29,6 +29,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.common.collect.RangeMap;
 import com.google.common.util.concurrent.Uninterruptibles;
+import io.netty.buffer.ByteBuf;
 import io.prometheus.client.Collector;
 import org.apache.commons.lang3.tuple.Pair;
 import org.awaitility.Awaitility;
@@ -175,6 +176,8 @@ public class ShuffleBufferManagerTest extends 
BufferTestBase {
     ShufflePartitionedData spd2 = createData(0, 68);
     ShufflePartitionedData spd3 = createData(0, 68);
     ShufflePartitionedData spd4 = createData(0, 68);
+    final ByteBuf expected2 = 
ByteBufUtils.copy(spd2.getBlockList()[0].getData());
+    final ByteBuf expected3 = 
ByteBufUtils.copy(spd3.getBlockList()[0].getData());
     shuffleBufferManager.cacheShuffleData(appId, 1, false, spd1);
     shuffleBufferManager.cacheShuffleData(appId, 2, false, spd2);
     shuffleBufferManager.cacheShuffleData(appId, 2, false, spd3);
@@ -188,10 +191,10 @@ public class ShuffleBufferManagerTest extends 
BufferTestBase {
     // validate get shuffle data
     ShuffleDataResult sdr =
         shuffleBufferManager.getShuffleData(appId, 2, 0, 
Constants.INVALID_BLOCK_ID, 60);
-    
assertArrayEquals(ByteBufUtils.readBytes(spd2.getBlockList()[0].getData()), 
sdr.getData());
+    assertArrayEquals(ByteBufUtils.readBytes(expected2), sdr.getData());
     long lastBlockId = spd2.getBlockList()[0].getBlockId();
     sdr = shuffleBufferManager.getShuffleData(appId, 2, 0, lastBlockId, 100);
-    
assertArrayEquals(ByteBufUtils.readBytes(spd3.getBlockList()[0].getData()), 
sdr.getData());
+    assertArrayEquals(ByteBufUtils.readBytes(expected3), sdr.getData());
     // flush happen
     ShufflePartitionedData spd5 = createData(0, 10);
     shuffleBufferManager.cacheShuffleData(appId, 4, false, spd5);
@@ -206,10 +209,10 @@ public class ShuffleBufferManagerTest extends 
BufferTestBase {
     assertEquals(1, bufferPool.get(appId).get(4).get(0).getBlocks().size());
     // data in flush buffer now, it also can be got before flush finish
     sdr = shuffleBufferManager.getShuffleData(appId, 2, 0, 
Constants.INVALID_BLOCK_ID, 60);
-    
assertArrayEquals(ByteBufUtils.readBytes(spd2.getBlockList()[0].getData()), 
sdr.getData());
+    assertArrayEquals(ByteBufUtils.readBytes(expected2), sdr.getData());
     lastBlockId = spd2.getBlockList()[0].getBlockId();
     sdr = shuffleBufferManager.getShuffleData(appId, 2, 0, lastBlockId, 100);
-    
assertArrayEquals(ByteBufUtils.readBytes(spd3.getBlockList()[0].getData()), 
sdr.getData());
+    assertArrayEquals(ByteBufUtils.readBytes(expected3), sdr.getData());
     // cache data again, it should cause flush
     spd1 = createData(0, 10);
     shuffleBufferManager.cacheShuffleData(appId, 1, false, spd1);
@@ -268,9 +271,12 @@ public class ShuffleBufferManagerTest extends 
BufferTestBase {
     assertEquals(0, shuffleSizeMap.get(appId1).get(1).get());
     shuffleBufferManager.releaseMemory(463, true, false);
 
-    shuffleBufferManager.cacheShuffleData(appId1, 1, false, spd1);
-    shuffleBufferManager.cacheShuffleData(appId1, 2, false, spd2);
-    shuffleBufferManager.cacheShuffleData(appId2, 1, false, spd4);
+    ShufflePartitionedData spd7 = createData(0, 67);
+    ShufflePartitionedData spd8 = createData(0, 68);
+    ShufflePartitionedData spd9 = createData(0, 68);
+    shuffleBufferManager.cacheShuffleData(appId1, 1, false, spd7);
+    shuffleBufferManager.cacheShuffleData(appId1, 2, false, spd8);
+    shuffleBufferManager.cacheShuffleData(appId2, 1, false, spd9);
     shuffleBufferManager.removeBuffer(appId1);
     assertNull(shuffleSizeMap.get(appId1));
     assertEquals(100, shuffleSizeMap.get(appId2).get(1).get());
@@ -863,4 +869,9 @@ public class ShuffleBufferManagerTest extends 
BufferTestBase {
     assertEquals(1, pair.getRight().size());
     assertEquals(0, pair.getRight().get(0));
   }
+
+  @Override
+  protected ShuffleBuffer createShuffleBuffer() {
+    return null;
+  }
 }
diff --git 
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java
 
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java
index 47fe3dd39..460f61151 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java
@@ -46,7 +46,7 @@ public class ShuffleBufferWithLinkedListTest extends 
BufferTestBase {
 
   @Test
   public void appendTest() {
-    ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList();
+    ShuffleBuffer shuffleBuffer = createShuffleBuffer();
     shuffleBuffer.append(createData(10));
     // ShufflePartitionedBlock has constant 32 bytes overhead
     assertEquals(42, shuffleBuffer.getEncodedLength());
@@ -60,7 +60,7 @@ public class ShuffleBufferWithLinkedListTest extends 
BufferTestBase {
 
   @Test
   public void appendMultiBlocksTest() {
-    ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList();
+    ShuffleBuffer shuffleBuffer = createShuffleBuffer();
     ShufflePartitionedData data1 = createData(10);
     ShufflePartitionedData data2 = createData(10);
     ShufflePartitionedBlock[] dataCombine = new ShufflePartitionedBlock[2];
@@ -72,7 +72,7 @@ public class ShuffleBufferWithLinkedListTest extends 
BufferTestBase {
 
   @Test
   public void toFlushEventTest() {
-    ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList();
+    ShuffleBuffer shuffleBuffer = createShuffleBuffer();
     ShuffleDataFlushEvent event = shuffleBuffer.toFlushEvent("appId", 0, 0, 1, 
null);
     assertNull(event);
     shuffleBuffer.append(createData(10));
@@ -91,7 +91,7 @@ public class ShuffleBufferWithLinkedListTest extends 
BufferTestBase {
   @Test
   public void getShuffleDataWithExpectedTaskIdsFilterTest() {
     /** case1: all blocks in cached(or in flushed map) and size < 
readBufferSize */
-    ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList();
+    ShuffleBuffer shuffleBuffer = createShuffleBuffer();
     ShufflePartitionedData spd1 = createData(1, 1, 15);
     ShufflePartitionedData spd2 = createData(1, 0, 15);
     ShufflePartitionedData spd3 = createData(1, 2, 55);
@@ -203,17 +203,18 @@ public class ShuffleBufferWithLinkedListTest extends 
BufferTestBase {
 
   @Test
   public void getShuffleDataWithLocalOrderTest() {
-    ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList();
+    ShuffleBuffer shuffleBuffer = createShuffleBuffer();
     ShufflePartitionedData spd1 = createData(1, 1, 15);
     ShufflePartitionedData spd2 = createData(1, 0, 15);
     ShufflePartitionedData spd3 = createData(1, 2, 15);
+    final byte[] expectedData = getExpectedData(spd1, spd2);
+    final byte[] expectedData2 = getExpectedData(spd3);
     shuffleBuffer.append(spd1);
     shuffleBuffer.append(spd2);
     shuffleBuffer.append(spd3);
 
     // First read from the cached data
     ShuffleDataResult sdr = 
shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 16);
-    byte[] expectedData = getExpectedData(spd1, spd2);
     compareBufferSegment(
         new LinkedList<>(shuffleBuffer.getBlocks()), sdr.getBufferSegments(), 
0, 2);
     assertArrayEquals(expectedData, sdr.getData());
@@ -223,13 +224,12 @@ public class ShuffleBufferWithLinkedListTest extends 
BufferTestBase {
         shuffleBuffer.toFlushEvent("appId", 0, 0, 1, null, 
ShuffleDataDistributionType.LOCAL_ORDER);
     long lastBlockId = sdr.getBufferSegments().get(1).getBlockId();
     sdr = shuffleBuffer.getShuffleData(lastBlockId, 16);
-    expectedData = getExpectedData(spd3);
     compareBufferSegment(
         new 
LinkedList<>(shuffleBuffer.getInFlushBlockMap().get(event1.getEventId())),
         sdr.getBufferSegments(),
         2,
         1);
-    assertArrayEquals(expectedData, sdr.getData());
+    assertArrayEquals(expectedData2, sdr.getData());
     Iterator<ShufflePartitionedBlock> it = 
event1.getShuffleBlocks().iterator();
     assertEquals(0, it.next().getTaskAttemptId());
     assertEquals(1, it.next().getTaskAttemptId());
@@ -254,68 +254,69 @@ public class ShuffleBufferWithLinkedListTest extends 
BufferTestBase {
 
   @Test
   public void getShuffleDataTest() {
-    ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList();
+    ShuffleBuffer shuffleBuffer = createShuffleBuffer();
     // case1: cached data only, blockId = -1, readBufferSize > buffer size
     ShufflePartitionedData spd1 = createData(10);
     ShufflePartitionedData spd2 = createData(20);
+    final byte[] expectedData = getExpectedData(spd1, spd2);
     shuffleBuffer.append(spd1);
     shuffleBuffer.append(spd2);
-    byte[] expectedData = getExpectedData(spd1, spd2);
     ShuffleDataResult sdr = 
shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 40);
     compareBufferSegment(
         new LinkedList<>(shuffleBuffer.getBlocks()), sdr.getBufferSegments(), 
0, 2);
     assertArrayEquals(expectedData, sdr.getData());
 
     // case2: cached data only, blockId = -1, readBufferSize = buffer size
-    shuffleBuffer = new ShuffleBufferWithLinkedList();
+    shuffleBuffer = createShuffleBuffer();
     spd1 = createData(20);
     spd2 = createData(20);
+    final byte[] expectedData2 = getExpectedData(spd1, spd2);
     shuffleBuffer.append(spd1);
     shuffleBuffer.append(spd2);
-    expectedData = getExpectedData(spd1, spd2);
     sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 40);
     compareBufferSegment(
         new LinkedList<>(shuffleBuffer.getBlocks()), sdr.getBufferSegments(), 
0, 2);
-    assertArrayEquals(expectedData, sdr.getData());
+    assertArrayEquals(expectedData2, sdr.getData());
 
     // case3-1: cached data only, blockId = -1, readBufferSize < buffer size
-    shuffleBuffer = new ShuffleBufferWithLinkedList();
+    shuffleBuffer = createShuffleBuffer();
     spd1 = createData(20);
     spd2 = createData(21);
+    final byte[] expectedData3 = getExpectedData(spd1, spd2);
     shuffleBuffer.append(spd1);
     shuffleBuffer.append(spd2);
-    expectedData = getExpectedData(spd1, spd2);
     sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 40);
     compareBufferSegment(
         new LinkedList<>(shuffleBuffer.getBlocks()), sdr.getBufferSegments(), 
0, 2);
-    assertArrayEquals(expectedData, sdr.getData());
+    assertArrayEquals(expectedData3, sdr.getData());
 
     // case3-2: cached data only, blockId = -1, readBufferSize < buffer size
-    shuffleBuffer = new ShuffleBufferWithLinkedList();
+    shuffleBuffer = createShuffleBuffer();
     spd1 = createData(15);
     spd2 = createData(15);
     ShufflePartitionedData spd3 = createData(15);
+    final byte[] expectedData4 = getExpectedData(spd1, spd2);
+    final byte[] expectedData5 = getExpectedData(spd3);
     shuffleBuffer.append(spd1);
     shuffleBuffer.append(spd2);
     shuffleBuffer.append(spd3);
-    expectedData = getExpectedData(spd1, spd2);
     sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 25);
     compareBufferSegment(
         new LinkedList<>(shuffleBuffer.getBlocks()), sdr.getBufferSegments(), 
0, 2);
-    assertArrayEquals(expectedData, sdr.getData());
+    assertArrayEquals(expectedData4, sdr.getData());
 
     // case4: cached data only, blockId != -1 && exist, readBufferSize < 
buffer size
     long lastBlockId = spd2.getBlockList()[0].getBlockId();
     sdr = shuffleBuffer.getShuffleData(lastBlockId, 25);
-    expectedData = getExpectedData(spd3);
     compareBufferSegment(
         new LinkedList<>(shuffleBuffer.getBlocks()), sdr.getBufferSegments(), 
2, 1);
-    assertArrayEquals(expectedData, sdr.getData());
+    assertArrayEquals(expectedData5, sdr.getData());
 
     // case5: flush data only, blockId = -1, readBufferSize < buffer size
-    shuffleBuffer = new ShuffleBufferWithLinkedList();
+    shuffleBuffer = createShuffleBuffer();
     spd1 = createData(15);
     spd2 = createData(15);
+    final byte[] expectedData6 = getExpectedData(spd1, spd2);
     shuffleBuffer.append(spd1);
     shuffleBuffer.append(spd2);
     ShuffleDataFlushEvent event1 = shuffleBuffer.toFlushEvent("appId", 0, 0, 
1, null);
@@ -326,52 +327,78 @@ public class ShuffleBufferWithLinkedListTest extends 
BufferTestBase {
         sdr.getBufferSegments(),
         0,
         2);
-    expectedData = getExpectedData(spd1, spd2);
-    assertArrayEquals(expectedData, sdr.getData());
+    assertArrayEquals(expectedData6, sdr.getData());
 
     // case5: flush data only, blockId = lastBlockId
     sdr = shuffleBuffer.getShuffleData(spd2.getBlockList()[0].getBlockId(), 
20);
     assertEquals(0, sdr.getBufferSegments().size());
 
     // case6: no data in buffer & flush buffer
-    shuffleBuffer = new ShuffleBufferWithLinkedList();
+    shuffleBuffer = createShuffleBuffer();
     sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 10);
     assertEquals(0, sdr.getBufferSegments().size());
     assertEquals(0, sdr.getDataLength());
 
     // case7: get data with multiple flush buffer and cached buffer
-    shuffleBuffer = new ShuffleBufferWithLinkedList();
+    shuffleBuffer = createShuffleBuffer();
     spd1 = createData(15);
     spd2 = createData(15);
     spd3 = createData(15);
+    final ShufflePartitionedData spd4 = createData(15);
+    final ShufflePartitionedData spd5 = createData(15);
+    final ShufflePartitionedData spd6 = createData(15);
+    final ShufflePartitionedData spd7 = createData(15);
+    final ShufflePartitionedData spd8 = createData(15);
+    final ShufflePartitionedData spd9 = createData(15);
+    final ShufflePartitionedData spd10 = createData(15);
+    final ShufflePartitionedData spd11 = createData(15);
+    final ShufflePartitionedData spd12 = createData(15);
+    final ShufflePartitionedData spd13 = createData(15);
+    final ShufflePartitionedData spd14 = createData(15);
+    final ShufflePartitionedData spd15 = createData(15);
+    final byte[] expectedData7 = getExpectedData(spd1);
+    final byte[] expectedData8 = getExpectedData(spd2);
+    final byte[] expectedData9 = getExpectedData(spd1, spd2);
+    final byte[] expectedData10 = getExpectedData(spd2, spd3);
+    final byte[] expectedData11 = getExpectedData(spd1, spd2, spd3, spd4);
+    final byte[] expectedData12 = getExpectedData(spd4);
+    final byte[] expectedData13 = getExpectedData(spd6);
+    final byte[] expectedData14 = getExpectedData(spd4, spd5, spd6);
+    final byte[] expectedData15 = getExpectedData(spd3, spd4, spd5, spd6, 
spd7);
+    final byte[] expectedData16 = getExpectedData(spd6, spd7);
+    final byte[] expectedData17 = getExpectedData(spd6, spd7, spd8, spd9);
+    final byte[] expectedData18 = getExpectedData(spd9, spd10);
+    final byte[] expectedData19 = getExpectedData(spd10);
+    final byte[] expectedData20 = getExpectedData(spd12);
+    final byte[] expectedData21 = getExpectedData(spd10, spd11, spd12);
+    final byte[] expectedData22 = getExpectedData(spd12, spd13);
+    final byte[] expectedData23 = getExpectedData(spd13);
+    final byte[] expectedData24 = getExpectedData(spd14, spd15);
+    final byte[] expectedData25 = getExpectedData(spd15);
+    final byte[] expectedData26 = getExpectedData(spd12, spd13, spd14, spd15);
+    final byte[] expectedData27 =
+        getExpectedData(
+            spd1, spd2, spd3, spd4, spd5, spd6, spd7, spd8, spd9, spd10, 
spd11, spd12, spd13, spd14,
+            spd15);
     shuffleBuffer.append(spd1);
     shuffleBuffer.append(spd2);
     shuffleBuffer.append(spd3);
     event1 = shuffleBuffer.toFlushEvent("appId", 0, 0, 1, null);
-    ShufflePartitionedData spd4 = createData(15);
-    ShufflePartitionedData spd5 = createData(15);
-    ShufflePartitionedData spd6 = createData(15);
+
     shuffleBuffer.append(spd4);
     shuffleBuffer.append(spd5);
     shuffleBuffer.append(spd6);
+
     final ShuffleDataFlushEvent event2 = shuffleBuffer.toFlushEvent("appId", 
0, 0, 1, null);
-    ShufflePartitionedData spd7 = createData(15);
-    ShufflePartitionedData spd8 = createData(15);
-    ShufflePartitionedData spd9 = createData(15);
+
     shuffleBuffer.append(spd7);
     shuffleBuffer.append(spd8);
     shuffleBuffer.append(spd9);
     final ShuffleDataFlushEvent event3 = shuffleBuffer.toFlushEvent("appId", 
0, 0, 1, null);
-    ShufflePartitionedData spd10 = createData(15);
-    ShufflePartitionedData spd11 = createData(15);
-    ShufflePartitionedData spd12 = createData(15);
     shuffleBuffer.append(spd10);
     shuffleBuffer.append(spd11);
     shuffleBuffer.append(spd12);
     final ShuffleDataFlushEvent event4 = shuffleBuffer.toFlushEvent("appId", 
0, 0, 1, null);
-    ShufflePartitionedData spd13 = createData(15);
-    ShufflePartitionedData spd14 = createData(15);
-    ShufflePartitionedData spd15 = createData(15);
     shuffleBuffer.append(spd13);
     shuffleBuffer.append(spd14);
     shuffleBuffer.append(spd15);
@@ -387,179 +414,158 @@ public class ShuffleBufferWithLinkedListTest extends 
BufferTestBase {
     // case7 to get spd1
     List<ShufflePartitionedBlock> expectedBlocks =
         
Lists.newArrayList(shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()));
-    expectedData = getExpectedData(spd1);
     sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 10);
     compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 0, 1);
-    assertArrayEquals(expectedData, sdr.getData());
+    assertArrayEquals(expectedData7, sdr.getData());
 
     // case7 to get spd2
     expectedBlocks =
         
Lists.newArrayList(shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()));
-    expectedData = getExpectedData(spd2);
     sdr = shuffleBuffer.getShuffleData(spd1.getBlockList()[0].getBlockId(), 
10);
     compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 1, 1);
-    assertArrayEquals(expectedData, sdr.getData());
+    assertArrayEquals(expectedData8, sdr.getData());
 
     // case7 to get spd1, spd2
     expectedBlocks =
         
Lists.newArrayList(shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()));
-    expectedData = getExpectedData(spd1, spd2);
     sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 20);
     compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 0, 2);
-    assertArrayEquals(expectedData, sdr.getData());
+    assertArrayEquals(expectedData9, sdr.getData());
 
     // case7 to get spd2, spd3
     expectedBlocks =
         
Lists.newArrayList(shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()));
-    expectedData = getExpectedData(spd2, spd3);
     sdr = shuffleBuffer.getShuffleData(spd1.getBlockList()[0].getBlockId(), 
20);
     compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 1, 2);
-    assertArrayEquals(expectedData, sdr.getData());
+    assertArrayEquals(expectedData10, sdr.getData());
 
     // case7 to get spd1, spd2, spd3, spd4
     expectedBlocks =
         
Lists.newArrayList(shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()));
     
expectedBlocks.addAll(shuffleBuffer.getInFlushBlockMap().get(event2.getEventId()));
-    expectedData = getExpectedData(spd1, spd2, spd3, spd4);
     sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 50);
     compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 0, 4);
-    assertArrayEquals(expectedData, sdr.getData());
+    assertArrayEquals(expectedData11, sdr.getData());
 
     // case7 to get spd2, spd3
     expectedBlocks =
         
Lists.newArrayList(shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()));
     
expectedBlocks.addAll(shuffleBuffer.getInFlushBlockMap().get(event2.getEventId()));
-    expectedData = getExpectedData(spd2, spd3);
     sdr = shuffleBuffer.getShuffleData(spd1.getBlockList()[0].getBlockId(), 
20);
     compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 1, 2);
-    assertArrayEquals(expectedData, sdr.getData());
+    assertArrayEquals(expectedData10, sdr.getData());
 
     // case7 to get spd4
     expectedBlocks =
         
Lists.newArrayList(shuffleBuffer.getInFlushBlockMap().get(event2.getEventId()));
-    expectedData = getExpectedData(spd4);
     sdr = shuffleBuffer.getShuffleData(spd3.getBlockList()[0].getBlockId(), 
10);
     compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 0, 1);
-    assertArrayEquals(expectedData, sdr.getData());
+    assertArrayEquals(expectedData12, sdr.getData());
 
     // case7 to get spd6
     expectedBlocks =
         
Lists.newArrayList(shuffleBuffer.getInFlushBlockMap().get(event2.getEventId()));
-    expectedData = getExpectedData(spd6);
     sdr = shuffleBuffer.getShuffleData(spd5.getBlockList()[0].getBlockId(), 
10);
     compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 2, 1);
-    assertArrayEquals(expectedData, sdr.getData());
+    assertArrayEquals(expectedData13, sdr.getData());
 
     // case7 to get spd4, spd5, spd6
     expectedBlocks =
         
Lists.newArrayList(shuffleBuffer.getInFlushBlockMap().get(event2.getEventId()));
-    expectedData = getExpectedData(spd4, spd5, spd6);
     sdr = shuffleBuffer.getShuffleData(spd3.getBlockList()[0].getBlockId(), 
40);
     compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 0, 3);
-    assertArrayEquals(expectedData, sdr.getData());
+    assertArrayEquals(expectedData14, sdr.getData());
 
     // case7 to get spd3, spd4, spd5, spd6, spd7
     expectedBlocks =
         
Lists.newArrayList(shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()));
     
expectedBlocks.addAll(shuffleBuffer.getInFlushBlockMap().get(event2.getEventId()));
     
expectedBlocks.addAll(shuffleBuffer.getInFlushBlockMap().get(event3.getEventId()));
-    expectedData = getExpectedData(spd3, spd4, spd5, spd6, spd7);
     sdr = shuffleBuffer.getShuffleData(spd2.getBlockList()[0].getBlockId(), 
70);
     compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 2, 5);
-    assertArrayEquals(expectedData, sdr.getData());
+    assertArrayEquals(expectedData15, sdr.getData());
 
     // case7 to get spd6, spd7
     expectedBlocks =
         
Lists.newArrayList(shuffleBuffer.getInFlushBlockMap().get(event2.getEventId()));
     
expectedBlocks.addAll(shuffleBuffer.getInFlushBlockMap().get(event3.getEventId()));
-    expectedData = getExpectedData(spd6, spd7);
     sdr = shuffleBuffer.getShuffleData(spd5.getBlockList()[0].getBlockId(), 
20);
     compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 2, 2);
-    assertArrayEquals(expectedData, sdr.getData());
+    assertArrayEquals(expectedData16, sdr.getData());
 
     // case7 to get spd6, spd7, spd8, spd9
     expectedBlocks =
         
Lists.newArrayList(shuffleBuffer.getInFlushBlockMap().get(event2.getEventId()));
     
expectedBlocks.addAll(shuffleBuffer.getInFlushBlockMap().get(event3.getEventId()));
-    expectedData = getExpectedData(spd6, spd7, spd8, spd9);
     sdr = shuffleBuffer.getShuffleData(spd5.getBlockList()[0].getBlockId(), 
50);
     compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 2, 4);
-    assertArrayEquals(expectedData, sdr.getData());
+    assertArrayEquals(expectedData17, sdr.getData());
 
     // case7 to get spd9, spd10
     expectedBlocks =
         
Lists.newArrayList(shuffleBuffer.getInFlushBlockMap().get(event3.getEventId()));
     
expectedBlocks.addAll(shuffleBuffer.getInFlushBlockMap().get(event4.getEventId()));
-    expectedData = getExpectedData(spd9, spd10);
     sdr = shuffleBuffer.getShuffleData(spd8.getBlockList()[0].getBlockId(), 
20);
     compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 2, 2);
-    assertArrayEquals(expectedData, sdr.getData());
+    assertArrayEquals(expectedData18, sdr.getData());
 
     // case7 to get spd10
     expectedBlocks =
         
Lists.newArrayList(shuffleBuffer.getInFlushBlockMap().get(event4.getEventId()));
-    expectedData = getExpectedData(spd10);
     sdr = shuffleBuffer.getShuffleData(spd9.getBlockList()[0].getBlockId(), 
10);
     compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 0, 1);
-    assertArrayEquals(expectedData, sdr.getData());
+    assertArrayEquals(expectedData19, sdr.getData());
 
     // case7 to get spd12
     expectedBlocks =
         
Lists.newArrayList(shuffleBuffer.getInFlushBlockMap().get(event4.getEventId()));
-    expectedData = getExpectedData(spd12);
     sdr = shuffleBuffer.getShuffleData(spd11.getBlockList()[0].getBlockId(), 
10);
     compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 2, 1);
-    assertArrayEquals(expectedData, sdr.getData());
+    assertArrayEquals(expectedData20, sdr.getData());
 
     // case7 to get spd10, spd11, spd12
     expectedBlocks =
         
Lists.newArrayList(shuffleBuffer.getInFlushBlockMap().get(event4.getEventId()));
-    expectedData = getExpectedData(spd10, spd11, spd12);
     sdr = shuffleBuffer.getShuffleData(spd9.getBlockList()[0].getBlockId(), 
40);
     compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 0, 3);
-    assertArrayEquals(expectedData, sdr.getData());
+    assertArrayEquals(expectedData21, sdr.getData());
 
     // case7 to get spd12, spd13
     expectedBlocks =
         
Lists.newArrayList(shuffleBuffer.getInFlushBlockMap().get(event4.getEventId()));
     expectedBlocks.addAll(shuffleBuffer.getBlocks());
-    expectedData = getExpectedData(spd12, spd13);
     sdr = shuffleBuffer.getShuffleData(spd11.getBlockList()[0].getBlockId(), 
20);
     compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 2, 2);
-    assertArrayEquals(expectedData, sdr.getData());
+    assertArrayEquals(expectedData22, sdr.getData());
 
     // case7 to get spd13
     expectedBlocks = Lists.newArrayList(shuffleBuffer.getBlocks());
     expectedBlocks.addAll(shuffleBuffer.getBlocks());
-    expectedData = getExpectedData(spd13);
     sdr = shuffleBuffer.getShuffleData(spd12.getBlockList()[0].getBlockId(), 
10);
     compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 0, 1);
-    assertArrayEquals(expectedData, sdr.getData());
+    assertArrayEquals(expectedData23, sdr.getData());
 
     // case7 to get spd14, spd15
     expectedBlocks = Lists.newArrayList(shuffleBuffer.getBlocks());
     expectedBlocks.addAll(shuffleBuffer.getBlocks());
-    expectedData = getExpectedData(spd14, spd15);
     sdr = shuffleBuffer.getShuffleData(spd13.getBlockList()[0].getBlockId(), 
20);
     compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 1, 2);
-    assertArrayEquals(expectedData, sdr.getData());
+    assertArrayEquals(expectedData24, sdr.getData());
 
     // case7 to get spd15
     expectedBlocks = Lists.newArrayList(shuffleBuffer.getBlocks());
     expectedBlocks.addAll(shuffleBuffer.getBlocks());
-    expectedData = getExpectedData(spd15);
     sdr = shuffleBuffer.getShuffleData(spd14.getBlockList()[0].getBlockId(), 
10);
     compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 2, 1);
-    assertArrayEquals(expectedData, sdr.getData());
+    assertArrayEquals(expectedData25, sdr.getData());
 
     // case7 to get spd12, spd13, spd14, spd15
     expectedBlocks =
         
Lists.newArrayList(shuffleBuffer.getInFlushBlockMap().get(event4.getEventId()));
     expectedBlocks.addAll(shuffleBuffer.getBlocks());
-    expectedData = getExpectedData(spd12, spd13, spd14, spd15);
     sdr = shuffleBuffer.getShuffleData(spd11.getBlockList()[0].getBlockId(), 
50);
     compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 2, 4);
-    assertArrayEquals(expectedData, sdr.getData());
+    assertArrayEquals(expectedData26, sdr.getData());
 
     // case7 to get spd1 - spd15
     expectedBlocks =
@@ -568,13 +574,9 @@ public class ShuffleBufferWithLinkedListTest extends 
BufferTestBase {
     
expectedBlocks.addAll(shuffleBuffer.getInFlushBlockMap().get(event3.getEventId()));
     
expectedBlocks.addAll(shuffleBuffer.getInFlushBlockMap().get(event4.getEventId()));
     expectedBlocks.addAll(shuffleBuffer.getBlocks());
-    expectedData =
-        getExpectedData(
-            spd1, spd2, spd3, spd4, spd5, spd6, spd7, spd8, spd9, spd10, 
spd11, spd12, spd13, spd14,
-            spd15);
     sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 220);
     compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 0, 15);
-    assertArrayEquals(expectedData, sdr.getData());
+    assertArrayEquals(expectedData27, sdr.getData());
 
     // case7 after get spd15
     sdr = shuffleBuffer.getShuffleData(spd15.getBlockList()[0].getBlockId(), 
20);
@@ -583,21 +585,22 @@ public class ShuffleBufferWithLinkedListTest extends 
BufferTestBase {
     // case7 can't find blockId, read from start
     expectedBlocks =
         
Lists.newArrayList(shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()));
-    expectedData = getExpectedData(spd1, spd2);
     sdr = shuffleBuffer.getShuffleData(-200, 20);
     compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 0, 2);
-    assertArrayEquals(expectedData, sdr.getData());
+    assertArrayEquals(expectedData9, sdr.getData());
   }
 
   @Test
   public void appendRepeatBlockTest() {
-    ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList();
+    ShuffleBuffer shuffleBuffer = createShuffleBuffer();
     ShufflePartitionedData block = createData(10);
     shuffleBuffer.append(block);
     // ShufflePartitionedBlock has constant 32 bytes overhead
     assertEquals(42, shuffleBuffer.getEncodedLength());
 
-    shuffleBuffer.append(block);
+    ShufflePartitionedData block2 = createData(10);
+    block2.getBlockList()[0].setBlockId(block.getBlockList()[0].getBlockId());
+    shuffleBuffer.append(block2);
     // The repeat block should not append to shuffleBuffer
     assertEquals(42, shuffleBuffer.getEncodedLength());
   }
@@ -611,6 +614,7 @@ public class ShuffleBufferWithLinkedListTest extends 
BufferTestBase {
     int offset = 0;
     for (ShufflePartitionedData spd : spds) {
       ShufflePartitionedBlock block = spd.getBlockList()[0];
+      block.getData().resetReaderIndex();
       ByteBufUtils.readBytes(block.getData(), expectedData, offset, 
block.getDataLength());
       offset += block.getDataLength();
     }
@@ -641,4 +645,9 @@ public class ShuffleBufferWithLinkedListTest extends 
BufferTestBase {
   protected AtomicInteger getAtomSequenceNo() {
     return atomSequenceNo;
   }
+
+  @Override
+  protected ShuffleBuffer createShuffleBuffer() {
+    return new ShuffleBufferWithLinkedList();
+  }
 }
diff --git 
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipListTest.java
 
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipListTest.java
index 87ce14569..e311bc80b 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipListTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipListTest.java
@@ -39,7 +39,7 @@ public class ShuffleBufferWithSkipListTest extends 
BufferTestBase {
 
   @Test
   public void appendTest() {
-    ShuffleBuffer shuffleBuffer = new ShuffleBufferWithSkipList();
+    ShuffleBuffer shuffleBuffer = createShuffleBuffer();
     shuffleBuffer.append(createData(10));
     // ShufflePartitionedBlock has constant 32 bytes overhead
     assertEquals(42, shuffleBuffer.getEncodedLength());
@@ -53,7 +53,7 @@ public class ShuffleBufferWithSkipListTest extends 
BufferTestBase {
 
   @Test
   public void appendMultiBlocksTest() {
-    ShuffleBuffer shuffleBuffer = new ShuffleBufferWithSkipList();
+    ShuffleBuffer shuffleBuffer = createShuffleBuffer();
     ShufflePartitionedData data1 = createData(10);
     ShufflePartitionedData data2 = createData(10);
     ShufflePartitionedBlock[] dataCombine = new ShufflePartitionedBlock[2];
@@ -65,7 +65,7 @@ public class ShuffleBufferWithSkipListTest extends 
BufferTestBase {
 
   @Test
   public void toFlushEventTest() {
-    ShuffleBuffer shuffleBuffer = new ShuffleBufferWithSkipList();
+    ShuffleBuffer shuffleBuffer = createShuffleBuffer();
     ShuffleDataFlushEvent event = shuffleBuffer.toFlushEvent("appId", 0, 0, 1, 
null);
     assertNull(event);
     shuffleBuffer.append(createData(10));
@@ -79,7 +79,7 @@ public class ShuffleBufferWithSkipListTest extends 
BufferTestBase {
   @Test
   public void getShuffleDataWithExpectedTaskIdsFilterTest() {
     /** case1: all blocks in cached(or in flushed map) and size < 
readBufferSize */
-    ShuffleBuffer shuffleBuffer = new ShuffleBufferWithSkipList();
+    ShuffleBuffer shuffleBuffer = createShuffleBuffer();
     ShufflePartitionedData spd1 = createData(1, 1, 15);
     ShufflePartitionedData spd2 = createData(1, 0, 15);
     ShufflePartitionedData spd3 = createData(1, 2, 55);
@@ -199,13 +199,15 @@ public class ShuffleBufferWithSkipListTest extends 
BufferTestBase {
 
   @Test
   public void appendRepeatBlockTest() {
-    ShuffleBuffer shuffleBuffer = new ShuffleBufferWithSkipList();
+    ShuffleBuffer shuffleBuffer = createShuffleBuffer();
     ShufflePartitionedData block = createData(10);
     shuffleBuffer.append(block);
     // ShufflePartitionedBlock has constant 32 bytes overhead
     assertEquals(42, shuffleBuffer.getEncodedLength());
 
-    shuffleBuffer.append(block);
+    ShufflePartitionedData block2 = createData(10);
+    block2.getBlockList()[0].setBlockId(block.getBlockList()[0].getBlockId());
+    shuffleBuffer.append(block2);
     // The repeat block should not append to shuffleBuffer
     assertEquals(42, shuffleBuffer.getEncodedLength());
   }
@@ -214,4 +216,9 @@ public class ShuffleBufferWithSkipListTest extends 
BufferTestBase {
   protected AtomicInteger getAtomSequenceNo() {
     return atomSequenceNo;
   }
+
+  @Override
+  protected ShuffleBuffer createShuffleBuffer() {
+    return new ShuffleBufferWithSkipList();
+  }
 }


Reply via email to