This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 3beec2b1a [#2460] feat(spark)(part-3): Add support of read metrics
report (#2465)
3beec2b1a is described below
commit 3beec2b1af834b8a2850756cc2ba26c1b3b7f4fe
Author: Junfan Zhang <[email protected]>
AuthorDate: Tue Apr 29 11:25:08 2025 +0800
[#2460] feat(spark)(part-3): Add support of read metrics report (#2465)
### What changes were proposed in this pull request?
1. Add support for reporting read metrics in the UI
2. Add tab of uniffle properties
### Why are the changes needed?
Follow-up to #2460
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Internal tests
---
.../scala/org/apache/spark/UniffleListener.scala | 21 ++++--
.../org/apache/spark/UniffleStatusStore.scala | 28 ++++++--
.../scala/org/apache/spark/ui/ShufflePage.scala | 79 ++++++++++++--------
.../spark/shuffle/reader/RssShuffleReader.java | 34 +++++++++
.../client/factory/ShuffleClientFactory.java | 11 +++
.../uniffle/client/impl/ShuffleReadClientImpl.java | 4 ++
.../record/reader/MockedShuffleServerClient.java | 3 +-
docs/client_guide/spark_client_guide.md | 12 ++++
.../org/apache/uniffle/client/api/ClientInfo.java | 49 +++++++++++++
.../uniffle/client/api/ShuffleServerClient.java | 2 +-
.../client/impl/grpc/ShuffleServerGrpcClient.java | 7 +-
.../impl/grpc/ShuffleServerGrpcNettyClient.java | 13 ++--
.../request/RssReportShuffleReadMetricRequest.java | 2 +-
.../storage/factory/ShuffleHandlerFactory.java | 9 ++-
.../handler/impl/HadoopClientReadHandler.java | 19 +++--
.../handler/impl/LocalFileClientReadHandler.java | 20 +++++-
.../handler/impl/MemoryClientReadHandler.java | 19 ++++-
.../handler/impl/ShuffleServerReadCost.java | 84 ++++++++++++++++++++++
.../handler/impl/ShuffleServerReadCostTracker.java | 49 +++++++++++++
.../request/CreateShuffleReadHandlerRequest.java | 10 +++
20 files changed, 412 insertions(+), 63 deletions(-)
diff --git
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
index 5930a7c01..4d3b19521 100644
---
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
+++
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
@@ -18,7 +18,7 @@
package org.apache.spark
import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent,
SparkListenerJobEnd, SparkListenerTaskEnd}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent,
SparkListenerJobEnd, SparkListenerJobStart, SparkListenerTaskEnd}
import org.apache.spark.shuffle.events.{ShuffleAssignmentInfoEvent,
TaskShuffleReadInfoEvent, TaskShuffleWriteInfoEvent}
import org.apache.spark.status.ElementTrackingStore
@@ -31,7 +31,11 @@ class UniffleListener(conf: SparkConf, kvstore:
ElementTrackingStore)
private val aggregatedShuffleWriteMetric = new ConcurrentHashMap[String,
AggregatedShuffleWriteMetric]
private val aggregatedShuffleReadMetric = new ConcurrentHashMap[String,
AggregatedShuffleReadMetric]
+
private val totalTaskCpuTime = new AtomicLong(0)
+ private val totalShuffleReadTime = new AtomicLong(0)
+ private val totalShuffleWriteTime = new AtomicLong(0)
+ private val totalShuffleBytes = new AtomicLong(0)
private val updateIntervalMillis = 5000
private var updateLastTimeMillis: Long = -1
@@ -48,7 +52,7 @@ class UniffleListener(conf: SparkConf, kvstore:
ElementTrackingStore)
new
AggregatedShuffleReadMetricsUIData(this.aggregatedShuffleReadMetric)
)
kvstore.write(
- TotalTaskCpuTime(totalTaskCpuTime.get())
+ AggregatedTaskInfoUIData(totalTaskCpuTime.get(),
totalShuffleWriteTime.get(), totalShuffleReadTime.get(),
totalShuffleBytes.get())
)
}
}
@@ -57,12 +61,18 @@ class UniffleListener(conf: SparkConf, kvstore:
ElementTrackingStore)
this.mayUpdate(false)
if (taskEnd.taskMetrics.shuffleReadMetrics.recordsRead > 0
|| taskEnd.taskMetrics.shuffleWriteMetrics.recordsWritten > 0) {
- totalTaskCpuTime.addAndGet(
- taskEnd.taskInfo.duration
- )
+ totalTaskCpuTime.addAndGet(taskEnd.taskInfo.duration)
+
totalShuffleWriteTime.addAndGet(taskEnd.taskMetrics.shuffleWriteMetrics.writeTime
/ 1000000)
+
totalShuffleReadTime.addAndGet(taskEnd.taskMetrics.shuffleReadMetrics.fetchWaitTime)
+
totalShuffleBytes.addAndGet(taskEnd.taskMetrics.shuffleWriteMetrics.bytesWritten)
}
}
+ override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+ val rssConf = conf.getAll.filter(x => x._1.startsWith("spark.rss."))
+ kvstore.write(new UniffleProperties(rssConf))
+ }
+
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
this.mayUpdate(true)
}
@@ -88,7 +98,6 @@ class UniffleListener(conf: SparkConf, kvstore:
ElementTrackingStore)
val agg_metric = this.aggregatedShuffleWriteMetric.computeIfAbsent(id, _
=> new AggregatedShuffleWriteMetric(0, 0))
agg_metric.byteSize += metric._2.getByteSize
agg_metric.durationMillis += metric._2.getDurationMillis
-
}
}
diff --git
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
index 43c5341e9..b7578d25f 100644
---
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
+++
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
@@ -30,6 +30,15 @@ class UniffleStatusStore(store: KVStore) {
Utils.tryWithResource(view.closeableIterator())(iter =>
iter.asScala.toList)
}
+ def uniffleProperties(): UniffleProperties = {
+ val kClass = classOf[UniffleProperties]
+ try {
+ store.read(kClass, kClass.getName)
+ } catch {
+ case _: NoSuchElementException => new UniffleProperties(Seq.empty)
+ }
+ }
+
def buildInfo(): BuildInfoUIData = {
val kClass = classOf[BuildInfoUIData]
try {
@@ -63,16 +72,22 @@ class UniffleStatusStore(store: KVStore) {
}
}
- def totalTaskTime(): TotalTaskCpuTime = {
- val kClass = classOf[TotalTaskCpuTime]
+ def aggregatedTaskInfo(): AggregatedTaskInfoUIData = {
+ val kClass = classOf[AggregatedTaskInfoUIData]
try {
store.read(kClass, kClass.getName)
} catch {
- case _: Exception => TotalTaskCpuTime(0)
+ case _: Exception => AggregatedTaskInfoUIData(0, 0, 0, 0)
}
}
}
+class UniffleProperties(val info: Seq[(String, String)]) {
+ @JsonIgnore
+ @KVIndex
+ def id: String = classOf[UniffleProperties].getName()
+}
+
class BuildInfoUIData(val info: Seq[(String, String)]) {
@JsonIgnore
@KVIndex
@@ -102,8 +117,11 @@ class AggregatedShuffleReadMetric(durationMillis: Long,
byteSize: Long)
extends AggregatedShuffleMetric(durationMillis, byteSize)
// task total cpu time
-case class TotalTaskCpuTime(durationMillis: Long) {
+case class AggregatedTaskInfoUIData(cpuTimeMillis: Long,
+ shuffleWriteMillis: Long,
+ shuffleReadMillis: Long,
+ shuffleBytes: Long) {
@JsonIgnore
@KVIndex
- def id: String = classOf[TotalTaskCpuTime].getName()
+ def id: String = classOf[AggregatedTaskInfoUIData].getName()
}
\ No newline at end of file
diff --git
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
index 422ec41dc..cef2c4297 100644
---
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
+++
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
@@ -19,7 +19,7 @@ package org.apache.spark.ui
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils
-import org.apache.spark.{AggregatedShuffleMetric, AggregatedShuffleReadMetric,
AggregatedShuffleWriteMetric}
+import org.apache.spark.{AggregatedShuffleMetric, AggregatedShuffleReadMetric,
AggregatedShuffleWriteMetric, AggregatedTaskInfoUIData}
import java.util.concurrent.ConcurrentHashMap
import javax.servlet.http.HttpServletRequest
@@ -46,6 +46,7 @@ class ShufflePage(parent: ShuffleTab) extends WebUIPage("")
with Logging {
<td>{kv._4}</td>
<td>{kv._5}</td>
<td>{kv._6}</td>
+ <td>{kv._7}</td>
</tr>
private def createShuffleMetricsRows(shuffleWriteMetrics: (Seq[Double],
Seq[String]), shuffleReadMetrics: (Seq[Double], Seq[String])):
Seq[scala.xml.Elem] = {
@@ -69,9 +70,9 @@ class ShufflePage(parent: ShuffleTab) extends WebUIPage("")
with Logging {
</tr>
val writeSpeedRow = if (writeSpeeds.nonEmpty) Some(createSpeedRow("Write
Speed (MB/sec)", writeSpeeds)) else None
- val writeServerIdRow = if (writeServerIds.nonEmpty)
Some(createServerIdRow("Shuffle Write Server ID", writeServerIds)) else None
- val readSpeedRow = if (readSpeeds.nonEmpty) Some(createSpeedRow("Read
Speed (bytes/sec)", readSpeeds)) else None
- val readServerIdRow = if (readServerIds.nonEmpty)
Some(createServerIdRow("Shuffle Read Server ID", readServerIds)) else None
+ val writeServerIdRow = if (writeServerIds.nonEmpty)
Some(createServerIdRow("Write Shuffle Server ID", writeServerIds)) else None
+ val readSpeedRow = if (readSpeeds.nonEmpty) Some(createSpeedRow("Read
Speed (MB/sec)", readSpeeds)) else None
+ val readServerIdRow = if (readServerIds.nonEmpty)
Some(createServerIdRow("Read Shuffle Server ID", readServerIds)) else None
Seq(writeSpeedRow, writeServerIdRow, readSpeedRow, readServerIdRow).flatten
}
@@ -81,12 +82,17 @@ class ShufflePage(parent: ShuffleTab) extends WebUIPage("")
with Logging {
val originReadMetric = runtimeStatusStore.aggregatedShuffleReadMetrics()
// render header
- val writeMetaInfo =
getShuffleMetaInfo(originWriteMetric.metrics.asScala.toSeq)
- val readMetaInfo =
getShuffleMetaInfo(originReadMetric.metrics.asScala.toSeq)
- val shuffleTotalSize = writeMetaInfo._1
- val shuffleTotalTime = writeMetaInfo._2 + readMetaInfo._2
- val taskCpuTime = if (runtimeStatusStore.totalTaskTime == null) 0 else
runtimeStatusStore.totalTaskTime.durationMillis
- val percent = if (taskCpuTime == 0) 0 else shuffleTotalTime.toDouble /
taskCpuTime
+ val aggTaskInfo = runtimeStatusStore.aggregatedTaskInfo
+ val taskInfo =
+ if (aggTaskInfo == null)
+ AggregatedTaskInfoUIData(0, 0, 0, 0)
+ else
+ aggTaskInfo
+ val percent =
+ if (taskInfo.cpuTimeMillis == 0)
+ 0
+ else
+ (taskInfo.shuffleWriteMillis + taskInfo.shuffleReadMillis).toDouble /
taskInfo.cpuTimeMillis
// render build info
val buildInfo = runtimeStatusStore.buildInfo()
@@ -97,13 +103,22 @@ class ShufflePage(parent: ShuffleTab) extends
WebUIPage("") with Logging {
fixedWidth = true
)
+ // render uniffle configs
+ val rssConf = runtimeStatusStore.uniffleProperties()
+ val rssConfTableUI = UIUtils.listingTable(
+ propertyHeader,
+ propertyRow,
+ rssConf.info,
+ fixedWidth = true
+ )
+
// render shuffle-servers write+read statistics
val shuffleWriteMetrics =
shuffleSpeedStatistics(originWriteMetric.metrics.asScala.toSeq)
val shuffleReadMetrics =
shuffleSpeedStatistics(originReadMetric.metrics.asScala.toSeq)
val shuffleHeader = Seq("Avg", "Min", "P25", "P50", "P75", "Max")
val shuffleMetricsRows = createShuffleMetricsRows(shuffleWriteMetrics,
shuffleReadMetrics)
val shuffleMetricsTableUI =
- <table class="table table-bordered table-sm table-striped sortable">
+ <table class="table table-bordered table-sm table-striped">
<thead>
<tr>
{("Metric" +: shuffleHeader).map(header => <th>
@@ -122,7 +137,7 @@ class ShufflePage(parent: ShuffleTab) extends WebUIPage("")
with Logging {
originReadMetric.metrics
)
val allServersTableUI = UIUtils.listingTable(
- Seq("Shuffle Server ID", "Write Bytes", "Write Duration", "Write Speed
(MB/sec)", "Read Bytes", "Read Duration", "Read Speed"),
+ Seq("Shuffle Server ID", "Write Bytes", "Write Duration", "Write Speed
(MB/sec)", "Read Bytes", "Read Duration", "Read Speed (MB/sec)"),
allServerRow,
allServers,
fixedWidth = true
@@ -145,12 +160,14 @@ class ShufflePage(parent: ShuffleTab) extends
WebUIPage("") with Logging {
<a>
<strong>Total shuffle bytes:</strong>
</a>
- {shuffleTotalSize} / {Utils.bytesToString(shuffleTotalSize)}
+ {Utils.bytesToString(taskInfo.shuffleBytes)}
</li><li data-relingo-block="true">
<a>
- <strong>Shuffle Duration / Task Duration:</strong>
+ <strong>Shuffle Duration (write+read) / Task Duration:</strong>
</a>
- {UIUtils.formatDuration(shuffleTotalTime)} /
{UIUtils.formatDuration(taskCpuTime)} = {roundToTwoDecimals(percent)}
+ {UIUtils.formatDuration(taskInfo.shuffleWriteMillis +
taskInfo.shuffleReadMillis)}
+
({UIUtils.formatDuration(taskInfo.shuffleWriteMillis)}+{UIUtils.formatDuration(taskInfo.shuffleReadMillis)})
+ / {UIUtils.formatDuration(taskInfo.cpuTimeMillis)} =
{roundToTwoDecimals(percent)}
</li>
</ul>
</div>
@@ -168,6 +185,19 @@ class ShufflePage(parent: ShuffleTab) extends
WebUIPage("") with Logging {
</div>
</div>
+ <div>
+ <span class="collapse-uniffle-config-properties collapse-table"
+ onClick="collapseTable('collapse-uniffle-config-properties',
'uniffle-config-table')">
+ <h4>
+ <span class="collapse-table-arrow arrow-closed"></span>
+ <a>Uniffle Properties</a>
+ </h4>
+ </span>
+ <div class="uniffle-config-table collapsible-table collapsed">
+ {rssConfTableUI}
+ </div>
+ </div>
+
<div>
<span class="collapse-throughput-properties collapse-table"
onClick="collapseTable('collapse-throughput-properties',
'statistics-table')">
@@ -175,10 +205,10 @@ class ShufflePage(parent: ShuffleTab) extends
WebUIPage("") with Logging {
<span class="collapse-table-arrow arrow-closed"></span>
<a>Shuffle Throughput Statistics</a>
</h4>
- <div class="statistics-table collapsible-table collapsed">
- {shuffleMetricsTableUI}
- </div>
</span>
+ <div class="statistics-table collapsible-table collapsed">
+ {shuffleMetricsTableUI}
+ </div>
</div>
<div>
@@ -188,10 +218,10 @@ class ShufflePage(parent: ShuffleTab) extends
WebUIPage("") with Logging {
<span class="collapse-table-arrow arrow-closed"></span>
<a>Shuffle Server ({allServers.length})</a>
</h4>
- <div class="all-servers-table collapsible-table collapsed">
- {allServersTableUI}
- </div>
</span>
+ <div class="all-servers-table collapsible-table collapsed">
+ {allServersTableUI}
+ </div>
</div>
<div>
@@ -212,13 +242,6 @@ class ShufflePage(parent: ShuffleTab) extends
WebUIPage("") with Logging {
UIUtils.headerSparkPage(request, "Uniffle", summary, parent)
}
- private def getShuffleMetaInfo(metrics: Seq[(String,
AggregatedShuffleMetric)]) = {
- (
- metrics.map(x => x._2.byteSize).sum,
- metrics.map(x => x._2.durationMillis).sum
- )
- }
-
private def roundToTwoDecimals(value: Double): Double = {
BigDecimal(value).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble
}
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index 19682bd65..6c6b3760c 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -20,6 +20,7 @@ package org.apache.spark.shuffle.reader;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import scala.Function0;
import scala.Function1;
@@ -53,11 +54,15 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.api.ShuffleManagerClient;
import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
+import org.apache.uniffle.client.request.RssReportShuffleReadMetricRequest;
+import org.apache.uniffle.client.response.RssReportShuffleReadMetricResponse;
import org.apache.uniffle.client.util.RssClientConfig;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.storage.handler.impl.ShuffleServerReadCostTracker;
import static
org.apache.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED;
@@ -85,6 +90,8 @@ public class RssShuffleReader<K, C> implements
ShuffleReader<K, C> {
private RssConf rssConf;
private ShuffleDataDistributionType dataDistributionType;
private Supplier<ShuffleManagerClient> managerClientSupplier;
+ private ShuffleServerReadCostTracker shuffleServerReadCostTracker =
+ new ShuffleServerReadCostTracker();
public RssShuffleReader(
int startPartition,
@@ -266,6 +273,7 @@ public class RssShuffleReader<K, C> implements
ShuffleReader<K, C> {
ShuffleClientFactory.getInstance()
.createShuffleReadClient(
ShuffleClientFactory.newReadBuilder()
+ .readCostTracker(shuffleServerReadCostTracker)
.appId(appId)
.shuffleId(shuffleId)
.partitionId(partition)
@@ -315,6 +323,7 @@ public class RssShuffleReader<K, C> implements
ShuffleReader<K, C> {
}
while (!dataIterator.hasNext()) {
if (!iterator.hasNext()) {
+ postShuffleReadMetricsToDriver();
return false;
}
dataIterator = iterator.next();
@@ -329,4 +338,29 @@ public class RssShuffleReader<K, C> implements
ShuffleReader<K, C> {
return result;
}
}
+
+ private void postShuffleReadMetricsToDriver() {
+ if (managerClientSupplier != null) {
+ ShuffleManagerClient client = managerClientSupplier.get();
+ if (client != null) {
+ RssReportShuffleReadMetricResponse response =
+ client.reportShuffleReadMetric(
+ new RssReportShuffleReadMetricRequest(
+ context.stageId(),
+ shuffleId,
+ context.taskAttemptId(),
+ shuffleServerReadCostTracker.list().entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
+ x ->
+ new
RssReportShuffleReadMetricRequest.TaskShuffleReadMetric(
+ x.getValue().getDurationMillis(),
+ x.getValue().getReadBytes())))));
+ if (response != null && response.getStatusCode() !=
StatusCode.SUCCESS) {
+ LOG.error("Errors on reporting shuffle read metrics to driver");
+ }
+ }
+ }
+ }
}
diff --git
a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
index def9d5fa6..1542f6920 100644
---
a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
+++
b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
@@ -31,6 +31,7 @@ import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.util.IdHelper;
+import org.apache.uniffle.storage.handler.impl.ShuffleServerReadCostTracker;
public class ShuffleClientFactory {
@@ -224,6 +225,12 @@ public class ShuffleClientFactory {
private ClientType clientType;
private int retryMax;
private long retryIntervalMax;
+ private ShuffleServerReadCostTracker readCostTracker;
+
+ public ReadClientBuilder readCostTracker(ShuffleServerReadCostTracker
tracker) {
+ this.readCostTracker = tracker;
+ return this;
+ }
public ReadClientBuilder appId(String appId) {
this.appId = appId;
@@ -334,6 +341,10 @@ public class ShuffleClientFactory {
public ReadClientBuilder() {}
+ public ShuffleServerReadCostTracker getReadCostTracker() {
+ return readCostTracker;
+ }
+
public String getAppId() {
return appId;
}
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index 6a1701c95..8383d5b66 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -48,6 +48,7 @@ import org.apache.uniffle.common.util.IdHelper;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.storage.factory.ShuffleHandlerFactory;
import org.apache.uniffle.storage.handler.api.ClientReadHandler;
+import org.apache.uniffle.storage.handler.impl.ShuffleServerReadCostTracker;
import org.apache.uniffle.storage.request.CreateShuffleReadHandlerRequest;
public class ShuffleReadClientImpl implements ShuffleReadClient {
@@ -69,6 +70,7 @@ public class ShuffleReadClientImpl implements
ShuffleReadClient {
private ClientReadHandler clientReadHandler;
private IdHelper idHelper;
private BlockIdLayout blockIdLayout;
+ private ShuffleServerReadCostTracker readCostTracker;
public ShuffleReadClientImpl(ShuffleClientFactory.ReadClientBuilder builder)
{
// add default value
@@ -151,6 +153,7 @@ public class ShuffleReadClientImpl implements
ShuffleReadClient {
this.idHelper = builder.getIdHelper();
this.shuffleServerInfoList = builder.getShuffleServerInfoList();
this.blockIdLayout = BlockIdLayout.from(builder.getRssConf());
+ this.readCostTracker = builder.getReadCostTracker();
CreateShuffleReadHandlerRequest request = new
CreateShuffleReadHandlerRequest();
request.setStorageType(builder.getStorageType());
@@ -173,6 +176,7 @@ public class ShuffleReadClientImpl implements
ShuffleReadClient {
request.setClientType(builder.getClientType());
request.setRetryMax(builder.getRetryMax());
request.setRetryIntervalMax(builder.getRetryIntervalMax());
+ request.setReadCostTracker(readCostTracker);
if (builder.isExpectedTaskIdsBitmapFilterEnable()) {
request.useExpectedTaskIdsBitmapFilter();
}
diff --git
a/client/src/test/java/org/apache/uniffle/client/record/reader/MockedShuffleServerClient.java
b/client/src/test/java/org/apache/uniffle/client/record/reader/MockedShuffleServerClient.java
index 931f8990c..e60abea52 100644
---
a/client/src/test/java/org/apache/uniffle/client/record/reader/MockedShuffleServerClient.java
+++
b/client/src/test/java/org/apache/uniffle/client/record/reader/MockedShuffleServerClient.java
@@ -27,6 +27,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
+import org.apache.uniffle.client.api.ClientInfo;
import org.apache.uniffle.client.api.ShuffleServerClient;
import org.apache.uniffle.client.request.RssAppHeartBeatRequest;
import org.apache.uniffle.client.request.RssFinishShuffleRequest;
@@ -205,7 +206,7 @@ public class MockedShuffleServerClient implements
ShuffleServerClient {
public void close() {}
@Override
- public String getClientInfo() {
+ public ClientInfo getClientInfo() {
return null;
}
}
diff --git a/docs/client_guide/spark_client_guide.md
b/docs/client_guide/spark_client_guide.md
index c189c064d..11f072afb 100644
--- a/docs/client_guide/spark_client_guide.md
+++ b/docs/client_guide/spark_client_guide.md
@@ -180,3 +180,15 @@ We can enable this feature by using the following
configuration:
| spark.rss.client.mapSideCombine.enabled | false | Whether to enable map
side combine of shuffle writer. |
**Note**: Map side combine will handle entire map side shuffle write data,
which may cause data spills and delay shuffle writes.
+
+### Spark UI
+
+The Uniffle client’s metric statistics are now available in the Spark UI under
the Uniffle tab.
+This feature can be enabled with the following configuration (supported in
Spark 3 and above).
+
+```bash
+spark.plugins org.apache.spark.UnifflePlugin
+```
+
+To enable this feature in the Spark History Server, place the Uniffle client
JAR file into the jars directory of your Spark HOME.
+A restart of the History Server may be required for the changes to take effect.
\ No newline at end of file
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/api/ClientInfo.java
b/internal-client/src/main/java/org/apache/uniffle/client/api/ClientInfo.java
new file mode 100644
index 000000000..2170a0a27
--- /dev/null
+++
b/internal-client/src/main/java/org/apache/uniffle/client/api/ClientInfo.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.api;
+
+import org.apache.uniffle.common.ClientType;
+import org.apache.uniffle.common.ShuffleServerInfo;
+
+public class ClientInfo {
+ private ClientType clientType;
+ private ShuffleServerInfo shuffleServerInfo;
+
+ public ClientInfo(ClientType clientType, ShuffleServerInfo
shuffleServerInfo) {
+ this.clientType = clientType;
+ this.shuffleServerInfo = shuffleServerInfo;
+ }
+
+ public ClientType getClientType() {
+ return clientType;
+ }
+
+ public ShuffleServerInfo getShuffleServerInfo() {
+ return shuffleServerInfo;
+ }
+
+ @Override
+ public String toString() {
+ return "ClientInfo{"
+ + "clientType="
+ + clientType
+ + ", shuffleServerInfo="
+ + shuffleServerInfo
+ + '}';
+ }
+}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java
index 3e01d6760..88aca1697 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java
@@ -84,5 +84,5 @@ public interface ShuffleServerClient {
void close();
- String getClientInfo();
+ ClientInfo getClientInfo();
}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index 82993890e..82dd7c9b3 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -38,6 +38,7 @@ import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.client.api.ClientInfo;
import org.apache.uniffle.client.api.ShuffleServerClient;
import org.apache.uniffle.client.request.RetryableRequest;
import org.apache.uniffle.client.request.RssAppHeartBeatRequest;
@@ -70,10 +71,12 @@ import
org.apache.uniffle.client.response.RssStartSortMergeResponse;
import org.apache.uniffle.client.response.RssUnregisterShuffleByAppIdResponse;
import org.apache.uniffle.client.response.RssUnregisterShuffleResponse;
import org.apache.uniffle.common.BufferSegment;
+import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
+import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.NotRetryException;
@@ -1256,8 +1259,8 @@ public class ShuffleServerGrpcClient extends GrpcClient
implements ShuffleServer
}
@Override
- public String getClientInfo() {
- return "ShuffleServerGrpcClient for host[" + host + "], port[" + port +
"]";
+ public ClientInfo getClientInfo() {
+ return new ClientInfo(ClientType.GRPC, new ShuffleServerInfo(host, port));
}
protected void waitOrThrow(
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
index e48233ddb..07e91ea9e 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
@@ -30,6 +30,7 @@ import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.client.api.ClientInfo;
import org.apache.uniffle.client.request.RssGetInMemoryShuffleDataRequest;
import org.apache.uniffle.client.request.RssGetShuffleDataRequest;
import org.apache.uniffle.client.request.RssGetShuffleIndexRequest;
@@ -40,7 +41,9 @@ import
org.apache.uniffle.client.response.RssGetShuffleDataResponse;
import org.apache.uniffle.client.response.RssGetShuffleIndexResponse;
import org.apache.uniffle.client.response.RssGetSortedShuffleDataResponse;
import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
+import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.NotRetryException;
@@ -125,14 +128,8 @@ public class ShuffleServerGrpcNettyClient extends
ShuffleServerGrpcClient {
}
@Override
- public String getClientInfo() {
- return "ShuffleServerGrpcNettyClient for host["
- + host
- + "], port["
- + port
- + "], nettyPort["
- + nettyPort
- + "]";
+ public ClientInfo getClientInfo() {
+ return new ClientInfo(ClientType.GRPC_NETTY, new ShuffleServerInfo(host,
port, nettyPort));
}
@Override
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java
index b5f4a6624..17d95d0d2 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java
@@ -61,7 +61,7 @@ public class RssReportShuffleReadMetricRequest {
private long durationMillis;
private long byteSize;
- TaskShuffleReadMetric(long durationMillis, long byteSize) {
+ public TaskShuffleReadMetric(long durationMillis, long byteSize) {
this.durationMillis = durationMillis;
this.byteSize = byteSize;
}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
index fba2d3810..84550ab1a 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
@@ -139,7 +139,8 @@ public class ShuffleHandlerFactory {
expectTaskIds,
request.getRetryMax(),
request.getRetryIntervalMax(),
- request.getPrefetchOption());
+ request.getPrefetchOption(),
+ request.getReadCostTracker());
return memoryClientReadHandler;
}
@@ -163,7 +164,8 @@ public class ShuffleHandlerFactory {
request.getExpectTaskIds(),
request.getRetryMax(),
request.getRetryIntervalMax(),
- request.getPrefetchOption());
+ request.getPrefetchOption(),
+ request.getReadCostTracker());
}
private ClientReadHandler getHadoopClientReadHandler(
@@ -184,7 +186,8 @@ public class ShuffleHandlerFactory {
request.getExpectTaskIds(),
ssi.getId(),
request.isOffHeapEnabled(),
- request.getPrefetchOption());
+ request.getPrefetchOption(),
+ request.getReadCostTracker());
}
public ShuffleDeleteHandler createShuffleDeleteHandler(
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopClientReadHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopClientReadHandler.java
index ec444b2b4..a316a3028 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopClientReadHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopClientReadHandler.java
@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleDataResult;
+import org.apache.uniffle.common.StorageType;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
import org.apache.uniffle.common.util.Constants;
@@ -57,6 +58,7 @@ public class HadoopClientReadHandler extends
AbstractClientReadHandler {
private Roaring64NavigableMap expectTaskIds;
private boolean offHeapEnable = false;
private Optional<PrefetchableClientReadHandler.PrefetchOption>
prefetchOption;
+ private ShuffleServerReadCostTracker readCostTracker;
public HadoopClientReadHandler(
String appId,
@@ -74,7 +76,8 @@ public class HadoopClientReadHandler extends
AbstractClientReadHandler {
Roaring64NavigableMap expectTaskIds,
String shuffleServerId,
boolean offHeapEnable,
- Optional<PrefetchableClientReadHandler.PrefetchOption> prefetchOption) {
+ Optional<PrefetchableClientReadHandler.PrefetchOption> prefetchOption,
+ ShuffleServerReadCostTracker readCostTracker) {
this.appId = appId;
this.shuffleId = shuffleId;
this.partitionId = partitionId;
@@ -91,6 +94,7 @@ public class HadoopClientReadHandler extends
AbstractClientReadHandler {
this.shuffleServerId = shuffleServerId;
this.offHeapEnable = offHeapEnable;
this.prefetchOption = prefetchOption;
+ this.readCostTracker = readCostTracker;
}
// Only for test
@@ -122,7 +126,8 @@ public class HadoopClientReadHandler extends
AbstractClientReadHandler {
Roaring64NavigableMap.bitmapOf(),
null,
false,
- Optional.empty());
+ Optional.empty(),
+ new ShuffleServerReadCostTracker());
}
protected void init(String fullShufflePath) {
@@ -210,8 +215,8 @@ public class HadoopClientReadHandler extends
AbstractClientReadHandler {
}
HadoopShuffleReadHandler hadoopShuffleFileReader =
readHandlers.get(readHandlerIndex);
+ long start = System.currentTimeMillis();
ShuffleDataResult shuffleDataResult =
hadoopShuffleFileReader.readShuffleData();
-
while (shuffleDataResult == null) {
++readHandlerIndex;
if (readHandlerIndex >= readHandlers.size()) {
@@ -220,7 +225,13 @@ public class HadoopClientReadHandler extends
AbstractClientReadHandler {
hadoopShuffleFileReader = readHandlers.get(readHandlerIndex);
shuffleDataResult = hadoopShuffleFileReader.readShuffleData();
}
-
+ if (readCostTracker != null) {
+ readCostTracker.record(
+ shuffleServerId,
+ StorageType.HDFS,
+ shuffleDataResult.getDataLength(),
+ System.currentTimeMillis() - start);
+ }
return shuffleDataResult;
}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
index a5464678b..d06675808 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java
@@ -24,6 +24,7 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.client.api.ClientInfo;
import org.apache.uniffle.client.api.ShuffleServerClient;
import org.apache.uniffle.client.request.RssGetShuffleDataRequest;
import org.apache.uniffle.client.request.RssGetShuffleIndexRequest;
@@ -32,6 +33,7 @@ import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShuffleDataSegment;
import org.apache.uniffle.common.ShuffleIndexResult;
+import org.apache.uniffle.common.StorageType;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.exception.RssFetchFailedException;
@@ -42,6 +44,7 @@ public class LocalFileClientReadHandler extends
DataSkippableReadHandler {
private ShuffleServerClient shuffleServerClient;
private int retryMax;
private long retryIntervalMax;
+ private ShuffleServerReadCostTracker readCostTracker;
public LocalFileClientReadHandler(
String appId,
@@ -58,7 +61,8 @@ public class LocalFileClientReadHandler extends
DataSkippableReadHandler {
Roaring64NavigableMap expectTaskIds,
int retryMax,
long retryIntervalMax,
- Optional<PrefetchOption> prefetchOption) {
+ Optional<PrefetchOption> prefetchOption,
+ ShuffleServerReadCostTracker readCostTracker) {
super(
appId,
shuffleId,
@@ -74,6 +78,7 @@ public class LocalFileClientReadHandler extends
DataSkippableReadHandler {
this.partitionNum = partitionNum;
this.retryMax = retryMax;
this.retryIntervalMax = retryIntervalMax;
+ this.readCostTracker = readCostTracker;
}
@VisibleForTesting
@@ -103,7 +108,8 @@ public class LocalFileClientReadHandler extends
DataSkippableReadHandler {
Roaring64NavigableMap.bitmapOf(),
1,
0,
- Optional.empty());
+ Optional.empty(),
+ new ShuffleServerReadCostTracker());
}
@Override
@@ -166,9 +172,19 @@ public class LocalFileClientReadHandler extends
DataSkippableReadHandler {
retryMax,
retryIntervalMax);
try {
+ long start = System.currentTimeMillis();
RssGetShuffleDataResponse response =
shuffleServerClient.getShuffleData(request);
result =
new ShuffleDataResult(response.getShuffleData(),
shuffleDataSegment.getBufferSegments());
+
+ ClientInfo clientInfo = shuffleServerClient.getClientInfo();
+ if (readCostTracker != null && clientInfo != null) {
+ readCostTracker.record(
+ clientInfo.getShuffleServerInfo().getId(),
+ StorageType.LOCALFILE,
+ result.getDataLength(),
+ System.currentTimeMillis() - start);
+ }
} catch (Exception e) {
throw new RssException(
"Failed to read shuffle data with " +
shuffleServerClient.getClientInfo(), e);
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java
index b5ec28970..b2f15e472 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java
@@ -25,11 +25,13 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.client.api.ClientInfo;
import org.apache.uniffle.client.api.ShuffleServerClient;
import org.apache.uniffle.client.request.RssGetInMemoryShuffleDataRequest;
import org.apache.uniffle.client.response.RssGetInMemoryShuffleDataResponse;
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataResult;
+import org.apache.uniffle.common.StorageType;
import org.apache.uniffle.common.exception.RssFetchFailedException;
import org.apache.uniffle.common.util.Constants;
@@ -41,6 +43,7 @@ public class MemoryClientReadHandler extends
PrefetchableClientReadHandler {
private Roaring64NavigableMap expectTaskIds;
private int retryMax;
private long retryIntervalMax;
+ private ShuffleServerReadCostTracker readCostTracker;
public MemoryClientReadHandler(
String appId,
@@ -51,7 +54,8 @@ public class MemoryClientReadHandler extends
PrefetchableClientReadHandler {
Roaring64NavigableMap expectTaskIds,
int retryMax,
long retryIntervalMax,
- Optional<PrefetchableClientReadHandler.PrefetchOption> prefetchOption) {
+ Optional<PrefetchableClientReadHandler.PrefetchOption> prefetchOption,
+ ShuffleServerReadCostTracker readCostTracker) {
super(prefetchOption);
this.appId = appId;
this.shuffleId = shuffleId;
@@ -61,6 +65,7 @@ public class MemoryClientReadHandler extends
PrefetchableClientReadHandler {
this.expectTaskIds = expectTaskIds;
this.retryMax = retryMax;
this.retryIntervalMax = retryIntervalMax;
+ this.readCostTracker = readCostTracker;
}
@VisibleForTesting
@@ -80,7 +85,8 @@ public class MemoryClientReadHandler extends
PrefetchableClientReadHandler {
expectTaskIds,
1,
0,
- Optional.empty());
+ Optional.empty(),
+ new ShuffleServerReadCostTracker());
}
@Override
@@ -99,9 +105,18 @@ public class MemoryClientReadHandler extends
PrefetchableClientReadHandler {
retryIntervalMax);
try {
+ long start = System.currentTimeMillis();
RssGetInMemoryShuffleDataResponse response =
shuffleServerClient.getInMemoryShuffleData(request);
result = new ShuffleDataResult(response.getData(),
response.getBufferSegments());
+ ClientInfo clientInfo = shuffleServerClient.getClientInfo();
+ if (readCostTracker != null && clientInfo != null) {
+ readCostTracker.record(
+ clientInfo.getShuffleServerInfo().getId(),
+ StorageType.MEMORY,
+ result.getDataLength(),
+ System.currentTimeMillis() - start);
+ }
} catch (RssFetchFailedException e) {
throw e;
} catch (Exception e) {
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ShuffleServerReadCost.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ShuffleServerReadCost.java
new file mode 100644
index 000000000..e8920108c
--- /dev/null
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ShuffleServerReadCost.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.storage.handler.impl;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.uniffle.common.StorageType;
+
+public class ShuffleServerReadCost {
+ private final String shuffleServerId;
+ private final AtomicLong durationMillis;
+ private final AtomicLong readBytes;
+
+ // hybrid storage statistics
+ private final AtomicLong memoryReadBytes;
+ private final AtomicLong memoryReadDurationMillis;
+
+ private final AtomicLong localfileReadBytes;
+ private final AtomicLong localfileReadDurationMillis;
+
+ private final AtomicLong hadoopReadLocalFileBytes;
+ private final AtomicLong hadoopReadLocalFileDurationMillis;
+
+ public ShuffleServerReadCost(String shuffleServerId) {
+ this.shuffleServerId = shuffleServerId;
+
+ this.durationMillis = new AtomicLong(0);
+ this.readBytes = new AtomicLong(0);
+
+ this.memoryReadBytes = new AtomicLong(0);
+ this.memoryReadDurationMillis = new AtomicLong(0);
+
+ this.localfileReadBytes = new AtomicLong(0);
+ this.localfileReadDurationMillis = new AtomicLong(0);
+
+ this.hadoopReadLocalFileBytes = new AtomicLong(0);
+ this.hadoopReadLocalFileDurationMillis = new AtomicLong(0);
+ }
+
+ public void inc(StorageType storageType, long bytes, long durationMillis) {
+ this.durationMillis.addAndGet(durationMillis);
+ this.readBytes.addAndGet(bytes);
+
+ switch (storageType) {
+ case MEMORY:
+ this.memoryReadBytes.addAndGet(bytes);
+ this.memoryReadDurationMillis.addAndGet(durationMillis);
+ break;
+ case LOCALFILE:
+ this.localfileReadBytes.addAndGet(bytes);
+ this.localfileReadDurationMillis.addAndGet(durationMillis);
+ break;
+ case HDFS:
+ this.hadoopReadLocalFileBytes.addAndGet(bytes);
+ this.hadoopReadLocalFileDurationMillis.addAndGet(durationMillis);
+ break;
+ default:
+ break;
+ }
+ }
+
+ public long getDurationMillis() {
+ return durationMillis.get();
+ }
+
+ public long getReadBytes() {
+ return readBytes.get();
+ }
+}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ShuffleServerReadCostTracker.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ShuffleServerReadCostTracker.java
new file mode 100644
index 000000000..015257361
--- /dev/null
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ShuffleServerReadCostTracker.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.storage.handler.impl;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.StorageType;
+
+public class ShuffleServerReadCostTracker {
+ private static final Logger LOG =
LoggerFactory.getLogger(ShuffleServerReadCostTracker.class);
+
+ private Map<String, ShuffleServerReadCost> tracking;
+
+ public ShuffleServerReadCostTracker() {
+ this.tracking = new ConcurrentHashMap<>();
+ }
+
+ public void record(String serverId, StorageType storageType, long bytes,
long durationMillis) {
+ if (serverId == null) {
+ return;
+ }
+ ShuffleServerReadCost readCost =
+ tracking.computeIfAbsent(serverId, x -> new ShuffleServerReadCost(x));
+ readCost.inc(storageType, bytes, durationMillis);
+ }
+
+ public Map<String, ShuffleServerReadCost> list() {
+ return tracking;
+ }
+}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java
b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java
index 9a36e4ad0..145f93cab 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java
@@ -31,6 +31,7 @@ import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.util.IdHelper;
import org.apache.uniffle.storage.handler.impl.PrefetchableClientReadHandler;
+import org.apache.uniffle.storage.handler.impl.ShuffleServerReadCostTracker;
public class CreateShuffleReadHandlerRequest {
@@ -55,6 +56,7 @@ public class CreateShuffleReadHandlerRequest {
private boolean expectedTaskIdsBitmapFilterEnable;
private boolean offHeapEnabled;
private RssConf clientConf;
+ private ShuffleServerReadCostTracker readCostTracker;
private IdHelper idHelper;
@@ -246,6 +248,14 @@ public class CreateShuffleReadHandlerRequest {
this.clientType = clientType;
}
+ public ShuffleServerReadCostTracker getReadCostTracker() {
+ return readCostTracker;
+ }
+
+ public void setReadCostTracker(ShuffleServerReadCostTracker readCostTracker)
{
+ this.readCostTracker = readCostTracker;
+ }
+
public Optional<PrefetchableClientReadHandler.PrefetchOption>
getPrefetchOption() {
if (clientConf.get(RssClientConf.RSS_CLIENT_PREFETCH_ENABLED)) {
return Optional.of(