This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 1a46e2d0c [#2599] fix(spark): Fix the incorrect shuffle read
metric for Spark (#2600)
1a46e2d0c is described below
commit 1a46e2d0ce3775826654271e9a86e1287ea0a338
Author: Neo Chien <[email protected]>
AuthorDate: Tue Sep 16 14:07:01 2025 +0800
[#2599] fix(spark): Fix the incorrect shuffle read metric for Spark
(#2600)
### What changes were proposed in this pull request?
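Fix the incorrect shuffle read metric for Spark: the fetch wait time is now recorded only when a shuffle block is actually returned, and it covers the read, decompression, and deserialization durations instead of the read duration alone.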
### Why are the changes needed?
This fixes https://github.com/apache/uniffle/issues/2599.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Unit tests (UT).
---
.../org/apache/spark/shuffle/reader/RssShuffleDataIterator.java | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
index 3d5b740ef..4a829b3a4 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
@@ -130,14 +130,14 @@ public class RssShuffleDataIterator<K, C> extends
AbstractIterator<Product2<K, C
ShuffleBlock shuffleBlock = shuffleReadClient.readShuffleBlockData();
ByteBuffer rawData = shuffleBlock != null ? shuffleBlock.getByteBuffer()
: null;
- long fetchDuration = System.currentTimeMillis() - startFetch;
- shuffleReadMetrics.incFetchWaitTime(fetchDuration);
+ long readDuration = System.currentTimeMillis() - startFetch;
if (rawData != null) {
// collect metrics from raw data
long rawDataLength = rawData.limit() - rawData.position();
totalRawBytesLength += rawDataLength;
shuffleReadMetrics.incRemoteBytesRead(rawDataLength);
+ long startUncompression = System.currentTimeMillis();
// get initial data
ByteBuffer decompressed = null;
if (shuffleBlock instanceof CompressedShuffleBlock) {
@@ -146,12 +146,15 @@ public class RssShuffleDataIterator<K, C> extends
AbstractIterator<Product2<K, C
} else {
decompressed = shuffleBlock.getByteBuffer();
}
+ long uncompressionDuration = System.currentTimeMillis() -
startUncompression;
// create new iterator for shuffle data
long startSerialization = System.currentTimeMillis();
recordsIterator = createKVIterator(decompressed);
long serializationDuration = System.currentTimeMillis() -
startSerialization;
- readTime += fetchDuration;
+ shuffleReadMetrics.incFetchWaitTime(
+ serializationDuration + uncompressionDuration + readDuration);
+ readTime += readDuration;
serializeTime += serializationDuration;
} else {
// finish reading records, check data consistent