This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 770eab124 [#2494] feat(spark): Add more statistics about overlapping 
decompression (#2633)
770eab124 is described below

commit 770eab1245d3df79d41b11f985aa387a17a6104b
Author: Junfan Zhang <[email protected]>
AuthorDate: Fri Sep 26 17:34:06 2025 +0800

    [#2494] feat(spark): Add more statistics about overlapping decompression 
(#2633)
    
    ### What changes were proposed in this pull request?
    
    1. Add more statistics about overlapping decompression to measure speedup 
ratio
    
    ### Why are the changes needed?
    
    #2494
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Needn't
---
 .../shuffle/reader/RssShuffleDataIterator.java     |  1 +
 .../uniffle/client/impl/DecompressionWorker.java   | 37 +++++++++++++++++++---
 .../client/response/DecompressedShuffleBlock.java  | 12 +++++--
 3 files changed, 43 insertions(+), 7 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
index 4a829b3a4..015fd16e6 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
@@ -145,6 +145,7 @@ public class RssShuffleDataIterator<K, C> extends 
AbstractIterator<Product2<K, C
           decompressed = uncompressedData;
         } else {
           decompressed = shuffleBlock.getByteBuffer();
+          unCompressedBytesLength += shuffleBlock.getUncompressLength();
         }
         long uncompressionDuration = System.currentTimeMillis() - 
startUncompression;
 
diff --git 
a/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java 
b/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
index 46b0f71f3..c57f5519c 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
@@ -23,6 +23,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,6 +33,7 @@ import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.compression.Codec;
 import org.apache.uniffle.common.util.JavaUtils;
+import org.apache.uniffle.common.util.ThreadUtils;
 
 public class DecompressionWorker {
   private static final Logger LOG = 
LoggerFactory.getLogger(DecompressionWorker.class);
@@ -39,9 +41,13 @@ public class DecompressionWorker {
   private final ExecutorService executorService;
   private final ConcurrentHashMap<Integer, ConcurrentHashMap<Integer, 
DecompressedShuffleBlock>>
       tasks;
-  private Codec codec;
-  private final ThreadLocal<ByteBuffer> bufferLocal =
-      ThreadLocal.withInitial(() -> ByteBuffer.allocate(0));
+  private final Codec codec;
+
+  private final AtomicLong decompressionMillis = new AtomicLong(0);
+  private final AtomicLong decompressionBufferAllocationMillis = new 
AtomicLong(0);
+
+  // the millis for the block get operation to measure profit from overlapping 
decompression
+  private final AtomicLong waitMillis = new AtomicLong(0);
 
   public DecompressionWorker(Codec codec, int threads) {
     if (codec == null) {
@@ -51,7 +57,8 @@ public class DecompressionWorker {
       throw new IllegalArgumentException("Threads must be greater than 0");
     }
     this.tasks = JavaUtils.newConcurrentMap();
-    this.executorService = Executors.newFixedThreadPool(threads);
+    this.executorService =
+        Executors.newFixedThreadPool(threads, 
ThreadUtils.getThreadFactory("decompressionWorker"));
     this.codec = codec;
   }
 
@@ -74,17 +81,27 @@ public class DecompressionWorker {
                 buffer.limit(offset + length);
 
                 int uncompressedLen = bufferSegment.getUncompressLength();
+
+                long startBufferAllocation = System.currentTimeMillis();
                 ByteBuffer dst =
                     buffer.isDirect()
                         ? ByteBuffer.allocateDirect(uncompressedLen)
                         : ByteBuffer.allocate(uncompressedLen);
+                decompressionBufferAllocationMillis.addAndGet(
+                    System.currentTimeMillis() - startBufferAllocation);
+
+                long startDecompression = System.currentTimeMillis();
                 codec.decompress(buffer, uncompressedLen, dst, 0);
+                decompressionMillis.addAndGet(System.currentTimeMillis() - 
startDecompression);
+
                 return dst;
               },
               executorService);
       ConcurrentHashMap<Integer, DecompressedShuffleBlock> blocks =
           tasks.computeIfAbsent(batchIndex, k -> new ConcurrentHashMap<>());
-      blocks.put(index++, new DecompressedShuffleBlock(f));
+      blocks.put(
+          index++,
+          new DecompressedShuffleBlock(f, waitMillis -> 
this.waitMillis.addAndGet(waitMillis)));
     }
   }
 
@@ -98,6 +115,16 @@ public class DecompressionWorker {
   }
 
   public void close() {
+    long bufferAllocation = decompressionBufferAllocationMillis.get();
+    long decompression = decompressionMillis.get();
+    long wait = waitMillis.get();
+    LOG.info(
+        "The statistic of overlapping compression is that bufferAllocation: 
{}(ms), "
+            + "decompression: {}(ms), wait: {}(ms), 
overlappingRatio((bufferAllocation+decompression)/wait)={}",
+        bufferAllocation,
+        decompression,
+        wait,
+        wait == 0 ? 0 : (bufferAllocation + decompression) / wait);
     executorService.shutdown();
   }
 }
diff --git 
a/client/src/main/java/org/apache/uniffle/client/response/DecompressedShuffleBlock.java
 
b/client/src/main/java/org/apache/uniffle/client/response/DecompressedShuffleBlock.java
index 9723a7f5f..0462bb768 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/response/DecompressedShuffleBlock.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/response/DecompressedShuffleBlock.java
@@ -19,14 +19,17 @@ package org.apache.uniffle.client.response;
 
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
 
 import org.apache.uniffle.common.exception.RssException;
 
 public class DecompressedShuffleBlock implements ShuffleBlock {
   private CompletableFuture<ByteBuffer> f;
+  private Consumer<Long> waitMillisCallback;
 
-  public DecompressedShuffleBlock(CompletableFuture<ByteBuffer> f) {
+  public DecompressedShuffleBlock(CompletableFuture<ByteBuffer> f, 
Consumer<Long> consumer) {
     this.f = f;
+    this.waitMillisCallback = consumer;
   }
 
   @Override
@@ -38,7 +41,12 @@ public class DecompressedShuffleBlock implements 
ShuffleBlock {
   @Override
   public ByteBuffer getByteBuffer() {
     try {
-      return f.get();
+      long startTime = System.currentTimeMillis();
+      ByteBuffer buffer = f.get();
+      if (waitMillisCallback != null) {
+        waitMillisCallback.accept(System.currentTimeMillis() - startTime);
+      }
+      return buffer;
     } catch (Exception e) {
       throw new RssException(e);
     }

Reply via email to