This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a32171b9 [#2569] feat(spark): Add statistic of shuffle read times 
(#2598)
2a32171b9 is described below

commit 2a32171b9724272276797ede17eb057027df5767
Author: Junfan Zhang <[email protected]>
AuthorDate: Thu Sep 4 17:26:41 2025 +0800

    [#2569] feat(spark): Add statistic of shuffle read times (#2598)
    
    ### What changes were proposed in this pull request?
    
    Add statistics of shuffle read times to help find the bottleneck in shuffle 
reading
    
    ### Why are the changes needed?
    
    for #2569
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Tests in cluster
---
 .../hadoop/mapreduce/task/reduce/FetcherTest.java  |  6 ++
 .../shuffle/events/TaskShuffleReadInfoEvent.java   | 11 ++-
 .../shuffle/reader/RssShuffleDataIterator.java     |  8 ++
 .../shuffle/manager/ShuffleManagerGrpcService.java |  4 +-
 .../scala/org/apache/spark/UniffleListener.scala   |  6 ++
 .../org/apache/spark/UniffleStatusStore.scala      | 16 ++++
 .../scala/org/apache/spark/ui/ShufflePage.scala    | 63 ++++++++++++--
 .../spark/shuffle/reader/RssShuffleReader.java     |  7 +-
 .../common/shuffle/impl/RssTezFetcherTest.java     |  6 ++
 .../RssTezShuffleDataFetcherTest.java              |  6 ++
 .../uniffle/client/api/ShuffleReadClient.java      |  3 +
 .../uniffle/client/impl/ShuffleReadClientImpl.java |  6 ++
 .../apache/uniffle/common/ShuffleReadTimes.java    | 97 ++++++++++++++++++++++
 .../request/RssReportShuffleReadMetricRequest.java |  7 +-
 proto/src/main/proto/Rss.proto                     |  9 ++
 15 files changed, 244 insertions(+), 11 deletions(-)

diff --git 
a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
 
b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
index 871f9e58f..85cc09614 100644
--- 
a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
+++ 
b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
@@ -69,6 +69,7 @@ import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ShuffleAssignmentsInfo;
 import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.ShuffleDataDistributionType;
+import org.apache.uniffle.common.ShuffleReadTimes;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.compression.Codec;
 import org.apache.uniffle.common.compression.Lz4Codec;
@@ -621,6 +622,11 @@ public class FetcherTest {
 
     @Override
     public void logStatics() {}
+
+    @Override
+    public ShuffleReadTimes getShuffleReadTimes() {
+      return new ShuffleReadTimes();
+    }
   }
 
   static class MockedReporter implements Reporter {
diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleReadInfoEvent.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleReadInfoEvent.java
index 6e999d898..fb897ee72 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleReadInfoEvent.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleReadInfoEvent.java
@@ -19,6 +19,8 @@ package org.apache.spark.shuffle.events;
 
 import java.util.Map;
 
+import org.apache.uniffle.common.ShuffleReadTimes;
+
 public class TaskShuffleReadInfoEvent extends UniffleEvent {
   private int stageId;
   private int shuffleId;
@@ -26,6 +28,7 @@ public class TaskShuffleReadInfoEvent extends UniffleEvent {
   private Map<String, ShuffleReadMetric> metrics;
   private boolean isShuffleReadFailed;
   private String failureReason;
+  private ShuffleReadTimes shuffleReadTimes;
 
   public TaskShuffleReadInfoEvent(
       int stageId,
@@ -33,13 +36,15 @@ public class TaskShuffleReadInfoEvent extends UniffleEvent {
       long taskId,
       Map<String, ShuffleReadMetric> metrics,
       boolean isShuffleReadFailed,
-      String failureReason) {
+      String failureReason,
+      ShuffleReadTimes shuffleReadTimes) {
     this.stageId = stageId;
     this.shuffleId = shuffleId;
     this.taskId = taskId;
     this.metrics = metrics;
     this.isShuffleReadFailed = isShuffleReadFailed;
     this.failureReason = failureReason;
+    this.shuffleReadTimes = shuffleReadTimes;
   }
 
   public int getStageId() {
@@ -65,4 +70,8 @@ public class TaskShuffleReadInfoEvent extends UniffleEvent {
   public String getFailureReason() {
     return failureReason;
   }
+
+  public ShuffleReadTimes getShuffleReadTimes() {
+    return shuffleReadTimes;
+  }
 }
diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
index bd0ae0ccb..e6bec30e8 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.client.api.ShuffleReadClient;
 import org.apache.uniffle.client.response.CompressedShuffleBlock;
+import org.apache.uniffle.common.ShuffleReadTimes;
 import org.apache.uniffle.common.compression.Codec;
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.util.RssUtils;
@@ -230,4 +231,11 @@ public class RssShuffleDataIterator<K, C> extends 
AbstractIterator<Product2<K, C
   protected ShuffleReadMetrics getShuffleReadMetrics() {
     return shuffleReadMetrics;
   }
+
+  public ShuffleReadTimes getReadTimes() {
+    ShuffleReadTimes times = shuffleReadClient.getShuffleReadTimes();
+    times.withDecompressed(decompressTime);
+    times.withDeserialized(serializeTime);
+    return times;
+  }
 }
diff --git 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
index b2ce7fbc9..9a7b48eb4 100644
--- 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
+++ 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
@@ -44,6 +44,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.ReceivingFailureServer;
+import org.apache.uniffle.common.ShuffleReadTimes;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.JavaUtils;
@@ -768,7 +769,8 @@ public class ShuffleManagerGrpcService extends 
ShuffleManagerImplBase {
                                 x.getValue().getHadoopDurationMillis(),
                                 x.getValue().getHadoopByteSize()))),
             request.getIsTaskReadFailed(),
-            request.getShuffleReadFailureReason());
+            request.getShuffleReadFailureReason(),
+            ShuffleReadTimes.fromProto(request.getShuffleReadTimes()));
     RssSparkShuffleUtils.getActiveSparkContext().listenerBus().post(event);
     RssProtos.ReportShuffleReadMetricResponse reply =
         RssProtos.ReportShuffleReadMetricResponse.newBuilder()
diff --git 
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala 
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
index 8f4aeb529..94fd0cc81 100644
--- 
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
+++ 
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
@@ -22,6 +22,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, 
SparkListenerJobEnd, SparkListenerJobStart, SparkListenerTaskEnd}
 import org.apache.spark.shuffle.events.{ShuffleAssignmentInfoEvent, 
ShuffleWriteTimes, TaskReassignInfoEvent, TaskShuffleReadInfoEvent, 
TaskShuffleWriteInfoEvent}
 import org.apache.spark.status.ElementTrackingStore
+import org.apache.uniffle.common.ShuffleReadTimes
 
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicLong
@@ -30,6 +31,7 @@ import scala.collection.JavaConverters.mapAsScalaMapConverter
 class UniffleListener(conf: SparkConf, kvstore: ElementTrackingStore)
   extends SparkListener with Logging {
 
+  private val aggregatedShuffleReadTimes = new ShuffleReadTimes()
   private val aggregatedShuffleWriteTimes = new ShuffleWriteTimes()
   private val aggregatedShuffleWriteMetric = new ConcurrentHashMap[String, 
AggregatedShuffleWriteMetric]
   private val aggregatedShuffleReadMetric = new ConcurrentHashMap[String, 
AggregatedShuffleReadMetric]
@@ -65,6 +67,9 @@ class UniffleListener(conf: SparkConf, kvstore: 
ElementTrackingStore)
       kvstore.write(
         AggregatedShuffleWriteTimesUIData(aggregatedShuffleWriteTimes)
       )
+      kvstore.write(
+        AggregatedShuffleReadTimesUIData(aggregatedShuffleReadTimes)
+      )
     }
   }
 
@@ -137,6 +142,7 @@ class UniffleListener(conf: SparkConf, kvstore: 
ElementTrackingStore)
       agg_metric.hadoopByteSize += rmetric.getHadoopByteSize
       agg_metric.hadoopDurationMillis += rmetric.getHadoopDurationMillis
     }
