This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new c83a1b5eb [#1727] feat(server): Introduce local allocation buffer to
store blocks in memory (#2492)
c83a1b5eb is described below
commit c83a1b5ebc02d92152810b2855fec90232b07d7e
Author: xianjingfeng <[email protected]>
AuthorDate: Thu Jun 12 14:10:10 2025 +0800
[#1727] feat(server): Introduce local allocation buffer to store blocks in
memory (#2492)
### What changes were proposed in this pull request?
Introduce local allocation buffer to store blocks in memory.
### Why are the changes needed?
Fix: #1727
### Does this PR introduce _any_ user-facing change?
set `rss.server.buffer.lab.enable` to `true` in server.conf
### How was this patch tested?
CI and verify in production environment
---
.../uniffle/common/ShufflePartitionedBlock.java | 24 +-
.../apache/uniffle/common/config/ConfigUtils.java | 3 +
.../apache/uniffle/common/util/ByteBufUtils.java | 7 +
.../common/ShufflePartitionedBlockTest.java | 11 -
.../apache/uniffle/server/ShuffleServerConf.java | 40 +++
.../uniffle/server/ShuffleServerGrpcService.java | 2 +-
.../server/buffer/ShuffleBufferManager.java | 20 +-
.../server/buffer/ShuffleBufferWithLinkedList.java | 30 +-
.../server/buffer/ShuffleBufferWithSkipList.java | 29 +-
.../apache/uniffle/server/buffer/lab/Chunk.java | 94 +++++++
.../uniffle/server/buffer/lab/ChunkCreator.java | 313 +++++++++++++++++++++
.../org/apache/uniffle/server/buffer/lab/LAB.java | 108 +++++++
.../buffer/lab/LABShuffleBufferWithLinkedList.java | 84 ++++++
.../buffer/lab/LABShuffleBufferWithSkipList.java | 82 ++++++
.../buffer/lab/LABShufflePartitionedBlock.java | 40 +++
.../uniffle/server/buffer/lab/OffheapChunk.java | 40 +++
.../uniffle/server/buffer/lab/SupportsLAB.java | 20 ++
.../org/apache/uniffle/server/merge/Partition.java | 10 +-
.../server/netty/ShuffleServerNettyHandler.java | 2 +-
.../uniffle/server/buffer/BufferTestBase.java | 2 +
.../buffer/LABShuffleBufferWithLinkedListTest.java | 35 +++
.../buffer/LABShuffleBufferWithSkipListTest.java | 35 +++
.../server/buffer/ShuffleBufferManagerTest.java | 25 +-
.../buffer/ShuffleBufferWithLinkedListTest.java | 187 ++++++------
.../buffer/ShuffleBufferWithSkipListTest.java | 19 +-
25 files changed, 1114 insertions(+), 148 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java
b/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java
index e476e37f3..0a04d9be7 100644
---
a/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java
+++
b/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java
@@ -38,12 +38,13 @@ public class ShufflePartitionedBlock {
long blockId,
long taskAttemptId,
byte[] data) {
- this.dataLength = dataLength;
- this.crc = crc;
- this.blockId = blockId;
- this.uncompressLength = uncompressLength;
- this.taskAttemptId = taskAttemptId;
- this.data = data == null ? Unpooled.EMPTY_BUFFER :
Unpooled.wrappedBuffer(data);
+ this(
+ dataLength,
+ uncompressLength,
+ crc,
+ blockId,
+ taskAttemptId,
+ data == null ? Unpooled.EMPTY_BUFFER : Unpooled.wrappedBuffer(data));
}
public ShufflePartitionedBlock(
@@ -61,6 +62,10 @@ public class ShufflePartitionedBlock {
this.data = data;
}
+ public boolean isOnLAB() {
+ return false;
+ }
+
// calculate the data size for this block in memory including metadata which
are
// blockId, crc, taskAttemptId, length, uncompressLength
public long getEncodedLength() {
@@ -76,15 +81,12 @@ public class ShufflePartitionedBlock {
return false;
}
ShufflePartitionedBlock that = (ShufflePartitionedBlock) o;
- return dataLength == that.dataLength
- && crc == that.crc
- && blockId == that.blockId
- && data.equals(that.data);
+ return blockId == that.blockId;
}
@Override
public int hashCode() {
- return Objects.hash(dataLength, crc, blockId, data);
+ return Objects.hash(blockId);
}
public int getDataLength() {
diff --git
a/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
b/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
index 64f0ee1d5..b8f5ac096 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
@@ -226,4 +226,7 @@ public class ConfigUtils {
public static final Function<Double, Boolean> PERCENTAGE_DOUBLE_VALIDATOR =
value -> Double.compare(value, 100.0) <= 0 && Double.compare(value, 0.0)
>= 0;
+
+ public static final Function<Double, Boolean> DOUBLE_VALIDATOR_ZERO_TO_ONE =
+ value -> Double.compare(value, 1.0) <= 0 && Double.compare(value, 0.0)
>= 0;
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/util/ByteBufUtils.java
b/common/src/main/java/org/apache/uniffle/common/util/ByteBufUtils.java
index 24f60e20e..c5f2a5e58 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/ByteBufUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/ByteBufUtils.java
@@ -96,4 +96,11 @@ public class ByteBufUtils {
final ByteBuffer byteBuffer = bytes.asReadOnlyByteBuffer();
return Unpooled.wrappedBuffer(byteBuffer);
}
+
+ public static ByteBuf copy(ByteBuf from) {
+ ByteBuf newByteBuf = Unpooled.directBuffer(from.readableBytes());
+ newByteBuf.writeBytes(from);
+ from.resetReaderIndex();
+ return newByteBuf;
+ }
}
diff --git
a/common/src/test/java/org/apache/uniffle/common/ShufflePartitionedBlockTest.java
b/common/src/test/java/org/apache/uniffle/common/ShufflePartitionedBlockTest.java
index eb79ede63..d8c0ddbc5 100644
---
a/common/src/test/java/org/apache/uniffle/common/ShufflePartitionedBlockTest.java
+++
b/common/src/test/java/org/apache/uniffle/common/ShufflePartitionedBlockTest.java
@@ -20,8 +20,6 @@ package org.apache.uniffle.common;
import java.util.Random;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.CsvSource;
import org.apache.uniffle.common.util.ByteBufUtils;
@@ -57,15 +55,6 @@ public class ShufflePartitionedBlockTest {
assertNotEquals(b1, new Object());
}
- @ParameterizedTest
- @CsvSource({"5, 2, 3, 4", "1, 5, 3, 4", "1, 2, 5, 4", "1, 2, 3, 5"})
- public void testNotEquals(int length, long crc, long blockId, int dataSize) {
- ShufflePartitionedBlock b1 = new ShufflePartitionedBlock(1, 0, 2, 3, 0,
new byte[4]);
- ShufflePartitionedBlock b2 =
- new ShufflePartitionedBlock(length, 0, crc, blockId, 0, new
byte[dataSize]);
- assertNotEquals(b1, b2);
- }
-
@Test
public void testToString() {
ShufflePartitionedBlock b1 = new ShufflePartitionedBlock(1, 2, 3, 4, 5,
new byte[6]);
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index edf7d8684..7490f97a7 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -505,6 +505,46 @@ public class ShuffleServerConf extends RssBaseConf {
"Threshold when flushing shuffle data to persistent storage,
recommend value would be 256K, "
+ "512K, or even 1M");
+ public static final ConfigOption<Boolean> SERVER_SHUFFLE_BUFFER_LAB_ENABLE =
+ ConfigOptions.key("rss.server.buffer.lab.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether enable LAB(Local allocation buffer) for
shuffle buffers.");
+
+ public static final ConfigOption<Integer>
SERVER_SHUFFLE_BUFFER_LAB_CHUNK_SIZE =
+ ConfigOptions.key("rss.server.buffer.lab.chunkSize")
+ .intType()
+ .defaultValue(1024 * 100) // 100K
+ .withDescription(
+ "Defines the pre-allocated chunk size per partition for LAB. "
+ + "Each partition reserves one chunk of this size. "
+ + "Larger values may cause memory waste when processing many
partitions, "
+ + "while smaller values generate excessive small chunks,
increasing GC frequency and overhead. "
+ + "Configure based on expected data size, available memory,
and GC tolerance to balance efficiency.");
+
+ public static final ConfigOption<Double>
SERVER_SHUFFLE_BUFFER_LAB_MAX_ALLOC_RATIO =
+ ConfigOptions.key("rss.server.buffer.lab.maxAllocRatio")
+ .doubleType()
+ .checkValue(
+ ConfigUtils.DOUBLE_VALIDATOR_ZERO_TO_ONE,
+ "The lab max alloc ratio must be between 0.0 and 1.0")
+ .defaultValue(0.2)
+ .withDescription(
+ "If the block size is not small, we don't need to put it in the
chunk."
+ + " If the ratio is 0.2, it means the blocks which size is
less or equal than "
+ + SERVER_SHUFFLE_BUFFER_LAB_CHUNK_SIZE.key()
+ + " * 0.2 will be put in the chunk.");
+
+ public static final ConfigOption<Double>
SERVER_SHUFFLE_BUFFER_LAB_CHUNK_POOL_CAPACITY_RATIO =
+ ConfigOptions.key("rss.server.buffer.lab.chunkPoolCapacityRatio")
+ .doubleType()
+ .defaultValue(1.0)
+ .withDescription(
+ "Controls the maximum memory capacity ratio between LAB's chunk
pool and the configured buffer capacity. "
+ + "The ratio represents (total memory of chunk pool) / "
+ + SERVER_BUFFER_CAPACITY.key()
+ + ".");
+
public static final ConfigOption<String> STORAGE_MEDIA_PROVIDER_ENV_KEY =
ConfigOptions.key("rss.server.storageMediaProvider.from.env.key")
.stringType()
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 3977f5fdc..979761f9e 100644
---
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -478,7 +478,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
+ e.getMessage();
ret = StatusCode.INTERNAL_ERROR;
responseMessage = errorMsg;
- LOG.error(errorMsg);
+ LOG.error(errorMsg, e);
hasFailureOccurred = true;
break;
} finally {
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index c4ed44d29..0b6b246b5 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -55,6 +55,9 @@ import org.apache.uniffle.server.ShuffleFlushManager;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.server.ShuffleServerMetrics;
import org.apache.uniffle.server.ShuffleTaskManager;
+import org.apache.uniffle.server.buffer.lab.ChunkCreator;
+import org.apache.uniffle.server.buffer.lab.LABShuffleBufferWithLinkedList;
+import org.apache.uniffle.server.buffer.lab.LABShuffleBufferWithSkipList;
import static
org.apache.uniffle.server.ShuffleServerMetrics.BLOCK_COUNT_IN_BUFFER_POOL;
import static
org.apache.uniffle.server.ShuffleServerMetrics.BUFFER_COUNT_IN_BUFFER_POOL;
@@ -66,6 +69,7 @@ public class ShuffleBufferManager {
private static final Logger LOG =
LoggerFactory.getLogger(ShuffleBufferManager.class);
private final ShuffleBufferType shuffleBufferType;
+ private final Boolean isLABEnabled;
private final int flushTryLockTimeout;
private ShuffleTaskManager shuffleTaskManager;
private final ShuffleFlushManager shuffleFlushManager;
@@ -212,6 +216,16 @@ public class ShuffleBufferManager {
ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE));
}
});
+
+ isLABEnabled =
conf.get(ShuffleServerConf.SERVER_SHUFFLE_BUFFER_LAB_ENABLE);
+ if (isLABEnabled) {
+ int chunkSize =
conf.get(ShuffleServerConf.SERVER_SHUFFLE_BUFFER_LAB_CHUNK_SIZE);
+ double chunkPoolCapacityRatio =
+
conf.get(ShuffleServerConf.SERVER_SHUFFLE_BUFFER_LAB_CHUNK_POOL_CAPACITY_RATIO);
+ double maxAllocRatio =
conf.get(ShuffleServerConf.SERVER_SHUFFLE_BUFFER_LAB_MAX_ALLOC_RATIO);
+ int maxAlloc = (int) (chunkSize * maxAllocRatio);
+ ChunkCreator.initialize(chunkSize, (long) (capacity *
chunkPoolCapacityRatio), maxAlloc);
+ }
}
public void setShuffleTaskManager(ShuffleTaskManager taskManager) {
@@ -229,9 +243,11 @@ public class ShuffleBufferManager {
ShuffleServerMetrics.gaugeTotalPartitionNum.inc();
ShuffleBuffer shuffleBuffer;
if (shuffleBufferType == ShuffleBufferType.SKIP_LIST) {
- shuffleBuffer = new ShuffleBufferWithSkipList();
+ shuffleBuffer =
+ isLABEnabled ? new LABShuffleBufferWithSkipList() : new
ShuffleBufferWithSkipList();
} else {
- shuffleBuffer = new ShuffleBufferWithLinkedList();
+ shuffleBuffer =
+ isLABEnabled ? new LABShuffleBufferWithLinkedList() : new
ShuffleBufferWithLinkedList();
}
bufferRangeMap.put(Range.closed(startPartition, endPartition),
shuffleBuffer);
} else {
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
index e35c42575..2216e2907 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
@@ -61,11 +61,11 @@ public class ShuffleBufferWithLinkedList extends
AbstractShuffleBuffer {
for (ShufflePartitionedBlock block : data.getBlockList()) {
// If sendShuffleData retried, we may receive duplicate block. The
duplicate
// block would gc without release. Here we must release the duplicated
block.
- if (blocks.add(block)) {
+ if (addBlock(block)) {
currentEncodedLength += block.getEncodedLength();
currentDataLength += block.getDataLength();
} else {
- block.getData().release();
+ releaseBlock(block);
}
}
this.encodedLength += currentEncodedLength;
@@ -74,6 +74,14 @@ public class ShuffleBufferWithLinkedList extends
AbstractShuffleBuffer {
return currentEncodedLength;
}
+ protected boolean addBlock(ShufflePartitionedBlock block) {
+ return blocks.add(block);
+ }
+
+ protected void releaseBlock(ShufflePartitionedBlock block) {
+ block.getData().release();
+ }
+
@Override
public synchronized ShuffleDataFlushEvent toFlushEvent(
String appId,
@@ -110,12 +118,7 @@ public class ShuffleBufferWithLinkedList extends
AbstractShuffleBuffer {
spBlocks,
isValid,
this);
- event.addCleanupCallback(
- () -> {
- this.clearInFlushBuffer(event.getEventId());
- inFlushedQueueBlocks.forEach(spb -> spb.getData().release());
- inFlushSize.addAndGet(-event.getEncodedLength());
- });
+ event.addCleanupCallback(createCallbackForFlush(event));
inFlushBlockMap.put(eventId, inFlushedQueueBlocks);
blocks = new LinkedHashSet<>();
inFlushSize.addAndGet(encodedLength);
@@ -124,6 +127,15 @@ public class ShuffleBufferWithLinkedList extends
AbstractShuffleBuffer {
return event;
}
+ protected Runnable createCallbackForFlush(ShuffleDataFlushEvent event) {
+ Set<ShufflePartitionedBlock> inFlushedQueueBlocks = blocks;
+ return () -> {
+ this.clearInFlushBuffer(event.getEventId());
+ inFlushedQueueBlocks.forEach(this::releaseBlock);
+ inFlushSize.addAndGet(-event.getEncodedLength());
+ };
+ }
+
@Override
public Set<ShufflePartitionedBlock> getBlocks() {
return blocks;
@@ -147,7 +159,7 @@ public class ShuffleBufferWithLinkedList extends
AbstractShuffleBuffer {
evicted = true;
for (ShufflePartitionedBlock spb : blocks) {
try {
- spb.getData().release();
+ releaseBlock(spb);
releasedSize += spb.getEncodedLength();
} catch (Throwable t) {
lastException = t;
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
index d6c47933a..33ba313ce 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
@@ -69,12 +69,12 @@ public class ShuffleBufferWithSkipList extends
AbstractShuffleBuffer {
// If sendShuffleData retried, we may receive duplicate block. The
duplicate
// block would gc without release. Here we must release the duplicated
block.
if (!blocksMap.containsKey(block.getBlockId())) {
- blocksMap.put(block.getBlockId(), block);
+ addBlock(block);
blockCount++;
currentEncodedLength += block.getEncodedLength();
currentDataLength += block.getDataLength();
} else {
- block.getData().release();
+ releaseBlock(block);
}
}
this.encodedLength += currentEncodedLength;
@@ -83,6 +83,14 @@ public class ShuffleBufferWithSkipList extends
AbstractShuffleBuffer {
return currentEncodedLength;
}
+ protected void addBlock(ShufflePartitionedBlock block) {
+ blocksMap.put(block.getBlockId(), block);
+ }
+
+ protected void releaseBlock(ShufflePartitionedBlock block) {
+ block.getData().release();
+ }
+
@Override
public synchronized ShuffleDataFlushEvent toFlushEvent(
String appId,
@@ -108,12 +116,7 @@ public class ShuffleBufferWithSkipList extends
AbstractShuffleBuffer {
spBlocks,
isValid,
this);
- event.addCleanupCallback(
- () -> {
- this.clearInFlushBuffer(event.getEventId());
- spBlocks.forEach(spb -> spb.getData().release());
- inFlushSize.addAndGet(-event.getEncodedLength());
- });
+ event.addCleanupCallback(createCallbackForFlush(event));
inFlushBlockMap.put(eventId, blocksMap);
blocksMap = newConcurrentSkipListMap();
blockCount = 0;
@@ -123,6 +126,14 @@ public class ShuffleBufferWithSkipList extends
AbstractShuffleBuffer {
return event;
}
+ protected Runnable createCallbackForFlush(ShuffleDataFlushEvent event) {
+ return () -> {
+ this.clearInFlushBuffer(event.getEventId());
+ event.getShuffleBlocks().forEach(this::releaseBlock);
+ inFlushSize.addAndGet(-event.getEncodedLength());
+ };
+ }
+
@Override
public Set<ShufflePartitionedBlock> getBlocks() {
return new LinkedHashSet<>(blocksMap.values());
@@ -146,7 +157,7 @@ public class ShuffleBufferWithSkipList extends
AbstractShuffleBuffer {
evicted = true;
for (ShufflePartitionedBlock spb : blocksMap.values()) {
try {
- spb.getData().release();
+ releaseBlock(spb);
releasedSize += spb.getEncodedLength();
} catch (Throwable t) {
lastException = t;
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/lab/Chunk.java
b/server/src/main/java/org/apache/uniffle/server/buffer/lab/Chunk.java
new file mode 100644
index 000000000..d0aacf40c
--- /dev/null
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/lab/Chunk.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.buffer.lab;
+
+import io.netty.buffer.ByteBuf;
+
+/** A chunk of memory out of which allocations are sliced. */
+public abstract class Chunk {
+ /** Actual underlying data */
+ protected ByteBuf data;
+ /** Size of chunk in bytes */
+ protected final int size;
+ // The unique id associated with the chunk.
+ private final int id;
+ private final boolean fromPool;
+
+ /**
+ * Create an uninitialized chunk. Note that memory is not allocated yet, so
this is cheap.
+ *
+ * @param size in bytes
+ * @param id the chunk id
+ */
+ public Chunk(int size, int id) {
+ this(size, id, false);
+ }
+
+ /**
+ * Create an uninitialized chunk. Note that memory is not allocated yet, so
this is cheap.
+ *
+ * @param size in bytes
+ * @param id the chunk id
+ * @param fromPool if the chunk is formed by pool
+ */
+ public Chunk(int size, int id, boolean fromPool) {
+ this.size = size;
+ this.id = id;
+ this.fromPool = fromPool;
+ }
+
+ int getId() {
+ return this.id;
+ }
+
+ boolean isFromPool() {
+ return this.fromPool;
+ }
+
+ /**
+ * Actually claim the memory for this chunk. This should only be called from
the thread that
+ * constructed the chunk. It is thread-safe against other threads calling
alloc(), who will block
+ * until the allocation is complete.
+ */
+ public void init() {
+ allocateDataBuffer();
+ }
+
+ abstract void allocateDataBuffer();
+
+ /**
+ * Try to get the allocated offset from the chunk.
+ *
+ * @return the offset of the successful allocation, or -1 to indicate
not-enough-space
+ */
+ public int getAllocOffset(int size) {
+ if (data.writerIndex() + size > data.capacity()) {
+ return -1;
+ }
+ return data.writerIndex();
+ }
+
+ void reset() {
+ data.clear();
+ }
+
+ /** @return This chunk's backing data. */
+ ByteBuf getData() {
+ return this.data;
+ }
+}
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/lab/ChunkCreator.java
b/server/src/main/java/org/apache/uniffle/server/buffer/lab/ChunkCreator.java
new file mode 100644
index 000000000..cfab30b87
--- /dev/null
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/lab/ChunkCreator.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.buffer.lab;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Does the management of LAB chunk creations. A monotonically incrementing id
is associated with
+ * every chunk
+ */
+public class ChunkCreator {
+ private static final Logger LOG =
LoggerFactory.getLogger(ChunkCreator.class);
+ // monotonically increasing chunkid. Starts at 1.
+ private final AtomicInteger chunkID = new AtomicInteger(1);
+
+ // mapping from chunk IDs to chunks
+ private final Map<Integer, Chunk> chunkIdMap = new
ConcurrentHashMap<Integer, Chunk>();
+ static ChunkCreator instance;
+ private final int maxAlloc;
+ private final ChunkPool chunksPool;
+ private final int chunkSize;
+
+ ChunkCreator(int chunkSize, long bufferCapacity, int maxAlloc) {
+ this.chunkSize = chunkSize;
+ this.maxAlloc = maxAlloc;
+ this.chunksPool = initializePool(bufferCapacity, chunkSize);
+ }
+
+ /**
+ * Initializes the instance of ChunkCreator
+ *
+ * @param chunkSize the chunkSize
+ * @param bufferCapacity the buffer capacity
+ * @return singleton ChunkCreator
+ */
+ public static synchronized void initialize(int chunkSize, long
bufferCapacity, int maxAlloc) {
+ if (instance != null) {
+ LOG.warn("ChunkCreator instance is already initialized.");
+ }
+ instance = new ChunkCreator(chunkSize, bufferCapacity, maxAlloc);
+ }
+
+ public static ChunkCreator getInstance() {
+ return instance;
+ }
+
+ /**
+ * Creates and inits a chunk with specific index type and type.
+ *
+ * @return the chunk that was initialized
+ */
+ Chunk getChunk() {
+ return getChunk(chunksPool.getChunkSize());
+ }
+
+ /**
+ * Creates and inits a chunk.
+ *
+ * @return the chunk that was initialized
+ * @param size the size of the chunk to be allocated, in bytes
+ */
+ Chunk getChunk(int size) {
+ Chunk chunk = null;
+ ChunkPool pool = null;
+
+ // if the size is suitable for one of the pools
+ if (chunksPool != null && size == chunksPool.getChunkSize()) {
+ pool = chunksPool;
+ }
+
+ if (pool != null) {
+ chunk = pool.getChunk();
+ if (chunk == null) {
+ LOG.warn(
+ "The chunk pool is full. Reached maxCount= "
+ + pool.getMaxCount()
+ + ". Creating chunk outside of the pool.");
+ }
+ }
+
+ if (chunk == null) {
+ chunk = createChunk(false, size);
+ }
+ chunk.init();
+ return chunk;
+ }
+
+ /**
+ * Creates the chunk
+ *
+ * @param pool indicates if the chunks have to be created which will be used
by the Pool
+ * @param size the size of the chunk to be allocated, in bytes
+ * @return the chunk
+ */
+ private Chunk createChunk(boolean pool, int size) {
+ Chunk chunk;
+ int id = chunkID.getAndIncrement();
+ Preconditions.checkArgument(id > 0, "chunkId should be positive.");
+ chunk = new OffheapChunk(size, id, pool);
+ this.chunkIdMap.put(chunk.getId(), chunk);
+ return chunk;
+ }
+
+ // Chunks from pool are created covered with strong references anyway
+ private Chunk createChunkForPool(int chunkSize) {
+ if (chunkSize != chunksPool.getChunkSize()) {
+ return null;
+ }
+ return createChunk(true, chunkSize);
+ }
+
+ private void removeChunks(List<Integer> chunkIDs) {
+ chunkIDs.forEach(this::removeChunk);
+ }
+
+ Chunk removeChunk(int chunkId) {
+ Chunk c = this.chunkIdMap.remove(chunkId);
+ c.getData().release();
+ return c;
+ }
+
+ /**
+ * A pool of {@link Chunk} instances.
+ *
+ * <p>ChunkPool caches a number of retired chunks for reusing, it could
decrease allocating bytes
+ * when writing, thereby optimizing the garbage collection on JVM.
+ */
+ private class ChunkPool {
+ private final int chunkSize;
+ private final int maxCount;
+
+ // A queue of reclaimed chunks
+ private final BlockingQueue<Chunk> reclaimedChunks;
+
+ /** Statistics thread schedule pool */
+ private final ScheduledExecutorService scheduleThreadPool;
+ /** Statistics thread */
+ private static final int statThreadPeriod = 60 * 5;
+
+ private final AtomicLong chunkCount = new AtomicLong();
+ private final LongAdder reusedChunkCount = new LongAdder();
+
+ ChunkPool(int chunkSize, int maxCount) {
+ this.chunkSize = chunkSize;
+ this.maxCount = maxCount;
+ this.reclaimedChunks = new LinkedBlockingQueue<>();
+ final String n = Thread.currentThread().getName();
+ scheduleThreadPool =
+ Executors.newScheduledThreadPool(
+ 1,
+ new ThreadFactoryBuilder()
+ .setNameFormat(n + "-ChunkPool Statistics")
+ .setDaemon(true)
+ .build());
+ this.scheduleThreadPool.scheduleAtFixedRate(
+ new StatisticsThread(), statThreadPeriod, statThreadPeriod,
TimeUnit.SECONDS);
+ }
+
+ /**
+ * Poll a chunk from the pool, reset it if not null, else create a new
chunk to return if we
+ * have not yet created max allowed chunks count. When we have already
created max allowed
+ * chunks and no free chunks as of now, return null. It is the
responsibility of the caller to
+ * make a chunk then. Note: Chunks returned by this pool must be put back
to the pool after its
+ * use.
+ *
+ * @return a chunk
+ * @see #putbackChunks(Chunk)
+ */
+ Chunk getChunk() {
+ Chunk chunk = reclaimedChunks.poll();
+ if (chunk != null) {
+ chunk.reset();
+ reusedChunkCount.increment();
+ } else {
+ // Make a chunk if we have not yet created the maxCount chunks
+ while (true) {
+ long created = this.chunkCount.get();
+ if (created < this.maxCount) {
+ if (this.chunkCount.compareAndSet(created, created + 1)) {
+ chunk = createChunkForPool(chunkSize);
+ break;
+ }
+ } else {
+ break;
+ }
+ }
+ }
+ return chunk;
+ }
+
+ int getChunkSize() {
+ return chunkSize;
+ }
+
+ /**
+ * Add the chunks to the pool, when the pool achieves the max size, it
will skip the remaining
+ * chunks
+ */
+ private void putbackChunks(Chunk c) {
+ int toAdd = this.maxCount - reclaimedChunks.size();
+ if (c.isFromPool() && c.size == chunkSize && toAdd > 0) {
+ reclaimedChunks.add(c);
+ } else {
+ // remove the chunk (that is not going to pool)
+ ChunkCreator.this.removeChunk(c.getId());
+ }
+ }
+
+ private class StatisticsThread extends Thread {
+ StatisticsThread() {
+ super("MemStoreChunkPool.StatisticsThread");
+ setDaemon(true);
+ }
+
+ @Override
+ public void run() {
+ logStats();
+ }
+
+ private void logStats() {
+ long created = chunkCount.get();
+ long reused = reusedChunkCount.sum();
+ long total = created + reused;
+ LOG.info(
+ "ChunkPool stats (chunk size={}): current pool size={}, created
chunk count={}, "
+ + "reused chunk count={}, reuseRatio={}",
+ chunkSize,
+ reclaimedChunks.size(),
+ created,
+ reused,
+ (total == 0 ? "0" : StringUtils.formatPercent((float) reused /
(float) total, 2)));
+ }
+ }
+
+ private int getMaxCount() {
+ return this.maxCount;
+ }
+ }
+
+ private ChunkPool initializePool(long bufferCapacity, int chunkSize) {
+ int maxCount = Math.max((int) (bufferCapacity / chunkSize), 1);
+ LOG.info(
+ "Allocating ChunkPool with chunk size {}, max count {}",
+ StringUtils.byteDesc(chunkSize),
+ maxCount);
+ return new ChunkPool(chunkSize, maxCount);
+ }
+
+ int getChunkSize() {
+ return chunkSize;
+ }
+
+ int getMaxAlloc() {
+ return maxAlloc;
+ }
+
+ synchronized void putBackChunks(List<Integer> chunks) {
+ // if there is no pool just try to clear the chunkIdMap in case there is
something
+ if (chunksPool == null) {
+ this.removeChunks(chunks);
+ return;
+ }
+
+ // if there is a pool, go over all chunk IDs that came back, the chunks
may be from pool or not
+ for (int chunkID : chunks) {
+ // translate chunk ID to chunk, if chunk initially wasn't in pool
+ // this translation will (most likely) return null
+ Chunk chunk = chunkIdMap.get(chunkID);
+ if (chunk != null) {
+ if (chunk.isFromPool()) {
+ chunksPool.putbackChunks(chunk);
+ } else {
+ // chunks which are not from one of the pools
+ // should be released without going to the pools.
+ this.removeChunk(chunkID);
+ }
+ } else {
+ LOG.warn("Chunk {} can not be found in chunkIdMap, ignore it",
chunkID);
+ }
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/lab/LAB.java
b/server/src/main/java/org/apache/uniffle/server/buffer/lab/LAB.java
new file mode 100644
index 000000000..5d3b7000a
--- /dev/null
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/lab/LAB.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.buffer.lab;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.uniffle.common.ShufflePartitionedBlock;
+
+/**
+ * Local allocation buffer.
+ *
+ * <p>The LAB is basically a bump-the-pointer allocator that allocates big
(100K) chunks from and
+ * then doles it out to threads that request slices into the array. These
chunks can get pooled as
+ * well. See {@link ChunkCreator}.
+ *
+ * <p>The purpose of this is to combat heap fragmentation in the shuffle
server. By ensuring that
+ * all blocks in a given partition refer only to large chunks of contiguous
memory, we ensure that
+ * large blocks get freed up when the partition is flushed.
+ *
+ * <p>Without the LAB, the byte array allocated during insertion end up
interleaved throughout the
+ * heap, and the old generation gets progressively more fragmented until a
stop-the-world compacting
+ * collection occurs.
+ *
+ * <p>This manages the large sized chunks. When blocks are to be added to
partition, LAB's {@link
+ * #tryCopyBlockToChunk(ShufflePartitionedBlock)} gets called. This allocates
enough size in the
+ * chunk to hold this block's data and copies into this area and then recreate
a
+ * LABShufflePartitionedBlock over this copied data.
+ *
+ * <p>
+ *
+ * @see ChunkCreator
+ */
+public class LAB {
+ private Chunk currChunk;
+
+ List<Integer> chunks = new LinkedList<>();
+ private final int maxAlloc;
+ private final ChunkCreator chunkCreator;
+
+ public LAB() {
+ this.chunkCreator = ChunkCreator.getInstance();
+ maxAlloc = chunkCreator.getMaxAlloc();
+ }
+
+ public ShufflePartitionedBlock tryCopyBlockToChunk(ShufflePartitionedBlock
block) {
+ int size = block.getDataLength();
+ if (size > maxAlloc) {
+ return block;
+ }
+ Chunk c;
+ int allocOffset;
+ while (true) {
+ // Try to get the chunk
+ c = getOrMakeChunk();
+ // Try to allocate from this chunk
+ allocOffset = c.getAllocOffset(size);
+ if (allocOffset != -1) {
+ break;
+ }
+ // not enough space!
+ currChunk = null;
+ }
+ c.getData().writeBytes(block.getData());
+ return new LABShufflePartitionedBlock(
+ block.getDataLength(),
+ block.getUncompressLength(),
+ block.getCrc(),
+ block.getBlockId(),
+ block.getTaskAttemptId(),
+ c.getData().slice(allocOffset, size));
+ }
+
+ public void close() {
+ recycleChunks();
+ }
+
+ private void recycleChunks() {
+ chunkCreator.putBackChunks(chunks);
+ }
+
+ /** Get the current chunk, or, if there is no current chunk, allocate a new
one. */
+ private Chunk getOrMakeChunk() {
+ Chunk c = currChunk;
+ if (c != null) {
+ return c;
+ }
+ c = this.chunkCreator.getChunk();
+ currChunk = c;
+ chunks.add(c.getId());
+ return c;
+ }
+}
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/lab/LABShuffleBufferWithLinkedList.java
b/server/src/main/java/org/apache/uniffle/server/buffer/lab/LABShuffleBufferWithLinkedList.java
new file mode 100644
index 000000000..eb6fe0cd5
--- /dev/null
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/lab/LABShuffleBufferWithLinkedList.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.buffer.lab;
+
+import java.util.function.Supplier;
+
+import org.apache.uniffle.common.ShuffleDataDistributionType;
+import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.server.ShuffleDataFlushEvent;
+import org.apache.uniffle.server.buffer.ShuffleBufferWithLinkedList;
+
+public class LABShuffleBufferWithLinkedList extends ShuffleBufferWithLinkedList
+ implements SupportsLAB {
+ private LAB lab;
+
+ public LABShuffleBufferWithLinkedList() {
+ super();
+ lab = new LAB();
+ }
+
+ @Override
+ protected boolean addBlock(ShufflePartitionedBlock block) {
+ ShufflePartitionedBlock newBlock = lab.tryCopyBlockToChunk(block);
+ boolean addSuccess = super.addBlock(newBlock);
+ if (addSuccess && newBlock.isOnLAB()) {
+ super.releaseBlock(block);
+ }
+ return addSuccess;
+ }
+
+ @Override
+ protected void releaseBlock(ShufflePartitionedBlock block) {
+ if (!block.isOnLAB()) {
+ super.releaseBlock(block);
+ }
+ }
+
+ @Override
+ public synchronized ShuffleDataFlushEvent toFlushEvent(
+ String appId,
+ int shuffleId,
+ int startPartition,
+ int endPartition,
+ Supplier<Boolean> isValid,
+ ShuffleDataDistributionType dataDistributionType) {
+ ShuffleDataFlushEvent event =
+ super.toFlushEvent(
+ appId, shuffleId, startPartition, endPartition, isValid,
dataDistributionType);
+ lab = new LAB();
+ return event;
+ }
+
+ @Override
+ protected Runnable createCallbackForFlush(ShuffleDataFlushEvent event) {
+ Runnable runnable = super.createCallbackForFlush(event);
+ final LAB labRef = lab;
+ return () -> {
+ runnable.run();
+ labRef.close();
+ };
+ }
+
+ @Override
+ public synchronized long release() {
+ long size = super.release();
+ lab.close();
+ return size;
+ }
+}
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/lab/LABShuffleBufferWithSkipList.java
b/server/src/main/java/org/apache/uniffle/server/buffer/lab/LABShuffleBufferWithSkipList.java
new file mode 100644
index 000000000..8609be3cf
--- /dev/null
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/lab/LABShuffleBufferWithSkipList.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.buffer.lab;
+
+import java.util.function.Supplier;
+
+import org.apache.uniffle.common.ShuffleDataDistributionType;
+import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.server.ShuffleDataFlushEvent;
+import org.apache.uniffle.server.buffer.ShuffleBufferWithSkipList;
+
+public class LABShuffleBufferWithSkipList extends ShuffleBufferWithSkipList
implements SupportsLAB {
+ private LAB lab;
+
+ public LABShuffleBufferWithSkipList() {
+ super();
+ this.lab = new LAB();
+ }
+
+ @Override
+ protected void addBlock(ShufflePartitionedBlock block) {
+ ShufflePartitionedBlock newBlock = lab.tryCopyBlockToChunk(block);
+ if (newBlock.isOnLAB()) {
+ super.releaseBlock(block);
+ }
+ super.addBlock(newBlock);
+ }
+
+ @Override
+ protected void releaseBlock(ShufflePartitionedBlock block) {
+ if (!block.isOnLAB()) {
+ super.releaseBlock(block);
+ }
+ }
+
+ @Override
+ public synchronized ShuffleDataFlushEvent toFlushEvent(
+ String appId,
+ int shuffleId,
+ int startPartition,
+ int endPartition,
+ Supplier<Boolean> isValid,
+ ShuffleDataDistributionType dataDistributionType) {
+ ShuffleDataFlushEvent event =
+ super.toFlushEvent(
+ appId, shuffleId, startPartition, endPartition, isValid,
dataDistributionType);
+ lab = new LAB();
+ return event;
+ }
+
+ @Override
+ protected Runnable createCallbackForFlush(ShuffleDataFlushEvent event) {
+ Runnable runnable = super.createCallbackForFlush(event);
+ final LAB labRef = lab;
+ return () -> {
+ runnable.run();
+ labRef.close();
+ };
+ }
+
+ @Override
+ public synchronized long release() {
+ long size = super.release();
+ lab.close();
+ return size;
+ }
+}
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/lab/LABShufflePartitionedBlock.java
b/server/src/main/java/org/apache/uniffle/server/buffer/lab/LABShufflePartitionedBlock.java
new file mode 100644
index 000000000..ed3618120
--- /dev/null
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/lab/LABShufflePartitionedBlock.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.buffer.lab;
+
+import io.netty.buffer.ByteBuf;
+
+import org.apache.uniffle.common.ShufflePartitionedBlock;
+
+public class LABShufflePartitionedBlock extends ShufflePartitionedBlock {
+
+ public LABShufflePartitionedBlock(
+ int dataLength,
+ int uncompressLength,
+ long crc,
+ long blockId,
+ long taskAttemptId,
+ ByteBuf slice) {
+ super(dataLength, uncompressLength, crc, blockId, taskAttemptId, slice);
+ }
+
+ @Override
+ public boolean isOnLAB() {
+ return true;
+ }
+}
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/lab/OffheapChunk.java
b/server/src/main/java/org/apache/uniffle/server/buffer/lab/OffheapChunk.java
new file mode 100644
index 000000000..24c13e27b
--- /dev/null
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/lab/OffheapChunk.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.buffer.lab;
+
+import io.netty.buffer.Unpooled;
+
+/** An off heap chunk implementation. */
+public class OffheapChunk extends Chunk {
+
+ OffheapChunk(int size, int id) {
+ super(size, id);
+ }
+
+ OffheapChunk(int size, int id, boolean fromPool) {
+ super(size, id, fromPool);
+ }
+
+ @Override
+ void allocateDataBuffer() {
+ if (data == null) {
+ data = Unpooled.directBuffer(this.size);
+ data.writeInt(this.getId());
+ }
+ }
+}
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/lab/SupportsLAB.java
b/server/src/main/java/org/apache/uniffle/server/buffer/lab/SupportsLAB.java
new file mode 100644
index 000000000..cb5dc4cee
--- /dev/null
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/lab/SupportsLAB.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.buffer.lab;
+
+public interface SupportsLAB {}
diff --git
a/server/src/main/java/org/apache/uniffle/server/merge/Partition.java
b/server/src/main/java/org/apache/uniffle/server/merge/Partition.java
index 48a9e0fb6..e4000bc39 100644
--- a/server/src/main/java/org/apache/uniffle/server/merge/Partition.java
+++ b/server/src/main/java/org/apache/uniffle/server/merge/Partition.java
@@ -51,6 +51,7 @@ import
org.apache.uniffle.common.netty.buffer.NettyManagedBuffer;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.serializer.SerInputStream;
import org.apache.uniffle.common.serializer.SerOutputStream;
+import org.apache.uniffle.common.util.ByteBufUtils;
import org.apache.uniffle.server.ShuffleDataReadEvent;
import org.apache.uniffle.server.buffer.ShuffleBuffer;
import org.apache.uniffle.server.buffer.ShuffleBufferWithSkipList;
@@ -158,8 +159,13 @@ public class Partition<K, V> {
try {
// If ByteBuf is released by flush cleanup will throw
IllegalReferenceCountException.
// Then we need get block buffer from file
- ByteBuf byteBuf = block.getData().retain().duplicate();
- cachedBlocks.put(blockId, byteBuf.slice(0, block.getDataLength()));
+ if (block.isOnLAB()) {
+ ByteBuf byteBuf = ByteBufUtils.copy(block.getData());
+ cachedBlocks.put(blockId, byteBuf);
+ } else {
+ ByteBuf byteBuf = block.getData().retain().duplicate();
+ cachedBlocks.put(blockId, byteBuf.slice(0, block.getDataLength()));
+ }
} catch (IllegalReferenceCountException irce) {
allCached = false;
LOG.warn("Can't read bytes from block in memory, maybe already been
flushed!");
diff --git
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
index ab6ce2834..c7f438d34 100644
---
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
+++
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
@@ -337,7 +337,7 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
+ e.getMessage();
ret = StatusCode.INTERNAL_ERROR;
responseMessage = errorMsg;
- LOG.error(errorMsg);
+ LOG.error(errorMsg, e);
hasFailureOccurred = true;
} finally {
// Once the cache failure occurs, we should explicitly release
data held by byteBuf
diff --git
a/server/src/test/java/org/apache/uniffle/server/buffer/BufferTestBase.java
b/server/src/test/java/org/apache/uniffle/server/buffer/BufferTestBase.java
index 7aef68c15..dd2070d02 100644
--- a/server/src/test/java/org/apache/uniffle/server/buffer/BufferTestBase.java
+++ b/server/src/test/java/org/apache/uniffle/server/buffer/BufferTestBase.java
@@ -68,4 +68,6 @@ public abstract class BufferTestBase {
protected AtomicInteger getAtomSequenceNo() {
return atomSequenceNo;
}
+
+ protected abstract ShuffleBuffer createShuffleBuffer();
}
diff --git
a/server/src/test/java/org/apache/uniffle/server/buffer/LABShuffleBufferWithLinkedListTest.java
b/server/src/test/java/org/apache/uniffle/server/buffer/LABShuffleBufferWithLinkedListTest.java
new file mode 100644
index 000000000..dbb23e8e5
--- /dev/null
+++
b/server/src/test/java/org/apache/uniffle/server/buffer/LABShuffleBufferWithLinkedListTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.buffer;
+
+import org.junit.jupiter.api.BeforeAll;
+
+import org.apache.uniffle.server.buffer.lab.ChunkCreator;
+import org.apache.uniffle.server.buffer.lab.LABShuffleBufferWithLinkedList;
+
+public class LABShuffleBufferWithLinkedListTest extends
ShuffleBufferWithLinkedListTest {
+ @BeforeAll
+ public static void setUp() {
+ ChunkCreator.initialize(1024 * 3, 1024 * 1024, 512);
+ }
+
+ @Override
+ protected ShuffleBuffer createShuffleBuffer() {
+ return new LABShuffleBufferWithLinkedList();
+ }
+}
diff --git
a/server/src/test/java/org/apache/uniffle/server/buffer/LABShuffleBufferWithSkipListTest.java
b/server/src/test/java/org/apache/uniffle/server/buffer/LABShuffleBufferWithSkipListTest.java
new file mode 100644
index 000000000..36f96d6da
--- /dev/null
+++
b/server/src/test/java/org/apache/uniffle/server/buffer/LABShuffleBufferWithSkipListTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.buffer;
+
+import org.junit.jupiter.api.BeforeAll;
+
+import org.apache.uniffle.server.buffer.lab.ChunkCreator;
+import org.apache.uniffle.server.buffer.lab.LABShuffleBufferWithSkipList;
+
+public class LABShuffleBufferWithSkipListTest extends
ShuffleBufferWithSkipListTest {
+ @BeforeAll
+ public static void setUp() {
+ ChunkCreator.initialize(1024 * 3, 1024 * 1024, 512);
+ }
+
+ @Override
+ protected ShuffleBuffer createShuffleBuffer() {
+ return new LABShuffleBufferWithSkipList();
+ }
+}
diff --git
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
index 4dcb4bb60..7b81926e0 100644
---
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
@@ -29,6 +29,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.collect.RangeMap;
import com.google.common.util.concurrent.Uninterruptibles;
+import io.netty.buffer.ByteBuf;
import io.prometheus.client.Collector;
import org.apache.commons.lang3.tuple.Pair;
import org.awaitility.Awaitility;
@@ -175,6 +176,8 @@ public class ShuffleBufferManagerTest extends
BufferTestBase {
ShufflePartitionedData spd2 = createData(0, 68);
ShufflePartitionedData spd3 = createData(0, 68);
ShufflePartitionedData spd4 = createData(0, 68);
+ final ByteBuf expected2 =
ByteBufUtils.copy(spd2.getBlockList()[0].getData());
+ final ByteBuf expected3 =
ByteBufUtils.copy(spd3.getBlockList()[0].getData());
shuffleBufferManager.cacheShuffleData(appId, 1, false, spd1);
shuffleBufferManager.cacheShuffleData(appId, 2, false, spd2);
shuffleBufferManager.cacheShuffleData(appId, 2, false, spd3);
@@ -188,10 +191,10 @@ public class ShuffleBufferManagerTest extends
BufferTestBase {
// validate get shuffle data
ShuffleDataResult sdr =
shuffleBufferManager.getShuffleData(appId, 2, 0,
Constants.INVALID_BLOCK_ID, 60);
-
assertArrayEquals(ByteBufUtils.readBytes(spd2.getBlockList()[0].getData()),
sdr.getData());
+ assertArrayEquals(ByteBufUtils.readBytes(expected2), sdr.getData());
long lastBlockId = spd2.getBlockList()[0].getBlockId();
sdr = shuffleBufferManager.getShuffleData(appId, 2, 0, lastBlockId, 100);
-
assertArrayEquals(ByteBufUtils.readBytes(spd3.getBlockList()[0].getData()),
sdr.getData());
+ assertArrayEquals(ByteBufUtils.readBytes(expected3), sdr.getData());
// flush happen
ShufflePartitionedData spd5 = createData(0, 10);
shuffleBufferManager.cacheShuffleData(appId, 4, false, spd5);
@@ -206,10 +209,10 @@ public class ShuffleBufferManagerTest extends
BufferTestBase {
assertEquals(1, bufferPool.get(appId).get(4).get(0).getBlocks().size());
// data in flush buffer now, it also can be got before flush finish
sdr = shuffleBufferManager.getShuffleData(appId, 2, 0,
Constants.INVALID_BLOCK_ID, 60);
-
assertArrayEquals(ByteBufUtils.readBytes(spd2.getBlockList()[0].getData()),
sdr.getData());
+ assertArrayEquals(ByteBufUtils.readBytes(expected2), sdr.getData());
lastBlockId = spd2.getBlockList()[0].getBlockId();
sdr = shuffleBufferManager.getShuffleData(appId, 2, 0, lastBlockId, 100);
-
assertArrayEquals(ByteBufUtils.readBytes(spd3.getBlockList()[0].getData()),
sdr.getData());
+ assertArrayEquals(ByteBufUtils.readBytes(expected3), sdr.getData());
// cache data again, it should cause flush
spd1 = createData(0, 10);
shuffleBufferManager.cacheShuffleData(appId, 1, false, spd1);
@@ -268,9 +271,12 @@ public class ShuffleBufferManagerTest extends
BufferTestBase {
assertEquals(0, shuffleSizeMap.get(appId1).get(1).get());
shuffleBufferManager.releaseMemory(463, true, false);
- shuffleBufferManager.cacheShuffleData(appId1, 1, false, spd1);
- shuffleBufferManager.cacheShuffleData(appId1, 2, false, spd2);
- shuffleBufferManager.cacheShuffleData(appId2, 1, false, spd4);
+ ShufflePartitionedData spd7 = createData(0, 67);
+ ShufflePartitionedData spd8 = createData(0, 68);
+ ShufflePartitionedData spd9 = createData(0, 68);
+ shuffleBufferManager.cacheShuffleData(appId1, 1, false, spd7);
+ shuffleBufferManager.cacheShuffleData(appId1, 2, false, spd8);
+ shuffleBufferManager.cacheShuffleData(appId2, 1, false, spd9);
shuffleBufferManager.removeBuffer(appId1);
assertNull(shuffleSizeMap.get(appId1));
assertEquals(100, shuffleSizeMap.get(appId2).get(1).get());
@@ -863,4 +869,9 @@ public class ShuffleBufferManagerTest extends
BufferTestBase {
assertEquals(1, pair.getRight().size());
assertEquals(0, pair.getRight().get(0));
}
+
+ @Override
+ protected ShuffleBuffer createShuffleBuffer() {
+ return null;
+ }
}
diff --git
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java
index 47fe3dd39..460f61151 100644
---
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java
@@ -46,7 +46,7 @@ public class ShuffleBufferWithLinkedListTest extends
BufferTestBase {
@Test
public void appendTest() {
- ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList();
+ ShuffleBuffer shuffleBuffer = createShuffleBuffer();
shuffleBuffer.append(createData(10));
// ShufflePartitionedBlock has constant 32 bytes overhead
assertEquals(42, shuffleBuffer.getEncodedLength());
@@ -60,7 +60,7 @@ public class ShuffleBufferWithLinkedListTest extends
BufferTestBase {
@Test
public void appendMultiBlocksTest() {
- ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList();
+ ShuffleBuffer shuffleBuffer = createShuffleBuffer();
ShufflePartitionedData data1 = createData(10);
ShufflePartitionedData data2 = createData(10);
ShufflePartitionedBlock[] dataCombine = new ShufflePartitionedBlock[2];
@@ -72,7 +72,7 @@ public class ShuffleBufferWithLinkedListTest extends
BufferTestBase {
@Test
public void toFlushEventTest() {
- ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList();
+ ShuffleBuffer shuffleBuffer = createShuffleBuffer();
ShuffleDataFlushEvent event = shuffleBuffer.toFlushEvent("appId", 0, 0, 1,
null);
assertNull(event);
shuffleBuffer.append(createData(10));
@@ -91,7 +91,7 @@ public class ShuffleBufferWithLinkedListTest extends
BufferTestBase {
@Test
public void getShuffleDataWithExpectedTaskIdsFilterTest() {
/** case1: all blocks in cached(or in flushed map) and size <
readBufferSize */
- ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList();
+ ShuffleBuffer shuffleBuffer = createShuffleBuffer();
ShufflePartitionedData spd1 = createData(1, 1, 15);
ShufflePartitionedData spd2 = createData(1, 0, 15);
ShufflePartitionedData spd3 = createData(1, 2, 55);
@@ -203,17 +203,18 @@ public class ShuffleBufferWithLinkedListTest extends
BufferTestBase {
@Test
public void getShuffleDataWithLocalOrderTest() {
- ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList();
+ ShuffleBuffer shuffleBuffer = createShuffleBuffer();
ShufflePartitionedData spd1 = createData(1, 1, 15);
ShufflePartitionedData spd2 = createData(1, 0, 15);
ShufflePartitionedData spd3 = createData(1, 2, 15);
+ final byte[] expectedData = getExpectedData(spd1, spd2);
+ final byte[] expectedData2 = getExpectedData(spd3);
shuffleBuffer.append(spd1);
shuffleBuffer.append(spd2);
shuffleBuffer.append(spd3);
// First read from the cached data
ShuffleDataResult sdr =
shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 16);
- byte[] expectedData = getExpectedData(spd1, spd2);
compareBufferSegment(
new LinkedList<>(shuffleBuffer.getBlocks()), sdr.getBufferSegments(),
0, 2);
assertArrayEquals(expectedData, sdr.getData());
@@ -223,13 +224,12 @@ public class ShuffleBufferWithLinkedListTest extends
BufferTestBase {
shuffleBuffer.toFlushEvent("appId", 0, 0, 1, null,
ShuffleDataDistributionType.LOCAL_ORDER);
long lastBlockId = sdr.getBufferSegments().get(1).getBlockId();
sdr = shuffleBuffer.getShuffleData(lastBlockId, 16);
- expectedData = getExpectedData(spd3);
compareBufferSegment(
new
LinkedList<>(shuffleBuffer.getInFlushBlockMap().get(event1.getEventId())),
sdr.getBufferSegments(),
2,
1);
- assertArrayEquals(expectedData, sdr.getData());
+ assertArrayEquals(expectedData2, sdr.getData());
Iterator<ShufflePartitionedBlock> it =
event1.getShuffleBlocks().iterator();
assertEquals(0, it.next().getTaskAttemptId());
assertEquals(1, it.next().getTaskAttemptId());
@@ -254,68 +254,69 @@ public class ShuffleBufferWithLinkedListTest extends
BufferTestBase {
@Test
public void getShuffleDataTest() {
- ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList();
+ ShuffleBuffer shuffleBuffer = createShuffleBuffer();
// case1: cached data only, blockId = -1, readBufferSize > buffer size
ShufflePartitionedData spd1 = createData(10);
ShufflePartitionedData spd2 = createData(20);
+ final byte[] expectedData = getExpectedData(spd1, spd2);
shuffleBuffer.append(spd1);
shuffleBuffer.append(spd2);
- byte[] expectedData = getExpectedData(spd1, spd2);
ShuffleDataResult sdr =
shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 40);
compareBufferSegment(
new LinkedList<>(shuffleBuffer.getBlocks()), sdr.getBufferSegments(),
0, 2);
assertArrayEquals(expectedData, sdr.getData());
// case2: cached data only, blockId = -1, readBufferSize = buffer size
- shuffleBuffer = new ShuffleBufferWithLinkedList();
+ shuffleBuffer = createShuffleBuffer();
spd1 = createData(20);
spd2 = createData(20);
+ final byte[] expectedData2 = getExpectedData(spd1, spd2);
shuffleBuffer.append(spd1);
shuffleBuffer.append(spd2);
- expectedData = getExpectedData(spd1, spd2);
sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 40);
compareBufferSegment(
new LinkedList<>(shuffleBuffer.getBlocks()), sdr.getBufferSegments(),
0, 2);
- assertArrayEquals(expectedData, sdr.getData());
+ assertArrayEquals(expectedData2, sdr.getData());
// case3-1: cached data only, blockId = -1, readBufferSize < buffer size
- shuffleBuffer = new ShuffleBufferWithLinkedList();
+ shuffleBuffer = createShuffleBuffer();
spd1 = createData(20);
spd2 = createData(21);
+ final byte[] expectedData3 = getExpectedData(spd1, spd2);
shuffleBuffer.append(spd1);
shuffleBuffer.append(spd2);
- expectedData = getExpectedData(spd1, spd2);
sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 40);
compareBufferSegment(
new LinkedList<>(shuffleBuffer.getBlocks()), sdr.getBufferSegments(),
0, 2);
- assertArrayEquals(expectedData, sdr.getData());
+ assertArrayEquals(expectedData3, sdr.getData());
// case3-2: cached data only, blockId = -1, readBufferSize < buffer size
- shuffleBuffer = new ShuffleBufferWithLinkedList();
+ shuffleBuffer = createShuffleBuffer();
spd1 = createData(15);
spd2 = createData(15);
ShufflePartitionedData spd3 = createData(15);
+ final byte[] expectedData4 = getExpectedData(spd1, spd2);
+ final byte[] expectedData5 = getExpectedData(spd3);
shuffleBuffer.append(spd1);
shuffleBuffer.append(spd2);
shuffleBuffer.append(spd3);
- expectedData = getExpectedData(spd1, spd2);
sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 25);
compareBufferSegment(
new LinkedList<>(shuffleBuffer.getBlocks()), sdr.getBufferSegments(),
0, 2);
- assertArrayEquals(expectedData, sdr.getData());
+ assertArrayEquals(expectedData4, sdr.getData());
// case4: cached data only, blockId != -1 && exist, readBufferSize <
buffer size
long lastBlockId = spd2.getBlockList()[0].getBlockId();
sdr = shuffleBuffer.getShuffleData(lastBlockId, 25);
- expectedData = getExpectedData(spd3);
compareBufferSegment(
new LinkedList<>(shuffleBuffer.getBlocks()), sdr.getBufferSegments(),
2, 1);
- assertArrayEquals(expectedData, sdr.getData());
+ assertArrayEquals(expectedData5, sdr.getData());
// case5: flush data only, blockId = -1, readBufferSize < buffer size
- shuffleBuffer = new ShuffleBufferWithLinkedList();
+ shuffleBuffer = createShuffleBuffer();
spd1 = createData(15);
spd2 = createData(15);
+ final byte[] expectedData6 = getExpectedData(spd1, spd2);
shuffleBuffer.append(spd1);
shuffleBuffer.append(spd2);
ShuffleDataFlushEvent event1 = shuffleBuffer.toFlushEvent("appId", 0, 0,
1, null);
@@ -326,52 +327,78 @@ public class ShuffleBufferWithLinkedListTest extends
BufferTestBase {
sdr.getBufferSegments(),
0,
2);
- expectedData = getExpectedData(spd1, spd2);
- assertArrayEquals(expectedData, sdr.getData());
+ assertArrayEquals(expectedData6, sdr.getData());
// case5: flush data only, blockId = lastBlockId
sdr = shuffleBuffer.getShuffleData(spd2.getBlockList()[0].getBlockId(),
20);
assertEquals(0, sdr.getBufferSegments().size());
// case6: no data in buffer & flush buffer
- shuffleBuffer = new ShuffleBufferWithLinkedList();
+ shuffleBuffer = createShuffleBuffer();
sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 10);
assertEquals(0, sdr.getBufferSegments().size());
assertEquals(0, sdr.getDataLength());
// case7: get data with multiple flush buffer and cached buffer
- shuffleBuffer = new ShuffleBufferWithLinkedList();
+ shuffleBuffer = createShuffleBuffer();
spd1 = createData(15);
spd2 = createData(15);
spd3 = createData(15);
+ final ShufflePartitionedData spd4 = createData(15);
+ final ShufflePartitionedData spd5 = createData(15);
+ final ShufflePartitionedData spd6 = createData(15);
+ final ShufflePartitionedData spd7 = createData(15);
+ final ShufflePartitionedData spd8 = createData(15);
+ final ShufflePartitionedData spd9 = createData(15);
+ final ShufflePartitionedData spd10 = createData(15);
+ final ShufflePartitionedData spd11 = createData(15);
+ final ShufflePartitionedData spd12 = createData(15);
+ final ShufflePartitionedData spd13 = createData(15);
+ final ShufflePartitionedData spd14 = createData(15);
+ final ShufflePartitionedData spd15 = createData(15);
+ final byte[] expectedData7 = getExpectedData(spd1);
+ final byte[] expectedData8 = getExpectedData(spd2);
+ final byte[] expectedData9 = getExpectedData(spd1, spd2);
+ final byte[] expectedData10 = getExpectedData(spd2, spd3);
+ final byte[] expectedData11 = getExpectedData(spd1, spd2, spd3, spd4);
+ final byte[] expectedData12 = getExpectedData(spd4);
+ final byte[] expectedData13 = getExpectedData(spd6);
+ final byte[] expectedData14 = getExpectedData(spd4, spd5, spd6);
+ final byte[] expectedData15 = getExpectedData(spd3, spd4, spd5, spd6,
spd7);
+ final byte[] expectedData16 = getExpectedData(spd6, spd7);
+ final byte[] expectedData17 = getExpectedData(spd6, spd7, spd8, spd9);
+ final byte[] expectedData18 = getExpectedData(spd9, spd10);
+ final byte[] expectedData19 = getExpectedData(spd10);
+ final byte[] expectedData20 = getExpectedData(spd12);
+ final byte[] expectedData21 = getExpectedData(spd10, spd11, spd12);
+ final byte[] expectedData22 = getExpectedData(spd12, spd13);
+ final byte[] expectedData23 = getExpectedData(spd13);
+ final byte[] expectedData24 = getExpectedData(spd14, spd15);
+ final byte[] expectedData25 = getExpectedData(spd15);
+ final byte[] expectedData26 = getExpectedData(spd12, spd13, spd14, spd15);
+ final byte[] expectedData27 =
+ getExpectedData(
+ spd1, spd2, spd3, spd4, spd5, spd6, spd7, spd8, spd9, spd10,
spd11, spd12, spd13, spd14,
+ spd15);
shuffleBuffer.append(spd1);
shuffleBuffer.append(spd2);
shuffleBuffer.append(spd3);
event1 = shuffleBuffer.toFlushEvent("appId", 0, 0, 1, null);
- ShufflePartitionedData spd4 = createData(15);
- ShufflePartitionedData spd5 = createData(15);
- ShufflePartitionedData spd6 = createData(15);
+
shuffleBuffer.append(spd4);
shuffleBuffer.append(spd5);
shuffleBuffer.append(spd6);
+
final ShuffleDataFlushEvent event2 = shuffleBuffer.toFlushEvent("appId",
0, 0, 1, null);
- ShufflePartitionedData spd7 = createData(15);
- ShufflePartitionedData spd8 = createData(15);
- ShufflePartitionedData spd9 = createData(15);
+
shuffleBuffer.append(spd7);
shuffleBuffer.append(spd8);
shuffleBuffer.append(spd9);
final ShuffleDataFlushEvent event3 = shuffleBuffer.toFlushEvent("appId",
0, 0, 1, null);
- ShufflePartitionedData spd10 = createData(15);
- ShufflePartitionedData spd11 = createData(15);
- ShufflePartitionedData spd12 = createData(15);
shuffleBuffer.append(spd10);
shuffleBuffer.append(spd11);
shuffleBuffer.append(spd12);
final ShuffleDataFlushEvent event4 = shuffleBuffer.toFlushEvent("appId",
0, 0, 1, null);
- ShufflePartitionedData spd13 = createData(15);
- ShufflePartitionedData spd14 = createData(15);
- ShufflePartitionedData spd15 = createData(15);
shuffleBuffer.append(spd13);
shuffleBuffer.append(spd14);
shuffleBuffer.append(spd15);
@@ -387,179 +414,158 @@ public class ShuffleBufferWithLinkedListTest extends
BufferTestBase {
// case7 to get spd1
List<ShufflePartitionedBlock> expectedBlocks =
Lists.newArrayList(shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()));
- expectedData = getExpectedData(spd1);
sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 10);
compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 0, 1);
- assertArrayEquals(expectedData, sdr.getData());
+ assertArrayEquals(expectedData7, sdr.getData());
// case7 to get spd2
expectedBlocks =
Lists.newArrayList(shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()));
- expectedData = getExpectedData(spd2);
sdr = shuffleBuffer.getShuffleData(spd1.getBlockList()[0].getBlockId(),
10);
compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 1, 1);
- assertArrayEquals(expectedData, sdr.getData());
+ assertArrayEquals(expectedData8, sdr.getData());
// case7 to get spd1, spd2
expectedBlocks =
Lists.newArrayList(shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()));
- expectedData = getExpectedData(spd1, spd2);
sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 20);
compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 0, 2);
- assertArrayEquals(expectedData, sdr.getData());
+ assertArrayEquals(expectedData9, sdr.getData());
// case7 to get spd2, spd3
expectedBlocks =
Lists.newArrayList(shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()));
- expectedData = getExpectedData(spd2, spd3);
sdr = shuffleBuffer.getShuffleData(spd1.getBlockList()[0].getBlockId(),
20);
compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 1, 2);
- assertArrayEquals(expectedData, sdr.getData());
+ assertArrayEquals(expectedData10, sdr.getData());
// case7 to get spd1, spd2, spd3, spd4
expectedBlocks =
Lists.newArrayList(shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()));
expectedBlocks.addAll(shuffleBuffer.getInFlushBlockMap().get(event2.getEventId()));
- expectedData = getExpectedData(spd1, spd2, spd3, spd4);
sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 50);
compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 0, 4);
- assertArrayEquals(expectedData, sdr.getData());
+ assertArrayEquals(expectedData11, sdr.getData());
// case7 to get spd2, spd3
expectedBlocks =
Lists.newArrayList(shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()));
expectedBlocks.addAll(shuffleBuffer.getInFlushBlockMap().get(event2.getEventId()));
- expectedData = getExpectedData(spd2, spd3);
sdr = shuffleBuffer.getShuffleData(spd1.getBlockList()[0].getBlockId(),
20);
compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 1, 2);
- assertArrayEquals(expectedData, sdr.getData());
+ assertArrayEquals(expectedData10, sdr.getData());
// case7 to get spd4
expectedBlocks =
Lists.newArrayList(shuffleBuffer.getInFlushBlockMap().get(event2.getEventId()));
- expectedData = getExpectedData(spd4);
sdr = shuffleBuffer.getShuffleData(spd3.getBlockList()[0].getBlockId(),
10);
compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 0, 1);
- assertArrayEquals(expectedData, sdr.getData());
+ assertArrayEquals(expectedData12, sdr.getData());
// case7 to get spd6
expectedBlocks =
Lists.newArrayList(shuffleBuffer.getInFlushBlockMap().get(event2.getEventId()));
- expectedData = getExpectedData(spd6);
sdr = shuffleBuffer.getShuffleData(spd5.getBlockList()[0].getBlockId(),
10);
compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 2, 1);
- assertArrayEquals(expectedData, sdr.getData());
+ assertArrayEquals(expectedData13, sdr.getData());
// case7 to get spd4, spd5, spd6
expectedBlocks =
Lists.newArrayList(shuffleBuffer.getInFlushBlockMap().get(event2.getEventId()));
- expectedData = getExpectedData(spd4, spd5, spd6);
sdr = shuffleBuffer.getShuffleData(spd3.getBlockList()[0].getBlockId(),
40);
compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 0, 3);
- assertArrayEquals(expectedData, sdr.getData());
+ assertArrayEquals(expectedData14, sdr.getData());
// case7 to get spd3, spd4, spd5, spd6, spd7
expectedBlocks =
Lists.newArrayList(shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()));
expectedBlocks.addAll(shuffleBuffer.getInFlushBlockMap().get(event2.getEventId()));
expectedBlocks.addAll(shuffleBuffer.getInFlushBlockMap().get(event3.getEventId()));
- expectedData = getExpectedData(spd3, spd4, spd5, spd6, spd7);
sdr = shuffleBuffer.getShuffleData(spd2.getBlockList()[0].getBlockId(),
70);
compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 2, 5);
- assertArrayEquals(expectedData, sdr.getData());
+ assertArrayEquals(expectedData15, sdr.getData());
// case7 to get spd6, spd7
expectedBlocks =
Lists.newArrayList(shuffleBuffer.getInFlushBlockMap().get(event2.getEventId()));
expectedBlocks.addAll(shuffleBuffer.getInFlushBlockMap().get(event3.getEventId()));
- expectedData = getExpectedData(spd6, spd7);
sdr = shuffleBuffer.getShuffleData(spd5.getBlockList()[0].getBlockId(),
20);
compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 2, 2);
- assertArrayEquals(expectedData, sdr.getData());
+ assertArrayEquals(expectedData16, sdr.getData());
// case7 to get spd6, spd7, spd8, spd9
expectedBlocks =
Lists.newArrayList(shuffleBuffer.getInFlushBlockMap().get(event2.getEventId()));
expectedBlocks.addAll(shuffleBuffer.getInFlushBlockMap().get(event3.getEventId()));
- expectedData = getExpectedData(spd6, spd7, spd8, spd9);
sdr = shuffleBuffer.getShuffleData(spd5.getBlockList()[0].getBlockId(),
50);
compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 2, 4);
- assertArrayEquals(expectedData, sdr.getData());
+ assertArrayEquals(expectedData17, sdr.getData());
// case7 to get spd9, spd10
expectedBlocks =
Lists.newArrayList(shuffleBuffer.getInFlushBlockMap().get(event3.getEventId()));
expectedBlocks.addAll(shuffleBuffer.getInFlushBlockMap().get(event4.getEventId()));
- expectedData = getExpectedData(spd9, spd10);
sdr = shuffleBuffer.getShuffleData(spd8.getBlockList()[0].getBlockId(),
20);
compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 2, 2);
- assertArrayEquals(expectedData, sdr.getData());
+ assertArrayEquals(expectedData18, sdr.getData());
// case7 to get spd10
expectedBlocks =
Lists.newArrayList(shuffleBuffer.getInFlushBlockMap().get(event4.getEventId()));
- expectedData = getExpectedData(spd10);
sdr = shuffleBuffer.getShuffleData(spd9.getBlockList()[0].getBlockId(),
10);
compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 0, 1);
- assertArrayEquals(expectedData, sdr.getData());
+ assertArrayEquals(expectedData19, sdr.getData());
// case7 to get spd12
expectedBlocks =
Lists.newArrayList(shuffleBuffer.getInFlushBlockMap().get(event4.getEventId()));
- expectedData = getExpectedData(spd12);
sdr = shuffleBuffer.getShuffleData(spd11.getBlockList()[0].getBlockId(),
10);
compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 2, 1);
- assertArrayEquals(expectedData, sdr.getData());
+ assertArrayEquals(expectedData20, sdr.getData());
// case7 to get spd10, spd11, spd12
expectedBlocks =
Lists.newArrayList(shuffleBuffer.getInFlushBlockMap().get(event4.getEventId()));
- expectedData = getExpectedData(spd10, spd11, spd12);
sdr = shuffleBuffer.getShuffleData(spd9.getBlockList()[0].getBlockId(),
40);
compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 0, 3);
- assertArrayEquals(expectedData, sdr.getData());
+ assertArrayEquals(expectedData21, sdr.getData());
// case7 to get spd12, spd13
expectedBlocks =
Lists.newArrayList(shuffleBuffer.getInFlushBlockMap().get(event4.getEventId()));
expectedBlocks.addAll(shuffleBuffer.getBlocks());
- expectedData = getExpectedData(spd12, spd13);
sdr = shuffleBuffer.getShuffleData(spd11.getBlockList()[0].getBlockId(),
20);
compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 2, 2);
- assertArrayEquals(expectedData, sdr.getData());
+ assertArrayEquals(expectedData22, sdr.getData());
// case7 to get spd13
expectedBlocks = Lists.newArrayList(shuffleBuffer.getBlocks());
expectedBlocks.addAll(shuffleBuffer.getBlocks());
- expectedData = getExpectedData(spd13);
sdr = shuffleBuffer.getShuffleData(spd12.getBlockList()[0].getBlockId(),
10);
compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 0, 1);
- assertArrayEquals(expectedData, sdr.getData());
+ assertArrayEquals(expectedData23, sdr.getData());
// case7 to get spd14, spd15
expectedBlocks = Lists.newArrayList(shuffleBuffer.getBlocks());
expectedBlocks.addAll(shuffleBuffer.getBlocks());
- expectedData = getExpectedData(spd14, spd15);
sdr = shuffleBuffer.getShuffleData(spd13.getBlockList()[0].getBlockId(),
20);
compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 1, 2);
- assertArrayEquals(expectedData, sdr.getData());
+ assertArrayEquals(expectedData24, sdr.getData());
// case7 to get spd15
expectedBlocks = Lists.newArrayList(shuffleBuffer.getBlocks());
expectedBlocks.addAll(shuffleBuffer.getBlocks());
- expectedData = getExpectedData(spd15);
sdr = shuffleBuffer.getShuffleData(spd14.getBlockList()[0].getBlockId(),
10);
compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 2, 1);
- assertArrayEquals(expectedData, sdr.getData());
+ assertArrayEquals(expectedData25, sdr.getData());
// case7 to get spd12, spd13, spd14, spd15
expectedBlocks =
Lists.newArrayList(shuffleBuffer.getInFlushBlockMap().get(event4.getEventId()));
expectedBlocks.addAll(shuffleBuffer.getBlocks());
- expectedData = getExpectedData(spd12, spd13, spd14, spd15);
sdr = shuffleBuffer.getShuffleData(spd11.getBlockList()[0].getBlockId(),
50);
compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 2, 4);
- assertArrayEquals(expectedData, sdr.getData());
+ assertArrayEquals(expectedData26, sdr.getData());
// case7 to get spd1 - spd15
expectedBlocks =
@@ -568,13 +574,9 @@ public class ShuffleBufferWithLinkedListTest extends
BufferTestBase {
expectedBlocks.addAll(shuffleBuffer.getInFlushBlockMap().get(event3.getEventId()));
expectedBlocks.addAll(shuffleBuffer.getInFlushBlockMap().get(event4.getEventId()));
expectedBlocks.addAll(shuffleBuffer.getBlocks());
- expectedData =
- getExpectedData(
- spd1, spd2, spd3, spd4, spd5, spd6, spd7, spd8, spd9, spd10,
spd11, spd12, spd13, spd14,
- spd15);
sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 220);
compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 0, 15);
- assertArrayEquals(expectedData, sdr.getData());
+ assertArrayEquals(expectedData27, sdr.getData());
// case7 after get spd15
sdr = shuffleBuffer.getShuffleData(spd15.getBlockList()[0].getBlockId(),
20);
@@ -583,21 +585,22 @@ public class ShuffleBufferWithLinkedListTest extends
BufferTestBase {
// case7 can't find blockId, read from start
expectedBlocks =
Lists.newArrayList(shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()));
- expectedData = getExpectedData(spd1, spd2);
sdr = shuffleBuffer.getShuffleData(-200, 20);
compareBufferSegment(expectedBlocks, sdr.getBufferSegments(), 0, 2);
- assertArrayEquals(expectedData, sdr.getData());
+ assertArrayEquals(expectedData9, sdr.getData());
}
@Test
public void appendRepeatBlockTest() {
- ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList();
+ ShuffleBuffer shuffleBuffer = createShuffleBuffer();
ShufflePartitionedData block = createData(10);
shuffleBuffer.append(block);
// ShufflePartitionedBlock has constant 32 bytes overhead
assertEquals(42, shuffleBuffer.getEncodedLength());
- shuffleBuffer.append(block);
+ ShufflePartitionedData block2 = createData(10);
+ block2.getBlockList()[0].setBlockId(block.getBlockList()[0].getBlockId());
+ shuffleBuffer.append(block2);
// The repeat block should not append to shuffleBuffer
assertEquals(42, shuffleBuffer.getEncodedLength());
}
@@ -611,6 +614,7 @@ public class ShuffleBufferWithLinkedListTest extends
BufferTestBase {
int offset = 0;
for (ShufflePartitionedData spd : spds) {
ShufflePartitionedBlock block = spd.getBlockList()[0];
+ block.getData().resetReaderIndex();
ByteBufUtils.readBytes(block.getData(), expectedData, offset,
block.getDataLength());
offset += block.getDataLength();
}
@@ -641,4 +645,9 @@ public class ShuffleBufferWithLinkedListTest extends
BufferTestBase {
protected AtomicInteger getAtomSequenceNo() {
return atomSequenceNo;
}
+
+ @Override
+ protected ShuffleBuffer createShuffleBuffer() {
+ return new ShuffleBufferWithLinkedList();
+ }
}
diff --git
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipListTest.java
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipListTest.java
index 87ce14569..e311bc80b 100644
---
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipListTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipListTest.java
@@ -39,7 +39,7 @@ public class ShuffleBufferWithSkipListTest extends
BufferTestBase {
@Test
public void appendTest() {
- ShuffleBuffer shuffleBuffer = new ShuffleBufferWithSkipList();
+ ShuffleBuffer shuffleBuffer = createShuffleBuffer();
shuffleBuffer.append(createData(10));
// ShufflePartitionedBlock has constant 32 bytes overhead
assertEquals(42, shuffleBuffer.getEncodedLength());
@@ -53,7 +53,7 @@ public class ShuffleBufferWithSkipListTest extends
BufferTestBase {
@Test
public void appendMultiBlocksTest() {
- ShuffleBuffer shuffleBuffer = new ShuffleBufferWithSkipList();
+ ShuffleBuffer shuffleBuffer = createShuffleBuffer();
ShufflePartitionedData data1 = createData(10);
ShufflePartitionedData data2 = createData(10);
ShufflePartitionedBlock[] dataCombine = new ShufflePartitionedBlock[2];
@@ -65,7 +65,7 @@ public class ShuffleBufferWithSkipListTest extends
BufferTestBase {
@Test
public void toFlushEventTest() {
- ShuffleBuffer shuffleBuffer = new ShuffleBufferWithSkipList();
+ ShuffleBuffer shuffleBuffer = createShuffleBuffer();
ShuffleDataFlushEvent event = shuffleBuffer.toFlushEvent("appId", 0, 0, 1,
null);
assertNull(event);
shuffleBuffer.append(createData(10));
@@ -79,7 +79,7 @@ public class ShuffleBufferWithSkipListTest extends
BufferTestBase {
@Test
public void getShuffleDataWithExpectedTaskIdsFilterTest() {
/** case1: all blocks in cached(or in flushed map) and size <
readBufferSize */
- ShuffleBuffer shuffleBuffer = new ShuffleBufferWithSkipList();
+ ShuffleBuffer shuffleBuffer = createShuffleBuffer();
ShufflePartitionedData spd1 = createData(1, 1, 15);
ShufflePartitionedData spd2 = createData(1, 0, 15);
ShufflePartitionedData spd3 = createData(1, 2, 55);
@@ -199,13 +199,15 @@ public class ShuffleBufferWithSkipListTest extends
BufferTestBase {
@Test
public void appendRepeatBlockTest() {
- ShuffleBuffer shuffleBuffer = new ShuffleBufferWithSkipList();
+ ShuffleBuffer shuffleBuffer = createShuffleBuffer();
ShufflePartitionedData block = createData(10);
shuffleBuffer.append(block);
// ShufflePartitionedBlock has constant 32 bytes overhead
assertEquals(42, shuffleBuffer.getEncodedLength());
- shuffleBuffer.append(block);
+ ShufflePartitionedData block2 = createData(10);
+ block2.getBlockList()[0].setBlockId(block.getBlockList()[0].getBlockId());
+ shuffleBuffer.append(block2);
// The repeat block should not append to shuffleBuffer
assertEquals(42, shuffleBuffer.getEncodedLength());
}
@@ -214,4 +216,9 @@ public class ShuffleBufferWithSkipListTest extends
BufferTestBase {
protected AtomicInteger getAtomSequenceNo() {
return atomSequenceNo;
}
+
+ @Override
+ protected ShuffleBuffer createShuffleBuffer() {
+ return new ShuffleBufferWithSkipList();
+ }
}