This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 770eab124 [#2494] feat(spark): Add more statistics about overlapping
decompression (#2633)
770eab124 is described below
commit 770eab1245d3df79d41b11f985aa387a17a6104b
Author: Junfan Zhang <[email protected]>
AuthorDate: Fri Sep 26 17:34:06 2025 +0800
[#2494] feat(spark): Add more statistics about overlapping decompression
(#2633)
### What changes were proposed in this pull request?
1. Add more statistics about overlapping decompression to measure the speedup
ratio.
### Why are the changes needed?
#2494
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Not needed.
---
.../shuffle/reader/RssShuffleDataIterator.java | 1 +
.../uniffle/client/impl/DecompressionWorker.java | 37 +++++++++++++++++++---
.../client/response/DecompressedShuffleBlock.java | 12 +++++--
3 files changed, 43 insertions(+), 7 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
index 4a829b3a4..015fd16e6 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
@@ -145,6 +145,7 @@ public class RssShuffleDataIterator<K, C> extends
AbstractIterator<Product2<K, C
decompressed = uncompressedData;
} else {
decompressed = shuffleBlock.getByteBuffer();
+ unCompressedBytesLength += shuffleBlock.getUncompressLength();
}
long uncompressionDuration = System.currentTimeMillis() -
startUncompression;
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
b/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
index 46b0f71f3..c57f5519c 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
@@ -23,6 +23,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,6 +33,7 @@ import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.compression.Codec;
import org.apache.uniffle.common.util.JavaUtils;
+import org.apache.uniffle.common.util.ThreadUtils;
public class DecompressionWorker {
private static final Logger LOG =
LoggerFactory.getLogger(DecompressionWorker.class);
@@ -39,9 +41,13 @@ public class DecompressionWorker {
private final ExecutorService executorService;
private final ConcurrentHashMap<Integer, ConcurrentHashMap<Integer,
DecompressedShuffleBlock>>
tasks;
- private Codec codec;
- private final ThreadLocal<ByteBuffer> bufferLocal =
- ThreadLocal.withInitial(() -> ByteBuffer.allocate(0));
+ private final Codec codec;
+
+ private final AtomicLong decompressionMillis = new AtomicLong(0);
+ private final AtomicLong decompressionBufferAllocationMillis = new
AtomicLong(0);
+
+ // the millis for the block get operation to measure profit from overlapping
decompression
+ private final AtomicLong waitMillis = new AtomicLong(0);
public DecompressionWorker(Codec codec, int threads) {
if (codec == null) {
@@ -51,7 +57,8 @@ public class DecompressionWorker {
throw new IllegalArgumentException("Threads must be greater than 0");
}
this.tasks = JavaUtils.newConcurrentMap();
- this.executorService = Executors.newFixedThreadPool(threads);
+ this.executorService =
+ Executors.newFixedThreadPool(threads,
ThreadUtils.getThreadFactory("decompressionWorker"));
this.codec = codec;
}
@@ -74,17 +81,27 @@ public class DecompressionWorker {
buffer.limit(offset + length);
int uncompressedLen = bufferSegment.getUncompressLength();
+
+ long startBufferAllocation = System.currentTimeMillis();
ByteBuffer dst =
buffer.isDirect()
? ByteBuffer.allocateDirect(uncompressedLen)
: ByteBuffer.allocate(uncompressedLen);
+ decompressionBufferAllocationMillis.addAndGet(
+ System.currentTimeMillis() - startBufferAllocation);
+
+ long startDecompression = System.currentTimeMillis();
codec.decompress(buffer, uncompressedLen, dst, 0);
+ decompressionMillis.addAndGet(System.currentTimeMillis() -
startDecompression);
+
return dst;
},
executorService);
ConcurrentHashMap<Integer, DecompressedShuffleBlock> blocks =
tasks.computeIfAbsent(batchIndex, k -> new ConcurrentHashMap<>());
- blocks.put(index++, new DecompressedShuffleBlock(f));
+ blocks.put(
+ index++,
+ new DecompressedShuffleBlock(f, waitMillis ->
this.waitMillis.addAndGet(waitMillis)));
}
}
@@ -98,6 +115,16 @@ public class DecompressionWorker {
}
public void close() {
+ long bufferAllocation = decompressionBufferAllocationMillis.get();
+ long decompression = decompressionMillis.get();
+ long wait = waitMillis.get();
+ LOG.info(
+ "The statistic of overlapping compression is that bufferAllocation:
{}(ms), "
+ + "decompression: {}(ms), wait: {}(ms),
overlappingRatio((bufferAllocation+decompression)/wait)={}",
+ bufferAllocation,
+ decompression,
+ wait,
+ wait == 0 ? 0 : (bufferAllocation + decompression) / wait);
executorService.shutdown();
}
}
diff --git
a/client/src/main/java/org/apache/uniffle/client/response/DecompressedShuffleBlock.java
b/client/src/main/java/org/apache/uniffle/client/response/DecompressedShuffleBlock.java
index 9723a7f5f..0462bb768 100644
---
a/client/src/main/java/org/apache/uniffle/client/response/DecompressedShuffleBlock.java
+++
b/client/src/main/java/org/apache/uniffle/client/response/DecompressedShuffleBlock.java
@@ -19,14 +19,17 @@ package org.apache.uniffle.client.response;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
import org.apache.uniffle.common.exception.RssException;
public class DecompressedShuffleBlock implements ShuffleBlock {
private CompletableFuture<ByteBuffer> f;
+ private Consumer<Long> waitMillisCallback;
- public DecompressedShuffleBlock(CompletableFuture<ByteBuffer> f) {
+ public DecompressedShuffleBlock(CompletableFuture<ByteBuffer> f,
Consumer<Long> consumer) {
this.f = f;
+ this.waitMillisCallback = consumer;
}
@Override
@@ -38,7 +41,12 @@ public class DecompressedShuffleBlock implements
ShuffleBlock {
@Override
public ByteBuffer getByteBuffer() {
try {
- return f.get();
+ long startTime = System.currentTimeMillis();
+ ByteBuffer buffer = f.get();
+ if (waitMillisCallback != null) {
+ waitMillisCallback.accept(System.currentTimeMillis() - startTime);
+ }
+ return buffer;
} catch (Exception e) {
throw new RssException(e);
}