This is an automated email from the ASF dual-hosted git repository.

xianjingfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 6599ef27e [#1809] fix(server): add block counter for skip-list buffer (#1846)
6599ef27e is described below

commit 6599ef27e257d117ffb01855cf0d2e8351b90eec
Author: xianjingfeng <xianjingfeng...@gmail.com>
AuthorDate: Tue Jul 2 15:54:56 2024 +0800

    [#1809] fix(server): add block counter for skip-list buffer (#1846)
    
    ### What changes were proposed in this pull request?
    Add block counter for SKIP_LIST buffer.
    
    ### Why are the changes needed?
    The performance of getting the size of skip-list is not good.
    Fix: #1809
    
    ### Does this PR introduce any user-facing change?
    No.
    
    ### How was this patch tested?
    Existing UT.
---
 .../main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java | 2 ++
 .../org/apache/uniffle/server/buffer/ShuffleBufferManager.java    | 4 ++--
 .../apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java | 5 +++++
 .../apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java   | 8 ++++++++
 4 files changed, 17 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
index cb01bb0a9..1884a50c0 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
@@ -57,6 +57,8 @@ public interface ShuffleBuffer {
   /** Only for test */
   List<ShufflePartitionedBlock> getBlocks();
 
+  int getBlockCount();
+
   void release();
 
   void clearInFlushBuffer(long eventId);
diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index d17716daf..c62ebd8c5 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -278,7 +278,7 @@ public class ShuffleBufferManager {
     // than rss.server.flush.cold.storage.threshold.size, otherwise cold storage will be useless.
     if ((isHugePartition || this.bufferFlushEnabled)
         && (buffer.getSize() > this.bufferFlushThreshold
-            || buffer.getBlocks().size() > bufferFlushBlocksNumThreshold)) {
+            || buffer.getBlockCount() > bufferFlushBlocksNumThreshold)) {
       if (LOG.isDebugEnabled()) {
         LOG.debug(
             "Start to flush single buffer. Details - shuffleId:{}, startPartition:{}, endPartition:{}, isHugePartition:{}, bufferSize:{}, blocksNum:{}",
@@ -287,7 +287,7 @@ public class ShuffleBufferManager {
             endPartition,
             isHugePartition,
             buffer.getSize(),
-            buffer.getBlocks().size());
+            buffer.getBlockCount());
       }
       flushBuffer(buffer, appId, shuffleId, startPartition, endPartition, isHugePartition);
     }
diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
index c01bb0823..d37dc446f 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
@@ -105,6 +105,11 @@ public class ShuffleBufferWithLinkedList extends AbstractShuffleBuffer {
     return blocks;
   }
 
+  @Override
+  public int getBlockCount() {
+    return getBlocks().size();
+  }
+
   @Override
   public void release() {
     blocks.forEach(spb -> spb.getData().release());
diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
index 4783b4f56..db0ce2543 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
@@ -42,6 +42,7 @@ import org.apache.uniffle.server.ShuffleFlushManager;
 public class ShuffleBufferWithSkipList extends AbstractShuffleBuffer {
   private ConcurrentSkipListMap<Long, ShufflePartitionedBlock> blocksMap;
   private final Map<Long, ConcurrentSkipListMap<Long, ShufflePartitionedBlock>> inFlushBlockMap;
+  private int blockCount;
 
   public ShuffleBufferWithSkipList(long capacity) {
     super(capacity);
@@ -63,6 +64,7 @@ public class ShuffleBufferWithSkipList extends AbstractShuffleBuffer {
     synchronized (this) {
       for (ShufflePartitionedBlock block : data.getBlockList()) {
         blocksMap.put(block.getBlockId(), block);
+        blockCount++;
         mSize += block.getSize();
       }
       size += mSize;
@@ -94,6 +96,7 @@ public class ShuffleBufferWithSkipList extends AbstractShuffleBuffer {
         });
     inFlushBlockMap.put(eventId, blocksMap);
     blocksMap = newConcurrentSkipListMap();
+    blockCount = 0;
     size = 0;
     return event;
   }
@@ -103,6 +106,11 @@ public class ShuffleBufferWithSkipList extends AbstractShuffleBuffer {
     return new LinkedList<>(blocksMap.values());
   }
 
+  @Override
+  public int getBlockCount() {
+    return blockCount;
+  }
+
   @Override
   public void release() {
     blocksMap.values().forEach(spb -> spb.getData().release());

Reply via email to