This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 3beec2b1a [#2460] feat(spark)(part-3): Add support of read metrics 
report (#2465)
3beec2b1a is described below

commit 3beec2b1af834b8a2850756cc2ba26c1b3b7f4fe
Author: Junfan Zhang <[email protected]>
AuthorDate: Tue Apr 29 11:25:08 2025 +0800

    [#2460] feat(spark)(part-3): Add support of read metrics report (#2465)
    
    ### What changes were proposed in this pull request?
    
    1. Add support for reporting shuffle read metrics in the Spark UI
    2. Add a tab that lists the Uniffle properties
    
    ### Why are the changes needed?
    
    Follow-up to #2460.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Internal tests
---
 .../scala/org/apache/spark/UniffleListener.scala   | 21 ++++--
 .../org/apache/spark/UniffleStatusStore.scala      | 28 ++++++--
 .../scala/org/apache/spark/ui/ShufflePage.scala    | 79 ++++++++++++--------
 .../spark/shuffle/reader/RssShuffleReader.java     | 34 +++++++++
 .../client/factory/ShuffleClientFactory.java       | 11 +++
 .../uniffle/client/impl/ShuffleReadClientImpl.java |  4 ++
 .../record/reader/MockedShuffleServerClient.java   |  3 +-
 docs/client_guide/spark_client_guide.md            | 12 ++++
 .../org/apache/uniffle/client/api/ClientInfo.java  | 49 +++++++++++++
 .../uniffle/client/api/ShuffleServerClient.java    |  2 +-
 .../client/impl/grpc/ShuffleServerGrpcClient.java  |  7 +-
 .../impl/grpc/ShuffleServerGrpcNettyClient.java    | 13 ++--
 .../request/RssReportShuffleReadMetricRequest.java |  2 +-
 .../storage/factory/ShuffleHandlerFactory.java     |  9 ++-
 .../handler/impl/HadoopClientReadHandler.java      | 19 +++--
 .../handler/impl/LocalFileClientReadHandler.java   | 20 +++++-
 .../handler/impl/MemoryClientReadHandler.java      | 19 ++++-
 .../handler/impl/ShuffleServerReadCost.java        | 84 ++++++++++++++++++++++
 .../handler/impl/ShuffleServerReadCostTracker.java | 49 +++++++++++++
 .../request/CreateShuffleReadHandlerRequest.java   | 10 +++
 20 files changed, 412 insertions(+), 63 deletions(-)

diff --git 
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala 
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
index 5930a7c01..4d3b19521 100644
--- 
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
+++ 
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
@@ -18,7 +18,7 @@
 package org.apache.spark
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, 
SparkListenerJobEnd, SparkListenerTaskEnd}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, 
SparkListenerJobEnd, SparkListenerJobStart, SparkListenerTaskEnd}
 import org.apache.spark.shuffle.events.{ShuffleAssignmentInfoEvent, 
TaskShuffleReadInfoEvent, TaskShuffleWriteInfoEvent}
 import org.apache.spark.status.ElementTrackingStore
 
@@ -31,7 +31,11 @@ class UniffleListener(conf: SparkConf, kvstore: 
ElementTrackingStore)
 
   private val aggregatedShuffleWriteMetric = new ConcurrentHashMap[String, 
AggregatedShuffleWriteMetric]
   private val aggregatedShuffleReadMetric = new ConcurrentHashMap[String, 
AggregatedShuffleReadMetric]
+
   private val totalTaskCpuTime = new AtomicLong(0)
+  private val totalShuffleReadTime = new AtomicLong(0)
+  private val totalShuffleWriteTime = new AtomicLong(0)
+  private val totalShuffleBytes = new AtomicLong(0)
 
   private val updateIntervalMillis = 5000
   private var updateLastTimeMillis: Long = -1
@@ -48,7 +52,7 @@ class UniffleListener(conf: SparkConf, kvstore: 
ElementTrackingStore)
         new 
AggregatedShuffleReadMetricsUIData(this.aggregatedShuffleReadMetric)
       )
       kvstore.write(
-        TotalTaskCpuTime(totalTaskCpuTime.get())
+        AggregatedTaskInfoUIData(totalTaskCpuTime.get(), 
totalShuffleWriteTime.get(), totalShuffleReadTime.get(), 
totalShuffleBytes.get())
       )
     }
   }
@@ -57,12 +61,18 @@ class UniffleListener(conf: SparkConf, kvstore: 
ElementTrackingStore)
     this.mayUpdate(false)
     if (taskEnd.taskMetrics.shuffleReadMetrics.recordsRead > 0
       || taskEnd.taskMetrics.shuffleWriteMetrics.recordsWritten > 0) {
-      totalTaskCpuTime.addAndGet(
-        taskEnd.taskInfo.duration
-      )
+      totalTaskCpuTime.addAndGet(taskEnd.taskInfo.duration)
+      
totalShuffleWriteTime.addAndGet(taskEnd.taskMetrics.shuffleWriteMetrics.writeTime
 / 1000000)
+      
totalShuffleReadTime.addAndGet(taskEnd.taskMetrics.shuffleReadMetrics.fetchWaitTime)
+      
totalShuffleBytes.addAndGet(taskEnd.taskMetrics.shuffleWriteMetrics.bytesWritten)
     }
   }
 
+  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+    val rssConf = conf.getAll.filter(x => x._1.startsWith("spark.rss."))
+    kvstore.write(new UniffleProperties(rssConf))
+  }
+
   override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
     this.mayUpdate(true)
   }
@@ -88,7 +98,6 @@ class UniffleListener(conf: SparkConf, kvstore: 
ElementTrackingStore)
       val agg_metric = this.aggregatedShuffleWriteMetric.computeIfAbsent(id, _ 
=> new AggregatedShuffleWriteMetric(0, 0))
       agg_metric.byteSize += metric._2.getByteSize
       agg_metric.durationMillis += metric._2.getDurationMillis
-
     }
   }
 
