This is an automated email from the ASF dual-hosted git repository. xianjingfeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push: new 6599ef27e [#1809] fix(server): add block counter for skip-list buffer (#1846) 6599ef27e is described below commit 6599ef27e257d117ffb01855cf0d2e8351b90eec Author: xianjingfeng <xianjingfeng...@gmail.com> AuthorDate: Tue Jul 2 15:54:56 2024 +0800 [#1809] fix(server): add block counter for skip-list buffer (#1846) ### What changes were proposed in this pull request? Add block counter for SKIP_LIST buffer. ### Why are the changes needed? The performance of getting the size of skip-list is not good. Fix: #1809 ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Existing UT. --- .../main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java | 2 ++ .../org/apache/uniffle/server/buffer/ShuffleBufferManager.java | 4 ++-- .../apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java | 5 +++++ .../apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java | 8 ++++++++ 4 files changed, 17 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java index cb01bb0a9..1884a50c0 100644 --- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java +++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java @@ -57,6 +57,8 @@ public interface ShuffleBuffer { /** Only for test */ List<ShufflePartitionedBlock> getBlocks(); + int getBlockCount(); + void release(); void clearInFlushBuffer(long eventId); diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java index d17716daf..c62ebd8c5 100644 --- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java +++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java @@ -278,7 +278,7 @@ public class ShuffleBufferManager { // than rss.server.flush.cold.storage.threshold.size, otherwise cold storage will be useless. if ((isHugePartition || this.bufferFlushEnabled) && (buffer.getSize() > this.bufferFlushThreshold - || buffer.getBlocks().size() > bufferFlushBlocksNumThreshold)) { + || buffer.getBlockCount() > bufferFlushBlocksNumThreshold)) { if (LOG.isDebugEnabled()) { LOG.debug( "Start to flush single buffer. Details - shuffleId:{}, startPartition:{}, endPartition:{}, isHugePartition:{}, bufferSize:{}, blocksNum:{}", @@ -287,7 +287,7 @@ public class ShuffleBufferManager { endPartition, isHugePartition, buffer.getSize(), - buffer.getBlocks().size()); + buffer.getBlockCount()); } flushBuffer(buffer, appId, shuffleId, startPartition, endPartition, isHugePartition); } diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java index c01bb0823..d37dc446f 100644 --- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java +++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java @@ -105,6 +105,11 @@ public class ShuffleBufferWithLinkedList extends AbstractShuffleBuffer { return blocks; } + @Override + public int getBlockCount() { + return getBlocks().size(); + } + @Override public void release() { blocks.forEach(spb -> spb.getData().release()); diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java index 4783b4f56..db0ce2543 100644 --- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java +++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java @@ -42,6 +42,7 @@ import org.apache.uniffle.server.ShuffleFlushManager; public class ShuffleBufferWithSkipList extends AbstractShuffleBuffer { private ConcurrentSkipListMap<Long, ShufflePartitionedBlock> blocksMap; private final Map<Long, ConcurrentSkipListMap<Long, ShufflePartitionedBlock>> inFlushBlockMap; + private int blockCount; public ShuffleBufferWithSkipList(long capacity) { super(capacity); @@ -63,6 +64,7 @@ public class ShuffleBufferWithSkipList extends AbstractShuffleBuffer { synchronized (this) { for (ShufflePartitionedBlock block : data.getBlockList()) { blocksMap.put(block.getBlockId(), block); + blockCount++; mSize += block.getSize(); } size += mSize; @@ -94,6 +96,7 @@ public class ShuffleBufferWithSkipList extends AbstractShuffleBuffer { }); inFlushBlockMap.put(eventId, blocksMap); blocksMap = newConcurrentSkipListMap(); + blockCount = 0; size = 0; return event; } @@ -103,6 +106,11 @@ public class ShuffleBufferWithSkipList extends AbstractShuffleBuffer { return new LinkedList<>(blocksMap.values()); } + @Override + public int getBlockCount() { + return blockCount; + } + @Override public void release() { blocksMap.values().forEach(spb -> spb.getData().release());