This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 21bdae957 [#2154] fix: Correct the data length and buffer size metrics 
and logs (#2155)
21bdae957 is described below

commit 21bdae957588fc21758fbba0e72d0e2bb8527075
Author: maobaolong <[email protected]>
AuthorDate: Thu Oct 17 10:02:09 2024 +0800

    [#2154] fix: Correct the data length and buffer size metrics and logs 
(#2155)
    
    ### What changes were proposed in this pull request?
    
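    Correct the data length and buffer size metrics and logs. The single `size`
    value on flush events and shuffle buffers is split into `encodedLength`
    (the memory cost, based on the blocks' encoded length) and `dataLength`
    (the size of the block data itself), and metrics and logs now use
    whichever of the two applies.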
    
    ### Why are the changes needed?
    
    Fix: #2154
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Unit tests (UTs).
---
 .../uniffle/server/DefaultFlushEventHandler.java   |  4 +-
 .../apache/uniffle/server/HugePartitionUtils.java  |  2 +-
 .../uniffle/server/ShuffleDataFlushEvent.java      | 45 +++++++++++++++++++---
 .../apache/uniffle/server/ShuffleFlushManager.java |  6 +--
 .../server/buffer/AbstractShuffleBuffer.java       | 16 ++++++--
 .../uniffle/server/buffer/ShuffleBuffer.java       |  6 ++-
 .../server/buffer/ShuffleBufferManager.java        | 16 ++++----
 .../server/buffer/ShuffleBufferWithLinkedList.java | 28 ++++++++++----
 .../server/buffer/ShuffleBufferWithSkipList.java   | 29 ++++++++++----
 .../server/storage/HadoopStorageManager.java       |  2 +-
 .../server/storage/LocalStorageManager.java        |  4 +-
 .../server/storage/SingleStorageManager.java       | 18 +++------
 .../hybrid/DefaultStorageManagerSelector.java      |  2 +-
 .../uniffle/server/ShuffleFlushManagerTest.java    |  3 ++
 .../server/buffer/ShuffleBufferManagerTest.java    | 18 ++++-----
 .../buffer/ShuffleBufferWithLinkedListTest.java    | 18 ++++-----
 .../buffer/ShuffleBufferWithSkipListTest.java      | 18 ++++-----
 17 files changed, 152 insertions(+), 83 deletions(-)

diff --git 
a/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java 
b/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
index 5b06bce76..9fa586628 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
@@ -110,7 +110,7 @@ public class DefaultFlushEventHandler implements 
FlushEventHandler {
             "Flush event:{} successfully in {} ms and release {} bytes",
             event,
             System.currentTimeMillis() - start,
-            event.getSize());
+            event.getEncodedLength());
       }
     } catch (Exception e) {
       if (e instanceof EventRetryException) {
@@ -134,7 +134,7 @@ public class DefaultFlushEventHandler implements 
FlushEventHandler {
             "Flush event: {} failed in {} ms and release {} bytes. This will 
make data lost.",
             event,
             System.currentTimeMillis() - start,
-            event.getSize());
+            event.getEncodedLength());
         return;
       }
 
diff --git 
a/server/src/main/java/org/apache/uniffle/server/HugePartitionUtils.java 
b/server/src/main/java/org/apache/uniffle/server/HugePartitionUtils.java
index 3d7099c8b..f2501cd42 100644
--- a/server/src/main/java/org/apache/uniffle/server/HugePartitionUtils.java
+++ b/server/src/main/java/org/apache/uniffle/server/HugePartitionUtils.java
@@ -133,7 +133,7 @@ public class HugePartitionUtils {
     if (usedPartitionDataSize > 
shuffleBufferManager.getHugePartitionSizeThreshold()) {
       ShuffleBuffer buffer =
           shuffleBufferManager.getShuffleBufferEntry(appId, shuffleId, 
partitionId).getValue();
-      long memoryUsed = buffer.getInFlushSize() + buffer.getSize();
+      long memoryUsed = buffer.getInFlushSize() + buffer.getEncodedLength();
       if (memoryUsed > shuffleBufferManager.getHugePartitionMemoryLimitSize()) 
{
         LOG.warn(
             "AppId: {}, shuffleId: {}, partitionId: {}, memory used: {}, "
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
index b1c889adb..7823cad04 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleDataFlushEvent.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,7 +39,11 @@ public class ShuffleDataFlushEvent {
   private final int shuffleId;
   private final int startPartition;
   private final int endPartition;
-  private final long size;
+  /** The memory cost size include encoded length */
+  private final long encodedLength;
+  /** The data size of this shuffle block */
+  private final long dataLength;
+
   private final Collection<ShufflePartitionedBlock> shuffleBlocks;
   private final Supplier<Boolean> valid;
   private final ShuffleBuffer shuffleBuffer;
@@ -51,13 +56,38 @@ public class ShuffleDataFlushEvent {
   private boolean ownedByHugePartition = false;
   private long startPendingTime;
 
+  @VisibleForTesting
+  public ShuffleDataFlushEvent(
+      long eventId,
+      String appId,
+      int shuffleId,
+      int startPartition,
+      int endPartition,
+      long encodedLength,
+      Collection<ShufflePartitionedBlock> shuffleBlocks,
+      Supplier<Boolean> valid,
+      ShuffleBuffer shuffleBuffer) {
+    this(
+        eventId,
+        appId,
+        shuffleId,
+        startPartition,
+        endPartition,
+        encodedLength,
+        encodedLength,
+        shuffleBlocks,
+        valid,
+        shuffleBuffer);
+  }
+
   public ShuffleDataFlushEvent(
       long eventId,
       String appId,
       int shuffleId,
       int startPartition,
       int endPartition,
-      long size,
+      long encodedLength,
+      long dataLength,
       Collection<ShufflePartitionedBlock> shuffleBlocks,
       Supplier<Boolean> valid,
       ShuffleBuffer shuffleBuffer) {
@@ -66,11 +96,12 @@ public class ShuffleDataFlushEvent {
     this.shuffleId = shuffleId;
     this.startPartition = startPartition;
     this.endPartition = endPartition;
-    this.size = size;
+    this.encodedLength = encodedLength;
     this.shuffleBlocks = shuffleBlocks;
     this.valid = valid;
     this.shuffleBuffer = shuffleBuffer;
     this.cleanupCallbackChains = new ArrayList<>();
+    this.dataLength = dataLength;
   }
 
   public Collection<ShufflePartitionedBlock> getShuffleBlocks() {
@@ -81,8 +112,12 @@ public class ShuffleDataFlushEvent {
     return eventId;
   }
 
-  public long getSize() {
-    return size;
+  public long getEncodedLength() {
+    return encodedLength;
+  }
+
+  public long getDataLength() {
+    return dataLength;
   }
 
   public String getAppId() {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index c1b759b66..0d3d5ca0d 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -198,7 +198,7 @@ public class ShuffleFlushManager {
                 event.getStartPartition() + "_" + event.getEndPartition(),
                 event.getUnderStorage().getStorageHost(),
                 event.getUnderStorage().getStoragePath(),
-                event.getSize(),
+                event.getDataLength(),
                 DateFormatUtils.format(startTime, AUDIT_DATE_PATTERN),
                 DateFormatUtils.format(endTime, AUDIT_DATE_PATTERN),
                 endTime - startTime));
@@ -206,9 +206,9 @@ public class ShuffleFlushManager {
       if (null != shuffleTaskInfo) {
         String storageHost = event.getUnderStorage().getStorageHost();
         if (LocalStorage.STORAGE_HOST.equals(storageHost)) {
-          shuffleTaskInfo.addOnLocalFileDataSize(event.getSize());
+          shuffleTaskInfo.addOnLocalFileDataSize(event.getEncodedLength());
         } else {
-          shuffleTaskInfo.addOnHadoopDataSize(event.getSize());
+          shuffleTaskInfo.addOnHadoopDataSize(event.getEncodedLength());
         }
       }
     } finally {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
index c0f880b2d..41911e00d 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
@@ -40,7 +40,10 @@ public abstract class AbstractShuffleBuffer implements 
ShuffleBuffer {
 
   protected static final Logger LOG = 
LoggerFactory.getLogger(AbstractShuffleBuffer.class);
 
-  protected long size;
+  /** The memory cost size include encoded length */
+  protected long encodedLength;
+  /** The data size of this shuffle block */
+  protected long dataLength;
 
   protected AtomicLong inFlushSize = new AtomicLong();
 
@@ -48,7 +51,7 @@ public abstract class AbstractShuffleBuffer implements 
ShuffleBuffer {
   public static final long BUFFER_EVICTED = -1L;
 
   public AbstractShuffleBuffer() {
-    this.size = 0;
+    this.encodedLength = 0;
     this.evicted = false;
   }
 
@@ -70,8 +73,13 @@ public abstract class AbstractShuffleBuffer implements 
ShuffleBuffer {
   }
 
   @Override
-  public long getSize() {
-    return size;
+  public long getEncodedLength() {
+    return encodedLength;
+  }
+
+  @Override
+  public long getDataLength() {
+    return dataLength;
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
index 522f9a057..c181aba42 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
@@ -50,7 +50,11 @@ public interface ShuffleBuffer {
   ShuffleDataResult getShuffleData(
       long lastBlockId, int readBufferSize, Roaring64NavigableMap 
expectedTaskIds);
 
-  long getSize();
+  /** @return the buffer memory size include encoded length */
+  long getEncodedLength();
+
+  /** @return the buffer block data size */
+  long getDataLength();
 
   long getInFlushSize();
 
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 58ce7665b..b7b66be37 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -325,7 +325,7 @@ public class ShuffleBufferManager {
     // When we use multi storage and trigger single buffer flush, the buffer 
size should be bigger
     // than rss.server.flush.cold.storage.threshold.size, otherwise cold 
storage will be useless.
     if ((isHugePartition || this.bufferFlushEnabled)
-        && (buffer.getSize() > this.bufferFlushThreshold
+        && (buffer.getEncodedLength() > this.bufferFlushThreshold
             || buffer.getBlockCount() > bufferFlushBlocksNumThreshold)) {
       if (LOG.isDebugEnabled()) {
         LOG.debug(
@@ -334,7 +334,7 @@ public class ShuffleBufferManager {
             startPartition,
             endPartition,
             isHugePartition,
-            buffer.getSize(),
+            buffer.getEncodedLength(),
             buffer.getBlockCount());
       }
       flushBuffer(buffer, appId, shuffleId, startPartition, endPartition, 
isHugePartition);
@@ -397,9 +397,9 @@ public class ShuffleBufferManager {
               () -> bufferPool.getOrDefault(appId, new 
HashMap<>()).containsKey(shuffleId),
               shuffleFlushManager.getDataDistributionType(appId));
       if (event != null) {
-        event.addCleanupCallback(() -> releaseMemory(event.getSize(), true, 
false));
-        updateShuffleSize(appId, shuffleId, -event.getSize());
-        inFlushSize.addAndGet(event.getSize());
+        event.addCleanupCallback(() -> releaseMemory(event.getEncodedLength(), 
true, false));
+        updateShuffleSize(appId, shuffleId, -event.getEncodedLength());
+        inFlushSize.addAndGet(event.getEncodedLength());
         if (isHugePartition) {
           event.markOwnedByHugePartition();
         }
@@ -575,7 +575,7 @@ public class ShuffleBufferManager {
                 shuffleIdToBuffers.getValue().asMapOfRanges().entrySet()) {
               Range<Integer> range = rangeEntry.getKey();
               ShuffleBuffer shuffleBuffer = rangeEntry.getValue();
-              pickedFlushSize += shuffleBuffer.getSize();
+              pickedFlushSize += shuffleBuffer.getEncodedLength();
               flushBuffer(
                   shuffleBuffer,
                   appId,
@@ -770,11 +770,11 @@ public class ShuffleBufferManager {
           // the actual released size by this thread
           long releasedSize = buffer.release();
           ShuffleServerMetrics.gaugeTotalPartitionNum.dec();
-          if (releasedSize != buffer.getSize()) {
+          if (releasedSize != buffer.getEncodedLength()) {
             LOG.warn(
                 "Release shuffle buffer size {} is not equal to buffer size 
{}, appId: {}, shuffleId: {}",
                 releasedSize,
-                buffer.getSize(),
+                buffer.getEncodedLength(),
                 appId,
                 shuffleId);
           }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
index 215ef2f7f..553b2870b 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
@@ -55,20 +55,23 @@ public class ShuffleBufferWithLinkedList extends 
AbstractShuffleBuffer {
     if (evicted) {
       return BUFFER_EVICTED;
     }
-    long currentSize = 0;
+    long currentEncodedLength = 0;
+    long currentDataLength = 0;
 
     for (ShufflePartitionedBlock block : data.getBlockList()) {
       // If sendShuffleData retried, we may receive duplicate block. The 
duplicate
       // block would gc without release. Here we must release the duplicated 
block.
       if (blocks.add(block)) {
-        currentSize += block.getEncodedLength();
+        currentEncodedLength += block.getEncodedLength();
+        currentDataLength += block.getDataLength();
       } else {
         block.getData().release();
       }
     }
-    this.size += currentSize;
+    this.encodedLength += currentEncodedLength;
+    this.dataLength += currentDataLength;
 
-    return currentSize;
+    return currentEncodedLength;
   }
 
   @Override
@@ -97,17 +100,26 @@ public class ShuffleBufferWithLinkedList extends 
AbstractShuffleBuffer {
     long eventId = ShuffleFlushManager.ATOMIC_EVENT_ID.getAndIncrement();
     final ShuffleDataFlushEvent event =
         new ShuffleDataFlushEvent(
-            eventId, appId, shuffleId, startPartition, endPartition, size, 
spBlocks, isValid, this);
+            eventId,
+            appId,
+            shuffleId,
+            startPartition,
+            endPartition,
+            encodedLength,
+            dataLength,
+            spBlocks,
+            isValid,
+            this);
     event.addCleanupCallback(
         () -> {
           this.clearInFlushBuffer(event.getEventId());
           inFlushedQueueBlocks.forEach(spb -> spb.getData().release());
-          inFlushSize.addAndGet(-event.getSize());
+          inFlushSize.addAndGet(-event.getEncodedLength());
         });
     inFlushBlockMap.put(eventId, inFlushedQueueBlocks);
     blocks = new LinkedHashSet<>();
-    inFlushSize.addAndGet(size);
-    size = 0;
+    inFlushSize.addAndGet(encodedLength);
+    encodedLength = 0;
     return event;
   }
 
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
index 414d9a66b..4a92c919f 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
@@ -62,7 +62,8 @@ public class ShuffleBufferWithSkipList extends 
AbstractShuffleBuffer {
     if (evicted) {
       return BUFFER_EVICTED;
     }
-    long currentSize = 0;
+    long currentEncodedLength = 0;
+    long currentDataLength = 0;
 
     for (ShufflePartitionedBlock block : data.getBlockList()) {
       // If sendShuffleData retried, we may receive duplicate block. The 
duplicate
@@ -70,14 +71,16 @@ public class ShuffleBufferWithSkipList extends 
AbstractShuffleBuffer {
       if (!blocksMap.containsKey(block.getBlockId())) {
         blocksMap.put(block.getBlockId(), block);
         blockCount++;
-        currentSize += block.getEncodedLength();
+        currentEncodedLength += block.getEncodedLength();
+        currentDataLength += block.getDataLength();
       } else {
         block.getData().release();
       }
     }
-    this.size += currentSize;
+    this.encodedLength += currentEncodedLength;
+    this.dataLength += currentDataLength;
 
-    return currentSize;
+    return currentEncodedLength;
   }
 
   @Override
@@ -95,18 +98,28 @@ public class ShuffleBufferWithSkipList extends 
AbstractShuffleBuffer {
     long eventId = ShuffleFlushManager.ATOMIC_EVENT_ID.getAndIncrement();
     final ShuffleDataFlushEvent event =
         new ShuffleDataFlushEvent(
-            eventId, appId, shuffleId, startPartition, endPartition, size, 
spBlocks, isValid, this);
+            eventId,
+            appId,
+            shuffleId,
+            startPartition,
+            endPartition,
+            encodedLength,
+            dataLength,
+            spBlocks,
+            isValid,
+            this);
     event.addCleanupCallback(
         () -> {
           this.clearInFlushBuffer(event.getEventId());
           spBlocks.forEach(spb -> spb.getData().release());
-          inFlushSize.addAndGet(-event.getSize());
+          inFlushSize.addAndGet(-event.getEncodedLength());
         });
     inFlushBlockMap.put(eventId, blocksMap);
     blocksMap = newConcurrentSkipListMap();
     blockCount = 0;
-    inFlushSize.addAndGet(size);
-    size = 0;
+    inFlushSize.addAndGet(encodedLength);
+    encodedLength = 0;
+    dataLength = 0;
     return event;
   }
 
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
index 99c055e5f..adbbc594c 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
@@ -82,7 +82,7 @@ public class HadoopStorageManager extends 
SingleStorageManager {
       return;
     }
     ShuffleServerMetrics.incHadoopStorageWriteDataSize(
-        storage.getStorageHost(), event.getSize(), 
event.isOwnedByHugePartition());
+        storage.getStorageHost(), event.getDataLength(), 
event.isOwnedByHugePartition());
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 67ace22d8..c33c17f3b 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -275,11 +275,11 @@ public class LocalStorageManager extends 
SingleStorageManager {
     super.updateWriteMetrics(event, writeTime);
     ShuffleServerMetrics.counterTotalLocalFileWriteDataSize
         .labels(ShuffleServerMetrics.LOCAL_DISK_PATH_LABEL_ALL)
-        .inc(event.getSize());
+        .inc(event.getDataLength());
     if (event.getUnderStorage() != null) {
       ShuffleServerMetrics.counterTotalLocalFileWriteDataSize
           .labels(event.getUnderStorage().getStoragePath())
-          .inc(event.getSize());
+          .inc(event.getDataLength());
     }
   }
 
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
index 936a7550f..655375dd8 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
@@ -26,7 +26,6 @@ import com.google.common.util.concurrent.Uninterruptibles;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.common.storage.ApplicationStorageInfo;
 import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.common.util.RssUtils;
@@ -87,7 +86,7 @@ public abstract class SingleStorageManager implements 
StorageManager {
       if (metrics.getWriteTime() > writeSlowThreshold) {
         ShuffleServerMetrics.counterWriteSlow.inc();
       }
-      
ShuffleServerMetrics.counterTotalWriteDataSize.inc(metrics.getEventSize());
+      
ShuffleServerMetrics.counterTotalWriteDataSize.inc(metrics.getDataSize());
       
ShuffleServerMetrics.counterTotalWriteBlockSize.inc(metrics.getWriteBlocks());
       if (metrics.getEventSize() < eventSizeThresholdL1) {
         ShuffleServerMetrics.counterEventSizeThresholdLevel1.inc();
@@ -101,8 +100,8 @@ public abstract class SingleStorageManager implements 
StorageManager {
       String appId = event.getAppId();
       ApplicationStorageInfo appStorage =
           appStorageInfoMap.computeIfAbsent(appId, id -> new 
ApplicationStorageInfo(appId));
-      appStorage.incUsedBytes(event.getSize());
-      ShuffleServerMetrics.gaugeStorageUsedBytes.inc(event.getSize());
+      appStorage.incUsedBytes(event.getDataLength());
+      ShuffleServerMetrics.gaugeStorageUsedBytes.inc(event.getDataLength());
       if (event.getUnderStorage().containsWriteHandler(appId)) {
         appStorage.incFileNum(1);
         ShuffleServerMetrics.gaugeFlushFileNum.inc();
@@ -134,12 +133,7 @@ public abstract class SingleStorageManager implements 
StorageManager {
 
   public StorageWriteMetrics createStorageWriteMetrics(
       ShuffleDataFlushEvent event, long writeTime) {
-    long length = 0;
-    long blockNum = 0;
-    for (ShufflePartitionedBlock block : event.getShuffleBlocks()) {
-      length += block.getDataLength();
-      blockNum++;
-    }
+    long blockNum = event.getShuffleBlocks().size();
     List<Integer> partitions = Lists.newArrayList();
     for (int partition = event.getStartPartition();
         partition <= event.getEndPartition();
@@ -147,10 +141,10 @@ public abstract class SingleStorageManager implements 
StorageManager {
       partitions.add(partition);
     }
     return new StorageWriteMetrics(
-        event.getSize(),
+        event.getEncodedLength(),
         blockNum,
         writeTime,
-        length,
+        event.getDataLength(),
         partitions,
         event.getAppId(),
         event.getShuffleId());
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/hybrid/DefaultStorageManagerSelector.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/hybrid/DefaultStorageManagerSelector.java
index e9b73d878..c308f194f 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/hybrid/DefaultStorageManagerSelector.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/hybrid/DefaultStorageManagerSelector.java
@@ -39,7 +39,7 @@ public class DefaultStorageManagerSelector extends 
FallbackBasedStorageManagerSe
   @Override
   StorageManager regularSelect(ShuffleDataFlushEvent flushEvent) {
     StorageManager storageManager = warmStorageManager;
-    if (flushEvent.getSize() > flushColdStorageThresholdSize) {
+    if (flushEvent.getEncodedLength() > flushColdStorageThresholdSize) {
       storageManager = coldStorageManager;
     }
     return storageManager;
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java 
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index 32531b876..ad3faf210 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -36,6 +36,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Supplier;
 import java.util.stream.IntStream;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -668,6 +669,7 @@ public class ShuffleFlushManagerTest extends HadoopTestBase 
{
     } while (size < expectedBlockNum);
   }
 
+  @VisibleForTesting
   public static ShuffleDataFlushEvent createShuffleDataFlushEvent(
       String appId,
       int shuffleId,
@@ -692,6 +694,7 @@ public class ShuffleFlushManagerTest extends HadoopTestBase 
{
         startPartition,
         endPartition,
         size,
+        spbs.stream().mapToInt(ShufflePartitionedBlock::getDataLength).sum(),
         spbs,
         isValid,
         null);
diff --git 
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
 
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
index ab3b0cb97..13dbe94e1 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
@@ -177,9 +177,9 @@ public class ShuffleBufferManagerTest extends 
BufferTestBase {
     // validate buffer, no flush happened
     Map<String, Map<Integer, RangeMap<Integer, ShuffleBuffer>>> bufferPool =
         shuffleBufferManager.getBufferPool();
-    assertEquals(100, bufferPool.get(appId).get(1).get(0).getSize());
-    assertEquals(200, bufferPool.get(appId).get(2).get(0).getSize());
-    assertEquals(100, bufferPool.get(appId).get(3).get(0).getSize());
+    assertEquals(100, bufferPool.get(appId).get(1).get(0).getEncodedLength());
+    assertEquals(200, bufferPool.get(appId).get(2).get(0).getEncodedLength());
+    assertEquals(100, bufferPool.get(appId).get(3).get(0).getEncodedLength());
     // validate get shuffle data
     ShuffleDataResult sdr =
         shuffleBufferManager.getShuffleData(appId, 2, 0, 
Constants.INVALID_BLOCK_ID, 60);
@@ -300,16 +300,16 @@ public class ShuffleBufferManagerTest extends 
BufferTestBase {
     Map<String, Map<Integer, RangeMap<Integer, ShuffleBuffer>>> bufferPool =
         shuffleBufferManager.getBufferPool();
     ShuffleBuffer buffer = bufferPool.get(appId).get(shuffleId).get(0);
-    assertEquals(48, buffer.getSize());
+    assertEquals(48, buffer.getEncodedLength());
     assertEquals(48, shuffleBufferManager.getUsedMemory());
 
     shuffleBufferManager.cacheShuffleData(appId, shuffleId, false, 
createData(0, 16));
-    assertEquals(96, buffer.getSize());
+    assertEquals(96, buffer.getEncodedLength());
     assertEquals(96, shuffleBufferManager.getUsedMemory());
 
     // reach high water lever, flush
     shuffleBufferManager.cacheShuffleData(appId, shuffleId, false, 
createData(0, 273));
-    assertEquals(0, buffer.getSize());
+    assertEquals(0, buffer.getEncodedLength());
     assertEquals(401, shuffleBufferManager.getUsedMemory());
     assertEquals(401, shuffleBufferManager.getInFlushSize());
     verify(mockShuffleFlushManager, times(1)).addToFlushQueue(any());
@@ -334,9 +334,9 @@ public class ShuffleBufferManagerTest extends 
BufferTestBase {
     ShuffleBuffer buffer0 = bufferPool.get(appId).get(shuffleId).get(0);
     ShuffleBuffer buffer1 = bufferPool.get(appId).get(shuffleId).get(1);
     ShuffleBuffer buffer2 = bufferPool.get(appId).get(2).get(0);
-    assertEquals(0, buffer0.getSize());
-    assertEquals(0, buffer1.getSize());
-    assertEquals(64, buffer2.getSize());
+    assertEquals(0, buffer0.getEncodedLength());
+    assertEquals(0, buffer1.getEncodedLength());
+    assertEquals(64, buffer2.getEncodedLength());
     assertEquals(528, shuffleBufferManager.getUsedMemory());
     assertEquals(464, shuffleBufferManager.getInFlushSize());
     verify(mockShuffleFlushManager, times(3)).addToFlushQueue(any());
diff --git 
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java
 
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java
index 31c4492d2..acc86a6db 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedListTest.java
@@ -49,13 +49,13 @@ public class ShuffleBufferWithLinkedListTest extends 
BufferTestBase {
     ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList();
     shuffleBuffer.append(createData(10));
     // ShufflePartitionedBlock has constant 32 bytes overhead
-    assertEquals(42, shuffleBuffer.getSize());
+    assertEquals(42, shuffleBuffer.getEncodedLength());
 
     shuffleBuffer.append(createData(26));
-    assertEquals(100, shuffleBuffer.getSize());
+    assertEquals(100, shuffleBuffer.getEncodedLength());
 
     shuffleBuffer.append(createData(1));
-    assertEquals(133, shuffleBuffer.getSize());
+    assertEquals(133, shuffleBuffer.getEncodedLength());
   }
 
   @Test
@@ -67,7 +67,7 @@ public class ShuffleBufferWithLinkedListTest extends 
BufferTestBase {
     dataCombine[0] = data1.getBlockList()[0];
     dataCombine[1] = data2.getBlockList()[0];
     shuffleBuffer.append(new ShufflePartitionedData(1, dataCombine));
-    assertEquals(84, shuffleBuffer.getSize());
+    assertEquals(84, shuffleBuffer.getEncodedLength());
   }
 
   @Test
@@ -76,10 +76,10 @@ public class ShuffleBufferWithLinkedListTest extends 
BufferTestBase {
     ShuffleDataFlushEvent event = shuffleBuffer.toFlushEvent("appId", 0, 0, 1, 
null);
     assertNull(event);
     shuffleBuffer.append(createData(10));
-    assertEquals(42, shuffleBuffer.getSize());
+    assertEquals(42, shuffleBuffer.getEncodedLength());
     event = shuffleBuffer.toFlushEvent("appId", 0, 0, 1, null);
-    assertEquals(42, event.getSize());
-    assertEquals(0, shuffleBuffer.getSize());
+    assertEquals(42, event.getEncodedLength());
+    assertEquals(0, shuffleBuffer.getEncodedLength());
     assertEquals(0, shuffleBuffer.getBlocks().size());
   }
 
@@ -590,11 +590,11 @@ public class ShuffleBufferWithLinkedListTest extends 
BufferTestBase {
     ShufflePartitionedData block = createData(10);
     shuffleBuffer.append(block);
     // ShufflePartitionedBlock has constant 32 bytes overhead
-    assertEquals(42, shuffleBuffer.getSize());
+    assertEquals(42, shuffleBuffer.getEncodedLength());
 
     shuffleBuffer.append(block);
     // The repeat block should not append to shuffleBuffer
-    assertEquals(42, shuffleBuffer.getSize());
+    assertEquals(42, shuffleBuffer.getEncodedLength());
   }
 
   private byte[] getExpectedData(ShufflePartitionedData... spds) {
diff --git 
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipListTest.java
 
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipListTest.java
index 7e293be34..87ce14569 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipListTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipListTest.java
@@ -42,13 +42,13 @@ public class ShuffleBufferWithSkipListTest extends 
BufferTestBase {
     ShuffleBuffer shuffleBuffer = new ShuffleBufferWithSkipList();
     shuffleBuffer.append(createData(10));
     // ShufflePartitionedBlock has constant 32 bytes overhead
-    assertEquals(42, shuffleBuffer.getSize());
+    assertEquals(42, shuffleBuffer.getEncodedLength());
 
     shuffleBuffer.append(createData(26));
-    assertEquals(100, shuffleBuffer.getSize());
+    assertEquals(100, shuffleBuffer.getEncodedLength());
 
     shuffleBuffer.append(createData(1));
-    assertEquals(133, shuffleBuffer.getSize());
+    assertEquals(133, shuffleBuffer.getEncodedLength());
   }
 
   @Test
@@ -60,7 +60,7 @@ public class ShuffleBufferWithSkipListTest extends 
BufferTestBase {
     dataCombine[0] = data1.getBlockList()[0];
     dataCombine[1] = data2.getBlockList()[0];
     shuffleBuffer.append(new ShufflePartitionedData(1, dataCombine));
-    assertEquals(84, shuffleBuffer.getSize());
+    assertEquals(84, shuffleBuffer.getEncodedLength());
   }
 
   @Test
@@ -69,10 +69,10 @@ public class ShuffleBufferWithSkipListTest extends 
BufferTestBase {
     ShuffleDataFlushEvent event = shuffleBuffer.toFlushEvent("appId", 0, 0, 1, 
null);
     assertNull(event);
     shuffleBuffer.append(createData(10));
-    assertEquals(42, shuffleBuffer.getSize());
+    assertEquals(42, shuffleBuffer.getEncodedLength());
     event = shuffleBuffer.toFlushEvent("appId", 0, 0, 1, null);
-    assertEquals(42, event.getSize());
-    assertEquals(0, shuffleBuffer.getSize());
+    assertEquals(42, event.getEncodedLength());
+    assertEquals(0, shuffleBuffer.getEncodedLength());
     assertEquals(0, shuffleBuffer.getBlocks().size());
   }
 
@@ -203,11 +203,11 @@ public class ShuffleBufferWithSkipListTest extends 
BufferTestBase {
     ShufflePartitionedData block = createData(10);
     shuffleBuffer.append(block);
     // ShufflePartitionedBlock has constant 32 bytes overhead
-    assertEquals(42, shuffleBuffer.getSize());
+    assertEquals(42, shuffleBuffer.getEncodedLength());
 
     shuffleBuffer.append(block);
     // The repeat block should not append to shuffleBuffer
-    assertEquals(42, shuffleBuffer.getSize());
+    assertEquals(42, shuffleBuffer.getEncodedLength());
   }
 
   @Override

Reply via email to