+    aggregatedShuffleReadTimes.merge(event.getShuffleReadTimes)
   }
 
   override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
diff --git 
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
 
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
index 931a1ce53..098013e05 100644
--- 
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
+++ 
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
@@ -22,6 +22,7 @@ import org.apache.spark.shuffle.events.{ShuffleWriteTimes, 
TaskReassignInfoEvent
 import org.apache.spark.status.KVUtils.KVIndexParam
 import org.apache.spark.util.Utils
 import org.apache.spark.util.kvstore.{KVIndex, KVStore, KVStoreView}
+import org.apache.uniffle.common.ShuffleReadTimes
 
 import java.util.concurrent.ConcurrentHashMap
 import scala.collection.JavaConverters.asScalaIteratorConverter
@@ -49,6 +50,15 @@ class UniffleStatusStore(store: KVStore) {
     }
   }
 
+  def shuffleReadTimes(): AggregatedShuffleReadTimesUIData = {
+    val kClass = classOf[AggregatedShuffleReadTimesUIData]
+    try {
+      store.read(kClass, kClass.getName)
+    } catch {
+      case _: NoSuchElementException => AggregatedShuffleReadTimesUIData(new 
ShuffleReadTimes())
+    }
+  }
+
   def shuffleWriteTimes(): AggregatedShuffleWriteTimesUIData = {
     val kClass = classOf[AggregatedShuffleWriteTimesUIData]
     try {
@@ -164,6 +174,12 @@ case class AggregatedShuffleWriteTimesUIData(times: 
ShuffleWriteTimes) {
   def id: String = classOf[AggregatedShuffleWriteTimesUIData].getName()
 }
 
+case class AggregatedShuffleReadTimesUIData(times: ShuffleReadTimes) {
+  @JsonIgnore
+  @KVIndex
+  def id: String = classOf[AggregatedShuffleReadTimesUIData].getName()
+}
+
 case class ReassignInfoUIData(event: TaskReassignInfoEvent) {
   @JsonIgnore
   @KVIndex
diff --git 
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala 
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
index e4b32bdb8..3fadde1e9 100644
--- 
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
+++ 
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
@@ -40,6 +40,15 @@ class ShufflePage(parent: ShuffleTab) extends WebUIPage("") 
with Logging {
     </td>
   </tr>
 
+  private def shuffleReadTimesRow(kv: Seq[String]) = <tr>
+    <td>{kv(0)}</td>
+    <td>{kv(1)}</td>
+    <td>{kv(2)}</td>
+    <td>{kv(3)}</td>
+    <td>{kv(4)}</td>
+    <td>{kv(5)}</td>
+  </tr>
+
   private def shuffleWriteTimesRow(kv: Seq[String]) = <tr>
     <td>{kv(0)}</td>
     <td>{kv(1)}</td>
@@ -146,9 +155,36 @@ class ShufflePage(parent: ShuffleTab) extends 
WebUIPage("") with Logging {
       fixedWidth = true
     )
 
+    // render shuffle read times
+    val readTimes = runtimeStatusStore.shuffleReadTimes().times
+    val readTotal = if (readTimes.getTotal <= 0) -1 else readTimes.getTotal
+    val readTimesUI = UIUtils.listingTable(
+      Seq("Total", "Fetch", "Copy", "CRC", "Decompress", "Deserialize"),
+      shuffleReadTimesRow,
+      Seq(
+        Seq(
+          UIUtils.formatDuration(readTotal),
+          UIUtils.formatDuration(readTimes.getFetch),
+          UIUtils.formatDuration(readTimes.getCopy),
+          UIUtils.formatDuration(readTimes.getCrc),
+          UIUtils.formatDuration(readTimes.getDecompress),
+          UIUtils.formatDuration(readTimes.getDeserialize),
+        ),
+        Seq(
+          1,
+          readTimes.getFetch.toDouble / readTotal,
+          readTimes.getCopy.toDouble / readTotal,
+          readTimes.getCrc.toDouble / readTotal,
+          readTimes.getDecompress.toDouble / readTotal,
+          readTimes.getDeserialize.toDouble / readTotal,
+        ).map(x => roundToTwoDecimals(x).toString)
+      ),
+      fixedWidth = true
+    )
+
     // render shuffle write times
     val writeTimes = runtimeStatusStore.shuffleWriteTimes().times
-    val total = if (writeTimes.getTotal <= 0) -1 else writeTimes.getTotal
+    val writeTotal = if (writeTimes.getTotal <= 0) -1 else writeTimes.getTotal
     val writeTimesUI = UIUtils.listingTable(
       Seq("Total Time", "Wait Finish Time", "Copy Time", "Serialize Time", 
"Compress Time", "Sort Time", "Require Memory Time"),
       shuffleWriteTimesRow,
@@ -164,12 +200,12 @@ class ShufflePage(parent: ShuffleTab) extends 
WebUIPage("") with Logging {
         ),
         Seq(
           1.toDouble,
-          writeTimes.getWaitFinish.toDouble / total,
-          writeTimes.getCopy.toDouble / total,
-          writeTimes.getSerialize.toDouble / total,
-          writeTimes.getCompress.toDouble / total,
-          writeTimes.getSort.toDouble / total,
-          writeTimes.getRequireMemory.toDouble / total,
+          writeTimes.getWaitFinish.toDouble / writeTotal,
+          writeTimes.getCopy.toDouble / writeTotal,
+          writeTimes.getSerialize.toDouble / writeTotal,
+          writeTimes.getCompress.toDouble / writeTotal,
+          writeTimes.getSort.toDouble / writeTotal,
+          writeTimes.getRequireMemory.toDouble / writeTotal,
         ).map(x => roundToTwoDecimals(x).toString)
       ),
       fixedWidth = true
@@ -407,6 +443,19 @@ class ShufflePage(parent: ShuffleTab) extends 
WebUIPage("") with Logging {
             {writeTimesUI}
           </div>
         </div>
+
+        <div>
+          <span class="collapse-read-times-properties collapse-table"
+                onClick="collapseTable('collapse-read-times-properties', 
'read-times-table')">
+            <h4>
+              <span class="collapse-table-arrow arrow-closed"></span>
+              <a>Shuffle Read Times</a>
+            </h4>
+          </span>
+          <div class="read-times-table collapsible-table collapsed">
+            {readTimesUI}
+          </div>
+        </div>
       </div>
     }
 
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index 4113f0627..d6bb9b210 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -60,6 +60,7 @@ import 
org.apache.uniffle.client.request.RssReportShuffleReadMetricRequest;
 import org.apache.uniffle.client.response.RssReportShuffleReadMetricResponse;
 import org.apache.uniffle.client.util.RssClientConfig;
 import org.apache.uniffle.common.ShuffleDataDistributionType;
+import org.apache.uniffle.common.ShuffleReadTimes;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.config.RssClientConf;
 import org.apache.uniffle.common.config.RssConf;
@@ -99,6 +100,8 @@ public class RssShuffleReader<K, C> implements 
ShuffleReader<K, C> {
   private boolean isShuffleReadFailed = false;
   private Optional<String> shuffleReadReason = Optional.empty();
 
+  private ShuffleReadTimes shuffleReadTimes = new ShuffleReadTimes();
+
   public RssShuffleReader(
       int startPartition,
       int endPartition,
@@ -314,6 +317,7 @@ public class RssShuffleReader<K, C> implements 
ShuffleReader<K, C> {
                 iterator,
                 FunctionUtils.once(
                     () -> {
+                      shuffleReadTimes.merge(iterator.getReadTimes());
                       context.taskMetrics().mergeShuffleReadMetrics();
                       return iterator.cleanup();
                     }));
@@ -389,7 +393,8 @@ public class RssShuffleReader<K, C> implements 
ShuffleReader<K, C> {
                                           
x.getValue().getHadoopReadLocalFileDurationMillis(),
                                           
x.getValue().getHadoopReadLocalFileBytes()))),
                       isShuffleReadFailed,
-                      shuffleReadReason));
+                      shuffleReadReason,
+                      shuffleReadTimes));
           if (response != null && response.getStatusCode() != 
StatusCode.SUCCESS) {
             LOG.error("Errors on reporting shuffle read metrics to driver");
           }