diff --git 
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
 
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
index 43c5341e9..b7578d25f 100644
--- 
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
+++ 
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
@@ -30,6 +30,15 @@ class UniffleStatusStore(store: KVStore) {
     Utils.tryWithResource(view.closeableIterator())(iter => 
iter.asScala.toList)
   }
 
+  def uniffleProperties(): UniffleProperties = {
+    val kClass = classOf[UniffleProperties]
+    try {
+      store.read(kClass, kClass.getName)
+    } catch {
+      case _: NoSuchElementException => new UniffleProperties(Seq.empty)
+    }
+  }
+
   def buildInfo(): BuildInfoUIData = {
     val kClass = classOf[BuildInfoUIData]
     try {
@@ -63,16 +72,22 @@ class UniffleStatusStore(store: KVStore) {
     }
   }
 
-  def totalTaskTime(): TotalTaskCpuTime = {
-    val kClass = classOf[TotalTaskCpuTime]
+  def aggregatedTaskInfo(): AggregatedTaskInfoUIData = {
+    val kClass = classOf[AggregatedTaskInfoUIData]
     try {
       store.read(kClass, kClass.getName)
     } catch {
-      case _: Exception => TotalTaskCpuTime(0)
+      case _: Exception => AggregatedTaskInfoUIData(0, 0, 0, 0)
     }
   }
 }
 
+class UniffleProperties(val info: Seq[(String, String)]) {
+  @JsonIgnore
+  @KVIndex
+  def id: String = classOf[UniffleProperties].getName()
+}
+
 class BuildInfoUIData(val info: Seq[(String, String)]) {
   @JsonIgnore
   @KVIndex
@@ -102,8 +117,11 @@ class AggregatedShuffleReadMetric(durationMillis: Long, 
byteSize: Long)
   extends AggregatedShuffleMetric(durationMillis, byteSize)
 
 // task total cpu time
-case class TotalTaskCpuTime(durationMillis: Long) {
+case class AggregatedTaskInfoUIData(cpuTimeMillis: Long,
+                                    shuffleWriteMillis: Long,
+                                    shuffleReadMillis: Long,
+                                    shuffleBytes: Long) {
   @JsonIgnore
   @KVIndex
-  def id: String = classOf[TotalTaskCpuTime].getName()
+  def id: String = classOf[AggregatedTaskInfoUIData].getName()
 }
\ No newline at end of file
diff --git 
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala 
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
index 422ec41dc..cef2c4297 100644
--- 
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
+++ 
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
@@ -19,7 +19,7 @@ package org.apache.spark.ui
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils
-import org.apache.spark.{AggregatedShuffleMetric, AggregatedShuffleReadMetric, 
AggregatedShuffleWriteMetric}
+import org.apache.spark.{AggregatedShuffleMetric, AggregatedShuffleReadMetric, 
AggregatedShuffleWriteMetric, AggregatedTaskInfoUIData}
 
 import java.util.concurrent.ConcurrentHashMap
 import javax.servlet.http.HttpServletRequest
@@ -46,6 +46,7 @@ class ShufflePage(parent: ShuffleTab) extends WebUIPage("") 
with Logging {
     <td>{kv._4}</td>
     <td>{kv._5}</td>
     <td>{kv._6}</td>
+    <td>{kv._7}</td>
   </tr>
 
   private def createShuffleMetricsRows(shuffleWriteMetrics: (Seq[Double], 
Seq[String]), shuffleReadMetrics: (Seq[Double], Seq[String])): 
Seq[scala.xml.Elem] = {
@@ -69,9 +70,9 @@ class ShufflePage(parent: ShuffleTab) extends WebUIPage("") 
with Logging {
     </tr>
 
     val writeSpeedRow = if (writeSpeeds.nonEmpty) Some(createSpeedRow("Write 
Speed (MB/sec)", writeSpeeds)) else None
-    val writeServerIdRow = if (writeServerIds.nonEmpty) 
Some(createServerIdRow("Shuffle Write Server ID", writeServerIds)) else None
-    val readSpeedRow = if (readSpeeds.nonEmpty) Some(createSpeedRow("Read 
Speed (bytes/sec)", readSpeeds)) else None
-    val readServerIdRow = if (readServerIds.nonEmpty) 
Some(createServerIdRow("Shuffle Read Server ID", readServerIds)) else None
+    val writeServerIdRow = if (writeServerIds.nonEmpty) 
Some(createServerIdRow("Write Shuffle Server ID", writeServerIds)) else None
+    val readSpeedRow = if (readSpeeds.nonEmpty) Some(createSpeedRow("Read 
Speed (MB/sec)", readSpeeds)) else None
+    val readServerIdRow = if (readServerIds.nonEmpty) 
Some(createServerIdRow("Read Shuffle Server ID", readServerIds)) else None
 
     Seq(writeSpeedRow, writeServerIdRow, readSpeedRow, readServerIdRow).flatten
   }
@@ -81,12 +82,17 @@ class ShufflePage(parent: ShuffleTab) extends WebUIPage("") 
with Logging {
     val originReadMetric = runtimeStatusStore.aggregatedShuffleReadMetrics()
 
     // render header
-    val writeMetaInfo = 
getShuffleMetaInfo(originWriteMetric.metrics.asScala.toSeq)
-    val readMetaInfo = 
getShuffleMetaInfo(originReadMetric.metrics.asScala.toSeq)
-    val shuffleTotalSize = writeMetaInfo._1
-    val shuffleTotalTime = writeMetaInfo._2 + readMetaInfo._2
-    val taskCpuTime = if (runtimeStatusStore.totalTaskTime == null) 0 else 
runtimeStatusStore.totalTaskTime.durationMillis
-    val percent = if (taskCpuTime == 0) 0 else shuffleTotalTime.toDouble / 
taskCpuTime
+    val aggTaskInfo = runtimeStatusStore.aggregatedTaskInfo
+    val taskInfo =
+      if (aggTaskInfo == null)
+        AggregatedTaskInfoUIData(0, 0, 0, 0)
+      else
+        aggTaskInfo
+    val percent =
+      if (taskInfo.cpuTimeMillis == 0)
+        0
+      else
+        (taskInfo.shuffleWriteMillis + taskInfo.shuffleReadMillis).toDouble / 
taskInfo.cpuTimeMillis
 
     // render build info
     val buildInfo = runtimeStatusStore.buildInfo()
@@ -97,13 +103,22 @@ class ShufflePage(parent: ShuffleTab) extends 
WebUIPage("") with Logging {
       fixedWidth = true
     )
 
+    // render uniffle configs
+    val rssConf = runtimeStatusStore.uniffleProperties()
+    val rssConfTableUI = UIUtils.listingTable(
+      propertyHeader,
+      propertyRow,
+      rssConf.info,
+      fixedWidth = true
+    )
+
     // render shuffle-servers write+read statistics
     val shuffleWriteMetrics = 
shuffleSpeedStatistics(originWriteMetric.metrics.asScala.toSeq)
     val shuffleReadMetrics = 
shuffleSpeedStatistics(originReadMetric.metrics.asScala.toSeq)
     val shuffleHeader = Seq("Avg", "Min", "P25", "P50", "P75", "Max")
     val shuffleMetricsRows = createShuffleMetricsRows(shuffleWriteMetrics, 
shuffleReadMetrics)
     val shuffleMetricsTableUI =
-      <table class="table table-bordered table-sm table-striped sortable">
+      <table class="table table-bordered table-sm table-striped">
         <thead>
           <tr>
             {("Metric" +: shuffleHeader).map(header => <th>
@@ -122,7 +137,7 @@ class ShufflePage(parent: ShuffleTab) extends WebUIPage("") 
with Logging {
       originReadMetric.metrics
     )
     val allServersTableUI = UIUtils.listingTable(
-      Seq("Shuffle Server ID", "Write Bytes", "Write Duration", "Write Speed 
(MB/sec)", "Read Bytes", "Read Duration", "Read Speed"),
+      Seq("Shuffle Server ID", "Write Bytes", "Write Duration", "Write Speed 
(MB/sec)", "Read Bytes", "Read Duration", "Read Speed (MB/sec)"),
       allServerRow,
       allServers,
       fixedWidth = true
@@ -145,12 +160,14 @@ class ShufflePage(parent: ShuffleTab) extends 
WebUIPage("") with Logging {
               <a>
                 <strong>Total shuffle bytes:</strong>
               </a>
-              {shuffleTotalSize} / {Utils.bytesToString(shuffleTotalSize)}
+              {Utils.bytesToString(taskInfo.shuffleBytes)}
             </li><li data-relingo-block="true">
             <a>
-              <strong>Shuffle Duration / Task Duration:</strong>
+              <strong>Shuffle Duration (write+read) / Task Duration:</strong>
             </a>
-            {UIUtils.formatDuration(shuffleTotalTime)} / 
{UIUtils.formatDuration(taskCpuTime)} = {roundToTwoDecimals(percent)}
+            {UIUtils.formatDuration(taskInfo.shuffleWriteMillis + 
taskInfo.shuffleReadMillis)}
+            
({UIUtils.formatDuration(taskInfo.shuffleWriteMillis)}+{UIUtils.formatDuration(taskInfo.shuffleReadMillis)})
+            / {UIUtils.formatDuration(taskInfo.cpuTimeMillis)} = 
{roundToTwoDecimals(percent)}
           </li>
           </ul>
         </div>
@@ -168,6 +185,19 @@ class ShufflePage(parent: ShuffleTab) extends 
WebUIPage("") with Logging {
           </div>
         </div>
 
+        <div>
+          <span class="collapse-uniffle-config-properties collapse-table"
+                onClick="collapseTable('collapse-uniffle-config-properties', 
'uniffle-config-table')">
+            <h4>
+              <span class="collapse-table-arrow arrow-closed"></span>
+              <a>Uniffle Properties</a>
+            </h4>
+          </span>
+          <div class="uniffle-config-table collapsible-table collapsed">
+            {rssConfTableUI}
+          </div>
+        </div>
+
         <div>
           <span class="collapse-throughput-properties collapse-table"
                 onClick="collapseTable('collapse-throughput-properties', 
'statistics-table')">
@@ -175,10 +205,10 @@ class ShufflePage(parent: ShuffleTab) extends 
WebUIPage("") with Logging {
               <span class="collapse-table-arrow arrow-closed"></span>
               <a>Shuffle Throughput Statistics</a>
             </h4>
-            <div class="statistics-table collapsible-table collapsed">
-              {shuffleMetricsTableUI}
-            </div>
           </span>
+          <div class="statistics-table collapsible-table collapsed">
+            {shuffleMetricsTableUI}
+          </div>
         </div>
 
         <div>
@@ -188,10 +218,10 @@ class ShufflePage(parent: ShuffleTab) extends 
WebUIPage("") with Logging {
               <span class="collapse-table-arrow arrow-closed"></span>
               <a>Shuffle Server ({allServers.length})</a>
             </h4>
-            <div class="all-servers-table collapsible-table collapsed">
-              {allServersTableUI}
-            </div>
           </span>
+          <div class="all-servers-table collapsible-table collapsed">
+            {allServersTableUI}
+          </div>
         </div>
 
         <div>
@@ -212,13 +242,6 @@ class ShufflePage(parent: ShuffleTab) extends 
WebUIPage("") with Logging {
     UIUtils.headerSparkPage(request, "Uniffle", summary, parent)
   }
 
-  private def getShuffleMetaInfo(metrics: Seq[(String, 
AggregatedShuffleMetric)]) = {
-    (
-      metrics.map(x => x._2.byteSize).sum,
-      metrics.map(x => x._2.durationMillis).sum
-      )
-  }
-
   private def roundToTwoDecimals(value: Double): Double = {
     BigDecimal(value).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble
   }
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index 19682bd65..6c6b3760c 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -20,6 +20,7 @@ package org.apache.spark.shuffle.reader;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 import scala.Function0;
 import scala.Function1;
@@ -53,11 +54,15 @@ import org.slf4j.LoggerFactory;
 import org.apache.uniffle.client.api.ShuffleManagerClient;
 import org.apache.uniffle.client.api.ShuffleReadClient;
 import org.apache.uniffle.client.factory.ShuffleClientFactory;
+import org.apache.uniffle.client.request.RssReportShuffleReadMetricRequest;
+import org.apache.uniffle.client.response.RssReportShuffleReadMetricResponse;
 import org.apache.uniffle.client.util.RssClientConfig;
 import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.config.RssClientConf;
 import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.storage.handler.impl.ShuffleServerReadCostTracker;
 
 import static 
org.apache.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED;
 
@@ -85,6 +90,8 @@ public class RssShuffleReader<K, C> implements 
ShuffleReader<K, C> {
   private RssConf rssConf;
   private ShuffleDataDistributionType dataDistributionType;
   private Supplier<ShuffleManagerClient> managerClientSupplier;
+  private ShuffleServerReadCostTracker shuffleServerReadCostTracker =
+      new ShuffleServerReadCostTracker();
 
   public RssShuffleReader(
       int startPartition,
@@ -266,6 +273,7 @@ public class RssShuffleReader<K, C> implements 
ShuffleReader<K, C> {
             ShuffleClientFactory.getInstance()
                 .createShuffleReadClient(
                     ShuffleClientFactory.newReadBuilder()
+                        .readCostTracker(shuffleServerReadCostTracker)
                         .appId(appId)
                         .shuffleId(shuffleId)
                         .partitionId(partition)
@@ -315,6 +323,7 @@ public class RssShuffleReader<K, C> implements 
ShuffleReader<K, C> {
       }
       while (!dataIterator.hasNext()) {
         if (!iterator.hasNext()) {
+          postShuffleReadMetricsToDriver();
           return false;
         }
         dataIterator = iterator.next();
@@ -329,4 +338,29 @@ public class RssShuffleReader<K, C> implements 
ShuffleReader<K, C> {
       return result;
     }
   }
+
+  private void postShuffleReadMetricsToDriver() {
+    if (managerClientSupplier != null) {
+      ShuffleManagerClient client = managerClientSupplier.get();
+      if (client != null) {
+        RssReportShuffleReadMetricResponse response =
+            client.reportShuffleReadMetric(
+                new RssReportShuffleReadMetricRequest(
+                    context.stageId(),
+                    shuffleId,
+                    context.taskAttemptId(),
+                    shuffleServerReadCostTracker.list().entrySet().stream()
+                        .collect(
+                            Collectors.toMap(
+                                Map.Entry::getKey,
+                                x ->
+                                    new 
RssReportShuffleReadMetricRequest.TaskShuffleReadMetric(
+                                        x.getValue().getDurationMillis(),
+                                        x.getValue().getReadBytes())))));
+        if (response != null && response.getStatusCode() != 
StatusCode.SUCCESS) {
+          LOG.error("Errors on reporting shuffle read metrics to driver");
+        }
+      }
+    }
+  }
 }
diff --git 
a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
 
b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
index def9d5fa6..1542f6920 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
@@ -31,6 +31,7 @@ import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.util.IdHelper;
+import org.apache.uniffle.storage.handler.impl.ShuffleServerReadCostTracker;
 
 public class ShuffleClientFactory {
 
@@ -224,6 +225,12 @@ public class ShuffleClientFactory {
     private ClientType clientType;
     private int retryMax;
     private long retryIntervalMax;
+    private ShuffleServerReadCostTracker readCostTracker;
+
+    public ReadClientBuilder readCostTracker(ShuffleServerReadCostTracker 
tracker) {
+      this.readCostTracker = tracker;
+      return this;
+    }
 
     public ReadClientBuilder appId(String appId) {
       this.appId = appId;
@@ -334,6 +341,10 @@ public class ShuffleClientFactory {
 
     public ReadClientBuilder() {}
 
+    public ShuffleServerReadCostTracker getReadCostTracker() {
+      return readCostTracker;
+    }
+
     public String getAppId() {
       return appId;
     }
diff --git 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index 6a1701c95..8383d5b66 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -48,6 +48,7 @@ import org.apache.uniffle.common.util.IdHelper;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.storage.factory.ShuffleHandlerFactory;
 import org.apache.uniffle.storage.handler.api.ClientReadHandler;
+import org.apache.uniffle.storage.handler.impl.ShuffleServerReadCostTracker;
 import org.apache.uniffle.storage.request.CreateShuffleReadHandlerRequest;
 
 public class ShuffleReadClientImpl implements ShuffleReadClient {
@@ -69,6 +70,7 @@ public class ShuffleReadClientImpl implements 
ShuffleReadClient {
   private ClientReadHandler clientReadHandler;
   private IdHelper idHelper;
   private BlockIdLayout blockIdLayout;
+  private ShuffleServerReadCostTracker readCostTracker;
 
   public ShuffleReadClientImpl(ShuffleClientFactory.ReadClientBuilder builder) 
{
     // add default value
@@ -151,6 +153,7 @@ public class ShuffleReadClientImpl implements 
ShuffleReadClient {
     this.idHelper = builder.getIdHelper();
     this.shuffleServerInfoList = builder.getShuffleServerInfoList();
     this.blockIdLayout = BlockIdLayout.from(builder.getRssConf());
+    this.readCostTracker = builder.getReadCostTracker();
 
     CreateShuffleReadHandlerRequest request = new 
CreateShuffleReadHandlerRequest();
     request.setStorageType(builder.getStorageType());
@@ -173,6 +176,7 @@ public class ShuffleReadClientImpl implements 
ShuffleReadClient {
     request.setClientType(builder.getClientType());
     request.setRetryMax(builder.getRetryMax());
     request.setRetryIntervalMax(builder.getRetryIntervalMax());
+    request.setReadCostTracker(readCostTracker);
     if (builder.isExpectedTaskIdsBitmapFilterEnable()) {
       request.useExpectedTaskIdsBitmapFilter();
     }
diff --git 
a/client/src/test/java/org/apache/uniffle/client/record/reader/MockedShuffleServerClient.java
 
b/client/src/test/java/org/apache/uniffle/client/record/reader/MockedShuffleServerClient.java
index 931f8990c..e60abea52 100644
--- 
a/client/src/test/java/org/apache/uniffle/client/record/reader/MockedShuffleServerClient.java
+++ 
b/client/src/test/java/org/apache/uniffle/client/record/reader/MockedShuffleServerClient.java
@@ -27,6 +27,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
+import org.apache.uniffle.client.api.ClientInfo;
 import org.apache.uniffle.client.api.ShuffleServerClient;
 import org.apache.uniffle.client.request.RssAppHeartBeatRequest;
 import org.apache.uniffle.client.request.RssFinishShuffleRequest;
@@ -205,7 +206,7 @@ public class MockedShuffleServerClient implements 
ShuffleServerClient {
   public void close() {}
 
   @Override
-  public String getClientInfo() {
+  public ClientInfo getClientInfo() {
     return null;
   }
 }
diff --git a/docs/client_guide/spark_client_guide.md 
b/docs/client_guide/spark_client_guide.md
index c189c064d..11f072afb 100644
--- a/docs/client_guide/spark_client_guide.md
+++ b/docs/client_guide/spark_client_guide.md
@@ -180,3 +180,15 @@ We can enable this feature by using the following 
configuration:
 | spark.rss.client.mapSideCombine.enabled | false   | Whether to enable map 
side combine of shuffle writer. |
 
 **Note**: Map side combine will handle entire map side shuffle write data, 
which may cause data spills and delay shuffle writes.
+
+### Spark UI
+
+The Uniffle client’s metric statistics are now available in the Spark UI under 
the Uniffle tab.
+This feature can be enabled with the following configuration (supported in 
Spark 3 and above).
+
+```bash
+spark.plugins org.apache.spark.UnifflePlugin
+```
+
+To enable this feature in the Spark History Server, place the Uniffle client 
JAR file into the jars directory of your Spark HOME. 
+A restart of the History Server may be required for the changes to take effect.
\ No newline at end of file
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/api/ClientInfo.java 
b/internal-client/src/main/java/org/apache/uniffle/client/api/ClientInfo.java
new file mode 100644
index 000000000..2170a0a27
--- /dev/null
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/api/ClientInfo.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.api;
+
+import org.apache.uniffle.common.ClientType;
+import org.apache.uniffle.common.ShuffleServerInfo;
+
+public class ClientInfo {
+  private ClientType clientType;
+  private ShuffleServerInfo shuffleServerInfo;
+
+  public ClientInfo(ClientType clientType, ShuffleServerInfo 
shuffleServerInfo) {
+    this.clientType = clientType;
+    this.shuffleServerInfo = shuffleServerInfo;
+  }
+
+  public ClientType getClientType() {
+    return clientType;
+  }
+
+  public ShuffleServerInfo getShuffleServerInfo() {
+    return shuffleServerInfo;
+  }
+
+  @Override
+  public String toString() {
+    return "ClientInfo{"
+        + "clientType="
+        + clientType
+        + ", shuffleServerInfo="
+        + shuffleServerInfo
+        + '}';
+  }
+}
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java
index 3e01d6760..88aca1697 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java
@@ -84,5 +84,5 @@ public interface ShuffleServerClient {
 
   void close();
 
-  String getClientInfo();
+  ClientInfo getClientInfo();
 }
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index 82993890e..82dd7c9b3 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -38,6 +38,7 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.client.api.ClientInfo;
 import org.apache.uniffle.client.api.ShuffleServerClient;
 import org.apache.uniffle.client.request.RetryableRequest;
 import org.apache.uniffle.client.request.RssAppHeartBeatRequest;
@@ -70,10 +71,12 @@ import 
org.apache.uniffle.client.response.RssStartSortMergeResponse;
 import org.apache.uniffle.client.response.RssUnregisterShuffleByAppIdResponse;
 import org.apache.uniffle.client.response.RssUnregisterShuffleResponse;
 import org.apache.uniffle.common.BufferSegment;
+import org.apache.uniffle.common.ClientType;
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.ShuffleDataDistributionType;
+import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.config.RssClientConf;
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.NotRetryException;
@@ -1256,8 +1259,8 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
   }
 
   @Override
-  public String getClientInfo() {
-    return "ShuffleServerGrpcClient for host[" + host + "], port[" + port + 
"]";
+  public ClientInfo getClientInfo() {
+    return new ClientInfo(ClientType.GRPC, new ShuffleServerInfo(host, port));
   }
 
   protected void waitOrThrow(
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
index e48233ddb..07e91ea9e 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
@@ -30,6 +30,7 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.client.api.ClientInfo;
 import org.apache.uniffle.client.request.RssGetInMemoryShuffleDataRequest;
 import org.apache.uniffle.client.request.RssGetShuffleDataRequest;
 import org.apache.uniffle.client.request.RssGetShuffleIndexRequest;
@@ -40,7 +41,9 @@ import 
org.apache.uniffle.client.response.RssGetShuffleDataResponse;
 import org.apache.uniffle.client.response.RssGetShuffleIndexResponse;
 import org.apache.uniffle.client.response.RssGetSortedShuffleDataResponse;
 import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
+import org.apache.uniffle.common.ClientType;
 import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.config.RssClientConf;
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.NotRetryException;
@@ -125,14 +128,8 @@ public class ShuffleServerGrpcNettyClient extends 
ShuffleServerGrpcClient {
   }
 
   @Override
-  public String getClientInfo() {
-    return "ShuffleServerGrpcNettyClient for host["
-        + host
-        + "], port["
-        + port
-        + "], nettyPort["
-        + nettyPort
-        + "]";
+  public ClientInfo getClientInfo() {
+    return new ClientInfo(ClientType.GRPC_NETTY, new ShuffleServerInfo(host, 
port, nettyPort));
   }
 
   @Override
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java
index b5f4a6624..17d95d0d2 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java
@@ -61,7 +61,7 @@ public class RssReportShuffleReadMetricRequest {
     private long durationMillis;
     private long byteSize;
 
-    TaskShuffleReadMetric(long durationMillis, long byteSize) {
+    public TaskShuffleReadMetric(long durationMillis, long byteSize) {
       this.durationMillis = durationMillis;
       this.byteSize = byteSize;
     }
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
 
b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
index fba2d3810..84550ab1a 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
@@ -139,7 +139,8 @@ public class ShuffleHandlerFactory {
             expectTaskIds,
             request.getRetryMax(),
             request.getRetryIntervalMax(),
-            request.getPrefetchOption());
+            request.getPrefetchOption(),
+            request.getReadCostTracker());
     return memoryClientReadHandler;
   }
 
@@ -163,7 +164,8 @@ public class ShuffleHandlerFactory {
         request.getExpectTaskIds(),
         request.getRetryMax(),
         request.getRetryIntervalMax(),
-        request.getPrefetchOption());
+        request.getPrefetchOption(),
+        request.getReadCostTracker());
   }
 
   private ClientReadHandler getHadoopClientReadHandler(
@@ -184,7 +186,8 @@ public class ShuffleHandlerFactory {
         request.getExpectTaskIds(),
         ssi.getId(),
         request.isOffHeapEnabled(),
-        request.getPrefetchOption());
+        request.getPrefetchOption(),
+        request.getReadCostTracker());
   }
 
   public ShuffleDeleteHandler createShuffleDeleteHandler(
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopClientReadHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopClientReadHandler.java
index ec444b2b4..a316a3028 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopClientReadHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopClientReadHandler.java
@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleDataResult;
+import org.apache.uniffle.common.StorageType;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
 import org.apache.uniffle.common.util.Constants;
@@ -57,6 +58,7 @@ public class HadoopClientReadHandler extends 
AbstractClientReadHandler {
   private Roaring64NavigableMap expectTaskIds;
   private boolean offHeapEnable = false;
   private Optional<PrefetchableClientReadHandler.PrefetchOption> 
prefetchOption;
+  private ShuffleServerReadCostTracker readCostTracker;
 
   public HadoopClientReadHandler(
       String appId,
@@ -74,7 +76,8 @@ public class HadoopClientReadHandler extends 
AbstractClientReadHandler {
       Roaring64NavigableMap expectTaskIds,
       String shuffleServerId,
       boolean offHeapEnable,
-      Optional<PrefetchableClientReadHandler.PrefetchOption> prefetchOption) {
+      Optional<PrefetchableClientReadHandler.PrefetchOption> prefetchOption,
+      ShuffleServerReadCostTracker readCostTracker) {
     this.appId = appId;
     this.shuffleId = shuffleId;
     this.partitionId = partitionId;
@@ -91,6 +94,7 @@ public class HadoopClientReadHandler extends 
AbstractClientReadHandler {
     this.shuffleServerId = shuffleServerId;
     this.offHeapEnable = offHeapEnable;
     this.prefetchOption = prefetchOption;
+    this.readCostTracker = readCostTracker;
   }
 
   // Only for test
@@ -122,7 +126,8 @@ public class HadoopClientReadHandler extends 
AbstractClientReadHandler {
         Roaring64NavigableMap.bitmapOf(),
         null,
         false,
-        Optional.empty());
+        Optional.empty(),
+        new ShuffleServerReadCostTracker());
   }
 
   protected void init(String fullShufflePath) {
@@ -210,8 +215,8 @@ public class HadoopClientReadHandler extends 
AbstractClientReadHandler {
     }
 
     HadoopShuffleReadHandler hadoopShuffleFileReader = 
readHandlers.get(readHandlerIndex);
+    long start = System.currentTimeMillis();
     ShuffleDataResult shuffleDataResult = 
hadoopShuffleFileReader.readShuffleData();
-
     while (shuffleDataResult == null) {
       ++readHandlerIndex;
       if (readHandlerIndex >= readHandlers.size()) {
@@ -220,7 +225,13 @@ public class HadoopClientReadHandler extends 
AbstractClientReadHandler {
       hadoopShuffleFileReader = readHandlers.get(readHandlerIndex);
       shuffleDataResult = hadoopShuffleFileReader.readShuffleData();
     }
-
+    if (readCostTracker != null) {
+      readCostTracker.record(
+          shuffleServerId,
+          StorageType.HDFS,
+          shuffleDataResult.getDataLength(),
+          System.currentTimeMillis() - start);
+    }
     return shuffleDataResult;
   }
 
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
index a5464678b..d06675808 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
@@ -24,6 +24,7 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.client.api.ClientInfo;
 import org.apache.uniffle.client.api.ShuffleServerClient;
 import org.apache.uniffle.client.request.RssGetShuffleDataRequest;
 import org.apache.uniffle.client.request.RssGetShuffleIndexRequest;
@@ -32,6 +33,7 @@ import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShuffleDataSegment;
 import org.apache.uniffle.common.ShuffleIndexResult;
+import org.apache.uniffle.common.StorageType;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.exception.RssFetchFailedException;
 
@@ -42,6 +44,7 @@ public class LocalFileClientReadHandler extends 
DataSkippableReadHandler {
   private ShuffleServerClient shuffleServerClient;
   private int retryMax;
   private long retryIntervalMax;
+  private ShuffleServerReadCostTracker readCostTracker;
 
   public LocalFileClientReadHandler(
       String appId,
@@ -58,7 +61,8 @@ public class LocalFileClientReadHandler extends 
DataSkippableReadHandler {
       Roaring64NavigableMap expectTaskIds,
       int retryMax,
       long retryIntervalMax,
-      Optional<PrefetchOption> prefetchOption) {
+      Optional<PrefetchOption> prefetchOption,
+      ShuffleServerReadCostTracker readCostTracker) {
     super(
         appId,
         shuffleId,
@@ -74,6 +78,7 @@ public class LocalFileClientReadHandler extends 
DataSkippableReadHandler {
     this.partitionNum = partitionNum;
     this.retryMax = retryMax;
     this.retryIntervalMax = retryIntervalMax;
+    this.readCostTracker = readCostTracker;
   }
 
   @VisibleForTesting
@@ -103,7 +108,8 @@ public class LocalFileClientReadHandler extends 
DataSkippableReadHandler {
         Roaring64NavigableMap.bitmapOf(),
         1,
         0,
-        Optional.empty());
+        Optional.empty(),
+        new ShuffleServerReadCostTracker());
   }
 
   @Override
@@ -166,9 +172,19 @@ public class LocalFileClientReadHandler extends 
DataSkippableReadHandler {
             retryMax,
             retryIntervalMax);
     try {
+      long start = System.currentTimeMillis();
       RssGetShuffleDataResponse response = 
shuffleServerClient.getShuffleData(request);
       result =
           new ShuffleDataResult(response.getShuffleData(), 
shuffleDataSegment.getBufferSegments());
+
+      ClientInfo clientInfo = shuffleServerClient.getClientInfo();
+      if (readCostTracker != null && clientInfo != null) {
+        readCostTracker.record(
+            clientInfo.getShuffleServerInfo().getId(),
+            StorageType.LOCALFILE,
+            result.getDataLength(),
+            System.currentTimeMillis() - start);
+      }
     } catch (Exception e) {
       throw new RssException(
           "Failed to read shuffle data with " + 
shuffleServerClient.getClientInfo(), e);
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java
index b5ec28970..b2f15e472 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java
@@ -25,11 +25,13 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.client.api.ClientInfo;
 import org.apache.uniffle.client.api.ShuffleServerClient;
 import org.apache.uniffle.client.request.RssGetInMemoryShuffleDataRequest;
 import org.apache.uniffle.client.response.RssGetInMemoryShuffleDataResponse;
 import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.ShuffleDataResult;
+import org.apache.uniffle.common.StorageType;
 import org.apache.uniffle.common.exception.RssFetchFailedException;
 import org.apache.uniffle.common.util.Constants;
 
@@ -41,6 +43,7 @@ public class MemoryClientReadHandler extends 
PrefetchableClientReadHandler {
   private Roaring64NavigableMap expectTaskIds;
   private int retryMax;
   private long retryIntervalMax;
+  private ShuffleServerReadCostTracker readCostTracker;
 
   public MemoryClientReadHandler(
       String appId,
@@ -51,7 +54,8 @@ public class MemoryClientReadHandler extends 
PrefetchableClientReadHandler {
       Roaring64NavigableMap expectTaskIds,
       int retryMax,
       long retryIntervalMax,
-      Optional<PrefetchableClientReadHandler.PrefetchOption> prefetchOption) {
+      Optional<PrefetchableClientReadHandler.PrefetchOption> prefetchOption,
+      ShuffleServerReadCostTracker readCostTracker) {
     super(prefetchOption);
     this.appId = appId;
     this.shuffleId = shuffleId;
@@ -61,6 +65,7 @@ public class MemoryClientReadHandler extends 
PrefetchableClientReadHandler {
     this.expectTaskIds = expectTaskIds;
     this.retryMax = retryMax;
     this.retryIntervalMax = retryIntervalMax;
+    this.readCostTracker = readCostTracker;
   }
 
   @VisibleForTesting
@@ -80,7 +85,8 @@ public class MemoryClientReadHandler extends 
PrefetchableClientReadHandler {
         expectTaskIds,
         1,
         0,
-        Optional.empty());
+        Optional.empty(),
+        new ShuffleServerReadCostTracker());
   }
 
   @Override
@@ -99,9 +105,18 @@ public class MemoryClientReadHandler extends 
PrefetchableClientReadHandler {
             retryIntervalMax);
 
     try {
+      long start = System.currentTimeMillis();
       RssGetInMemoryShuffleDataResponse response =
           shuffleServerClient.getInMemoryShuffleData(request);
       result = new ShuffleDataResult(response.getData(), 
response.getBufferSegments());
+      ClientInfo clientInfo = shuffleServerClient.getClientInfo();
+      if (readCostTracker != null && clientInfo != null) {
+        readCostTracker.record(
+            clientInfo.getShuffleServerInfo().getId(),
+            StorageType.MEMORY,
+            result.getDataLength(),
+            System.currentTimeMillis() - start);
+      }
     } catch (RssFetchFailedException e) {
       throw e;
     } catch (Exception e) {
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ShuffleServerReadCost.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ShuffleServerReadCost.java
new file mode 100644
index 000000000..e8920108c
--- /dev/null
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ShuffleServerReadCost.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.storage.handler.impl;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.uniffle.common.StorageType;
+
+/**
+ * Accumulated cost of the shuffle reads attributed to one shuffle server.
+ *
+ * <p>Every {@link #inc} call adds to the overall byte and duration totals and, for MEMORY,
+ * LOCALFILE and HDFS reads, to a per-storage pair of counters. Each counter is an {@link
+ * AtomicLong}, so a single counter update is atomic; the counters touched by one {@code inc}
+ * call are updated one after another, not as a unit.
+ */
+public class ShuffleServerReadCost {
+  private final String shuffleServerId;
+
+  // totals over every storage type
+  private final AtomicLong durationMillis;
+  private final AtomicLong readBytes;
+
+  // hybrid storage statistics
+  private final AtomicLong memoryReadBytes;
+  private final AtomicLong memoryReadDurationMillis;
+
+  private final AtomicLong localfileReadBytes;
+  private final AtomicLong localfileReadDurationMillis;
+
+  private final AtomicLong hadoopReadBytes;
+  private final AtomicLong hadoopReadDurationMillis;
+
+  /**
+   * Creates a cost record with all counters at zero.
+   *
+   * @param shuffleServerId id of the shuffle server the reads are attributed to
+   */
+  public ShuffleServerReadCost(String shuffleServerId) {
+    this.shuffleServerId = shuffleServerId;
+
+    this.durationMillis = new AtomicLong(0);
+    this.readBytes = new AtomicLong(0);
+
+    this.memoryReadBytes = new AtomicLong(0);
+    this.memoryReadDurationMillis = new AtomicLong(0);
+
+    this.localfileReadBytes = new AtomicLong(0);
+    this.localfileReadDurationMillis = new AtomicLong(0);
+
+    this.hadoopReadBytes = new AtomicLong(0);
+    this.hadoopReadDurationMillis = new AtomicLong(0);
+  }
+
+  /**
+   * Adds one read to the totals and to the counters of the given storage type.
+   *
+   * <p>Storage types other than MEMORY, LOCALFILE and HDFS contribute to the overall totals only.
+   *
+   * @param storageType storage the data was read from
+   * @param bytes number of bytes read
+   * @param durationMillis time the read took, in milliseconds
+   */
+  public void inc(StorageType storageType, long bytes, long durationMillis) {
+    this.durationMillis.addAndGet(durationMillis);
+    this.readBytes.addAndGet(bytes);
+
+    switch (storageType) {
+      case MEMORY:
+        this.memoryReadBytes.addAndGet(bytes);
+        this.memoryReadDurationMillis.addAndGet(durationMillis);
+        break;
+      case LOCALFILE:
+        this.localfileReadBytes.addAndGet(bytes);
+        this.localfileReadDurationMillis.addAndGet(durationMillis);
+        break;
+      case HDFS:
+        this.hadoopReadBytes.addAndGet(bytes);
+        this.hadoopReadDurationMillis.addAndGet(durationMillis);
+        break;
+      default:
+        // no dedicated counters for this storage type; only the totals above are updated
+        break;
+    }
+  }
+
+  /** Returns the shuffle server id given at construction. */
+  public String getShuffleServerId() {
+    return shuffleServerId;
+  }
+
+  /** Returns the summed duration, in milliseconds, of all recorded reads. */
+  public long getDurationMillis() {
+    return durationMillis.get();
+  }
+
+  /** Returns the summed byte count of all recorded reads. */
+  public long getReadBytes() {
+    return readBytes.get();
+  }
+
+  /** Returns the bytes recorded for MEMORY reads. */
+  public long getMemoryReadBytes() {
+    return memoryReadBytes.get();
+  }
+
+  /** Returns the duration, in milliseconds, recorded for MEMORY reads. */
+  public long getMemoryReadDurationMillis() {
+    return memoryReadDurationMillis.get();
+  }
+
+  /** Returns the bytes recorded for LOCALFILE reads. */
+  public long getLocalfileReadBytes() {
+    return localfileReadBytes.get();
+  }
+
+  /** Returns the duration, in milliseconds, recorded for LOCALFILE reads. */
+  public long getLocalfileReadDurationMillis() {
+    return localfileReadDurationMillis.get();
+  }
+
+  /** Returns the bytes recorded for HDFS reads. */
+  public long getHadoopReadBytes() {
+    return hadoopReadBytes.get();
+  }
+
+  /** Returns the duration, in milliseconds, recorded for HDFS reads. */
+  public long getHadoopReadDurationMillis() {
+    return hadoopReadDurationMillis.get();
+  }
+}
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ShuffleServerReadCostTracker.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ShuffleServerReadCostTracker.java
new file mode 100644
index 000000000..015257361
--- /dev/null
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ShuffleServerReadCostTracker.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.storage.handler.impl;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.StorageType;
+
+/**
+ * Collects shuffle read costs keyed by shuffle server id.
+ *
+ * <p>Costs are held in a {@link ConcurrentHashMap}; one {@link ShuffleServerReadCost} is created
+ * per server id the first time a read against that id is recorded.
+ */
+public class ShuffleServerReadCostTracker {
+  private static final Logger LOG = LoggerFactory.getLogger(ShuffleServerReadCostTracker.class);
+
+  private Map<String, ShuffleServerReadCost> tracking;
+
+  /** Creates a tracker with no recorded servers. */
+  public ShuffleServerReadCostTracker() {
+    tracking = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Adds one read to the cost entry of {@code serverId}, creating the entry on first use.
+   *
+   * @param serverId id of the shuffle server the read is attributed to; a null id is ignored
+   * @param storageType storage the data was read from
+   * @param bytes number of bytes read
+   * @param durationMillis time the read took, in milliseconds
+   */
+  public void record(String serverId, StorageType storageType, long bytes, long durationMillis) {
+    if (serverId != null) {
+      tracking
+          .computeIfAbsent(serverId, ShuffleServerReadCost::new)
+          .inc(storageType, bytes, durationMillis);
+    }
+  }
+
+  /**
+   * Returns the map from server id to accumulated cost. This is the tracker's own map rather
+   * than a copy, so entries added by later {@link #record} calls show up in it.
+   */
+  public Map<String, ShuffleServerReadCost> list() {
+    return tracking;
+  }
+}
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java
 
b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java
index 9a36e4ad0..145f93cab 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java
@@ -31,6 +31,7 @@ import org.apache.uniffle.common.config.RssClientConf;
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.util.IdHelper;
 import org.apache.uniffle.storage.handler.impl.PrefetchableClientReadHandler;
+import org.apache.uniffle.storage.handler.impl.ShuffleServerReadCostTracker;
 
 public class CreateShuffleReadHandlerRequest {
 
@@ -55,6 +56,7 @@ public class CreateShuffleReadHandlerRequest {
   private boolean expectedTaskIdsBitmapFilterEnable;
   private boolean offHeapEnabled;
   private RssConf clientConf;
+  private ShuffleServerReadCostTracker readCostTracker;
 
   private IdHelper idHelper;
 
@@ -246,6 +248,14 @@ public class CreateShuffleReadHandlerRequest {
     this.clientType = clientType;
   }
 
+  public ShuffleServerReadCostTracker getReadCostTracker() {
+    return readCostTracker;
+  }
+
+  public void setReadCostTracker(ShuffleServerReadCostTracker readCostTracker) 
{
+    this.readCostTracker = readCostTracker;
+  }
+
   public Optional<PrefetchableClientReadHandler.PrefetchOption> 
getPrefetchOption() {
     if (clientConf.get(RssClientConf.RSS_CLIENT_PREFETCH_ENABLED)) {
       return Optional.of(

Reply via email to