This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a46e2d0c [#2599] fix(spark): Fix the incorrect shuffle read metric for Spark (#2600)
1a46e2d0c is described below

commit 1a46e2d0ce3775826654271e9a86e1287ea0a338
Author: Neo Chien <[email protected]>
AuthorDate: Tue Sep 16 14:07:01 2025 +0800

    [#2599] fix(spark): Fix the incorrect shuffle read metric for Spark (#2600)
    
    ### What changes were proposed in this pull request?
    
    Fix bug: Incorrect shuffle read metric for Spark
    
    ### Why are the changes needed?
    for https://github.com/apache/uniffle/issues/2599
    
    ### Does this PR introduce any user-facing change?
    No.
    
    ### How was this patch tested?
    UT
---
 .../org/apache/spark/shuffle/reader/RssShuffleDataIterator.java  | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
index 3d5b740ef..4a829b3a4 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
@@ -130,14 +130,14 @@ public class RssShuffleDataIterator<K, C> extends 
AbstractIterator<Product2<K, C
       ShuffleBlock shuffleBlock = shuffleReadClient.readShuffleBlockData();
       ByteBuffer rawData = shuffleBlock != null ? shuffleBlock.getByteBuffer() 
: null;
 
-      long fetchDuration = System.currentTimeMillis() - startFetch;
-      shuffleReadMetrics.incFetchWaitTime(fetchDuration);
+      long readDuration = System.currentTimeMillis() - startFetch;
       if (rawData != null) {
         // collect metrics from raw data
         long rawDataLength = rawData.limit() - rawData.position();
         totalRawBytesLength += rawDataLength;
         shuffleReadMetrics.incRemoteBytesRead(rawDataLength);
 
+        long startUncompression = System.currentTimeMillis();
         // get initial data
         ByteBuffer decompressed = null;
         if (shuffleBlock instanceof CompressedShuffleBlock) {
@@ -146,12 +146,15 @@ public class RssShuffleDataIterator<K, C> extends 
AbstractIterator<Product2<K, C
         } else {
           decompressed = shuffleBlock.getByteBuffer();
         }
+        long uncompressionDuration = System.currentTimeMillis() - 
startUncompression;
 
         // create new iterator for shuffle data
         long startSerialization = System.currentTimeMillis();
         recordsIterator = createKVIterator(decompressed);
         long serializationDuration = System.currentTimeMillis() - 
startSerialization;
-        readTime += fetchDuration;
+        shuffleReadMetrics.incFetchWaitTime(
+            serializationDuration + uncompressionDuration + readDuration);
+        readTime += readDuration;
         serializeTime += serializationDuration;
       } else {
         // finish reading records, check data consistent

Reply via email to