diff --git 
a/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTest.java
 
b/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTest.java
index c36c2f297..fdf1f84cd 100644
--- 
a/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTest.java
+++ 
b/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTest.java
@@ -55,6 +55,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.client.api.ShuffleReadClient;
 import org.apache.uniffle.client.response.CompressedShuffleBlock;
+import org.apache.uniffle.common.ShuffleReadTimes;
 import org.apache.uniffle.common.compression.Codec;
 import org.apache.uniffle.common.compression.Lz4Codec;
 import org.apache.uniffle.common.config.RssConf;
@@ -359,6 +360,11 @@ public class RssTezFetcherTest {
 
     @Override
     public void logStatics() {}
+
+    @Override
+    public ShuffleReadTimes getShuffleReadTimes() {
+      return new ShuffleReadTimes();
+    }
   }
 
   /**
diff --git 
a/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezShuffleDataFetcherTest.java
 
b/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezShuffleDataFetcherTest.java
index c6b2efd77..2ea90a923 100644
--- 
a/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezShuffleDataFetcherTest.java
+++ 
b/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezShuffleDataFetcherTest.java
@@ -66,6 +66,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.client.api.ShuffleReadClient;
 import org.apache.uniffle.client.response.CompressedShuffleBlock;
+import org.apache.uniffle.common.ShuffleReadTimes;
 import org.apache.uniffle.common.compression.Codec;
 import org.apache.uniffle.common.compression.Lz4Codec;
 
@@ -397,5 +398,10 @@ public class RssTezShuffleDataFetcherTest {
 
     @Override
     public void logStatics() {}
+
+    @Override
+    public ShuffleReadTimes getShuffleReadTimes() {
+      return new ShuffleReadTimes();
+    }
   }
 }
diff --git 
a/client/src/main/java/org/apache/uniffle/client/api/ShuffleReadClient.java 
b/client/src/main/java/org/apache/uniffle/client/api/ShuffleReadClient.java
index 33270dfc4..f4e7fe1f8 100644
--- a/client/src/main/java/org/apache/uniffle/client/api/ShuffleReadClient.java
+++ b/client/src/main/java/org/apache/uniffle/client/api/ShuffleReadClient.java
@@ -18,6 +18,7 @@
 package org.apache.uniffle.client.api;
 
 import org.apache.uniffle.client.response.CompressedShuffleBlock;
+import org.apache.uniffle.common.ShuffleReadTimes;
 
 public interface ShuffleReadClient {
 
@@ -28,4 +29,6 @@ public interface ShuffleReadClient {
   void close();
 
   void logStatics();
+
+  ShuffleReadTimes getShuffleReadTimes();
 }
diff --git 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index 44e8a3f67..20497cb99 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -40,6 +40,7 @@ import org.apache.uniffle.client.util.DefaultIdHelper;
 import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleDataResult;
+import org.apache.uniffle.common.ShuffleReadTimes;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.config.RssClientConf;
 import org.apache.uniffle.common.config.RssConf;
@@ -356,4 +357,9 @@ public class ShuffleReadClientImpl implements 
ShuffleReadClient {
             + " ms");
     clientReadHandler.logConsumedBlockInfo();
   }
+
+  @Override
+  public ShuffleReadTimes getShuffleReadTimes() {
+    return new ShuffleReadTimes(readDataTime.get(), copyTime.get(), 
crcCheckTime.get());
+  }
 }
diff --git 
a/common/src/main/java/org/apache/uniffle/common/ShuffleReadTimes.java 
b/common/src/main/java/org/apache/uniffle/common/ShuffleReadTimes.java
new file mode 100644
index 000000000..7be409fc0
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/ShuffleReadTimes.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common;
+
+import org.apache.uniffle.proto.RssProtos;
+
+/** The unit is millis */
+public class ShuffleReadTimes {
+  private long fetch;
+  private long crc;
+  private long copy;
+  private long deserialize;
+  private long decompress;
+
+  public ShuffleReadTimes() {}
+
+  public ShuffleReadTimes(long fetch, long crc, long copy) {
+    this.fetch = fetch;
+    this.crc = crc;
+    this.copy = copy;
+  }
+
+  public long getFetch() {
+    return fetch;
+  }
+
+  public long getCrc() {
+    return crc;
+  }
+
+  public long getCopy() {
+    return copy;
+  }
+
+  public void withDeserialized(long deserialized) {
+    this.deserialize = deserialized;
+  }
+
+  public void withDecompressed(long decompressed) {
+    this.decompress = decompressed;
+  }
+
+  public long getDeserialize() {
+    return deserialize;
+  }
+
+  public long getDecompress() {
+    return decompress;
+  }
+
+  public void merge(ShuffleReadTimes other) {
+    this.fetch += other.fetch;
+    this.crc += other.crc;
+    this.copy += other.copy;
+    this.deserialize += other.deserialize;
+    this.decompress += other.decompress;
+  }
+
+  public long getTotal() {
+    return fetch + crc + copy + deserialize + decompress;
+  }
+
+  public RssProtos.ShuffleReadTimes toProto() {
+    return RssProtos.ShuffleReadTimes.newBuilder()
+        .setFetch(fetch)
+        .setCrc(crc)
+        .setCopy(copy)
+        .setDecompress(decompress)
+        .setDeserialize(deserialize)
+        .build();
+  }
+
+  public static ShuffleReadTimes fromProto(RssProtos.ShuffleReadTimes proto) {
+    ShuffleReadTimes time = new ShuffleReadTimes();
+    time.fetch = proto.getFetch();
+    time.crc = proto.getCrc();
+    time.copy = proto.getCopy();
+    time.decompress = proto.getDecompress();
+    time.deserialize = proto.getDeserialize();
+    return time;
+  }
+}
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java
index d536d49eb..e88ea097d 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java
@@ -21,6 +21,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import org.apache.uniffle.common.ShuffleReadTimes;
 import org.apache.uniffle.proto.RssProtos;
 
 public class RssReportShuffleReadMetricRequest {
@@ -30,6 +31,7 @@ public class RssReportShuffleReadMetricRequest {
   private Map<String, TaskShuffleReadMetric> metrics;
   private boolean isShuffleReadFailed;
   private Optional<String> shuffleReadReason;
+  private ShuffleReadTimes shuffleReadTimes;
 
   public RssReportShuffleReadMetricRequest(
       int stageId,
@@ -37,13 +39,15 @@ public class RssReportShuffleReadMetricRequest {
       long taskId,
       Map<String, TaskShuffleReadMetric> metrics,
       boolean isShuffleReadFailed,
-      Optional<String> shuffleReadReason) {
+      Optional<String> shuffleReadReason,
+      ShuffleReadTimes shuffleReadTimes) {
     this.stageId = stageId;
     this.shuffleId = shuffleId;
     this.taskId = taskId;
     this.metrics = metrics;
     this.isShuffleReadFailed = isShuffleReadFailed;
     this.shuffleReadReason = shuffleReadReason;
+    this.shuffleReadTimes = shuffleReadTimes;
   }
 
   public RssProtos.ReportShuffleReadMetricRequest toProto() {
@@ -56,6 +60,7 @@ public class RssReportShuffleReadMetricRequest {
         .setTaskId(request.taskId)
         .setIsTaskReadFailed(request.isShuffleReadFailed)
         .setShuffleReadFailureReason(request.shuffleReadReason.orElse(""))
+        .setShuffleReadTimes(shuffleReadTimes.toProto())
         .putAllMetrics(
             request.metrics.entrySet().stream()
                 .collect(
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 1ad6f2a5d..2967d98c0 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -641,6 +641,15 @@ message ReportShuffleReadMetricRequest {
   map<string, ShuffleReadMetric> metrics = 4;
   bool isTaskReadFailed = 5;
   string shuffleReadFailureReason = 6;
+  ShuffleReadTimes shuffleReadTimes = 7;
+}
+
+message ShuffleReadTimes {
+  int64 fetch = 1;
+  int64 crc = 2;
+  int64 copy = 3;
+  int64 deserialize = 4;
+  int64 decompress = 5;
 }
 
 message ReportShuffleReadMetricResponse {

Reply via email to