This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 057dcd227 [#2460] feat(spark)(part-4): Hybrid storage reading statistics (#2468)
057dcd227 is described below
commit 057dcd227286cbfc409b6cfc5787e6057190119b
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed Apr 30 13:42:32 2025 +0800
[#2460] feat(spark)(part-4): Hybrid storage reading statistics (#2468)
### What changes were proposed in this pull request?
1. Add support for hybrid storage reading statistics

### Why are the changes needed?
Follow-up to #2460.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Internal tests.
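
For reference, a minimal sketch of how the extended per-storage read metric is shaped. The constructor argument order and getters mirror the `ShuffleReadMetric` changes in the diff below; the wrapper class, the concrete values, and the speed calculation here are illustrative only (the UI reports speed as bytes / millis / 1000, i.e. roughly MB/sec).

```java
import org.apache.spark.shuffle.events.ShuffleReadMetric;

// Illustrative sketch only: values are made up and this class is not part of the patch.
public class HybridReadMetricExample {
  public static void main(String[] args) {
    // Argument order matches the new constructor: total, then memory,
    // localfile, and hadoop duration/byte pairs.
    ShuffleReadMetric metric =
        new ShuffleReadMetric(
            1200L, 64L * 1024 * 1024,   // durationMillis, byteSize
            100L, 8L * 1024 * 1024,     // memoryDurationMillis, memoryByteSize
            900L, 48L * 1024 * 1024,    // localfileDurationMillis, localfileByteSize
            200L, 8L * 1024 * 1024);    // hadoopDurationMillis, hadoopByteSize

    // The Shuffle UI page aggregates these per storage type and derives speed
    // as bytes / millis / 1000 (approximately MB/sec).
    long bytes = metric.getLocalfileByteSize();
    long millis = metric.getLocalfileDurationMillis();
    double mbPerSec = millis == 0 ? 0 : bytes / (double) millis / 1000;
    System.out.println("LOCALFILE read speed (MB/sec): " + mbPerSec);
  }
}
```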
---
.../spark/shuffle/events/ShuffleReadMetric.java | 52 ++++++++++++++++++-
.../shuffle/manager/ShuffleManagerGrpcService.java | 9 +++-
.../scala/org/apache/spark/UniffleListener.scala | 16 ++++--
.../org/apache/spark/UniffleStatusStore.scala | 10 +++-
.../scala/org/apache/spark/ui/ShufflePage.scala | 47 +++++++++++++++++
.../spark/shuffle/reader/RssShuffleReader.java | 8 ++-
.../request/RssReportShuffleReadMetricRequest.java | 59 +++++++++++++++++++++-
proto/src/main/proto/Rss.proto | 9 ++++
.../handler/impl/ShuffleServerReadCost.java | 24 +++++++++
9 files changed, 226 insertions(+), 8 deletions(-)
diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleReadMetric.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleReadMetric.java
index 1cb54c69f..dbbd7d463 100644
--- a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleReadMetric.java
+++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleReadMetric.java
@@ -18,7 +18,57 @@
package org.apache.spark.shuffle.events;
public class ShuffleReadMetric extends ShuffleMetric {
- public ShuffleReadMetric(long durationMillis, long byteSize) {
+ private final long memoryDurationMillis;
+ private final long memoryByteSize;
+
+ private final long localfileDurationMillis;
+ private final long localfileByteSize;
+
+ private final long hadoopDurationMillis;
+ private final long hadoopByteSize;
+
+ public ShuffleReadMetric(
+ long durationMillis,
+ long byteSize,
+ long memoryDurationMillis,
+ long memoryByteSize,
+ long localfileDurationMillis,
+ long localfileByteSize,
+ long hadoopDurationMillis,
+ long hadoopByteSize) {
super(durationMillis, byteSize);
+
+ this.memoryDurationMillis = memoryDurationMillis;
+ this.memoryByteSize = memoryByteSize;
+
+ this.localfileDurationMillis = localfileDurationMillis;
+ this.localfileByteSize = localfileByteSize;
+
+ this.hadoopDurationMillis = hadoopDurationMillis;
+ this.hadoopByteSize = hadoopByteSize;
+ }
+
+ public long getMemoryDurationMillis() {
+ return memoryDurationMillis;
+ }
+
+ public long getMemoryByteSize() {
+ return memoryByteSize;
+ }
+
+ public long getLocalfileDurationMillis() {
+ return localfileDurationMillis;
+ }
+
+ public long getLocalfileByteSize() {
+ return localfileByteSize;
+ }
+
+ public long getHadoopDurationMillis() {
+ return hadoopDurationMillis;
+ }
+
+ public long getHadoopByteSize() {
+ return hadoopByteSize;
}
}
diff --git a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
index 2ba39eac1..213f1a774 100644
--- a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
+++ b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
@@ -757,7 +757,14 @@ public class ShuffleManagerGrpcService extends ShuffleManagerImplBase {
Map.Entry::getKey,
x ->
new ShuffleReadMetric(
- x.getValue().getDurationMillis(), x.getValue().getByteSize()))));
+ x.getValue().getDurationMillis(),
+ x.getValue().getByteSize(),
+ x.getValue().getMemoryDurationMillis(),
+ x.getValue().getMemoryByteSize(),
+ x.getValue().getLocalfileDurationMillis(),
+ x.getValue().getLocalfileByteSize(),
+ x.getValue().getHadoopDurationMillis(),
+ x.getValue().getHadoopByteSize()))));
RssSparkShuffleUtils.getActiveSparkContext().listenerBus().post(event);
RssProtos.ReportShuffleReadMetricResponse reply =
RssProtos.ReportShuffleReadMetricResponse.newBuilder()
diff --git a/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala b/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
index 4d3b19521..8c84593ea 100644
--- a/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
+++ b/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
@@ -105,9 +105,19 @@ class UniffleListener(conf: SparkConf, kvstore: ElementTrackingStore)
val metrics = event.getMetrics
for (metric <- metrics.asScala) {
val id = metric._1
- val agg_metric = this.aggregatedShuffleReadMetric.computeIfAbsent(id, _ => new AggregatedShuffleReadMetric(0, 0))
- agg_metric.byteSize += metric._2.getByteSize
- agg_metric.durationMillis += metric._2.getDurationMillis
+ val rmetric = metric._2
+ val agg_metric = this.aggregatedShuffleReadMetric.computeIfAbsent(id, _ => new AggregatedShuffleReadMetric(0, 0, 0, 0, 0, 0, 0, 0))
+ agg_metric.byteSize += rmetric.getByteSize
+ agg_metric.durationMillis += rmetric.getDurationMillis
+
+ agg_metric.memoryByteSize += rmetric.getMemoryByteSize
+ agg_metric.memoryDurationMills += rmetric.getMemoryDurationMillis
+
+ agg_metric.localfileDurationMillis += rmetric.getLocalfileDurationMillis
+ agg_metric.localfileByteSize += rmetric.getLocalfileByteSize
+
+ agg_metric.hadoopByteSize += rmetric.getHadoopByteSize
+ agg_metric.hadoopDurationMillis += rmetric.getHadoopDurationMillis
}
}
diff --git a/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala b/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
index b7578d25f..021311981 100644
--- a/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
+++ b/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
@@ -113,7 +113,15 @@ class AggregatedShuffleReadMetricsUIData(val metrics: ConcurrentHashMap[String,
@KVIndex
def id: String = classOf[AggregatedShuffleReadMetricsUIData].getName()
}
-class AggregatedShuffleReadMetric(durationMillis: Long, byteSize: Long)
+
+class AggregatedShuffleReadMetric(durationMillis: Long,
+ byteSize: Long,
+ var memoryDurationMills: Long,
+ var memoryByteSize: Long,
+ var localfileDurationMillis: Long,
+ var localfileByteSize: Long,
+ var hadoopDurationMillis: Long,
+ var hadoopByteSize: Long)
extends AggregatedShuffleMetric(durationMillis, byteSize)
// task total cpu time
diff --git a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
index cef2c4297..32864da90 100644
--- a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
+++ b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
@@ -143,6 +143,40 @@ class ShufflePage(parent: ShuffleTab) extends WebUIPage("") with Logging {
fixedWidth = true
)
+ // render reading hybrid storage statistics
+ val readMetrics = originReadMetric.metrics
+ val aggregatedByStorage = readMetrics.asScala.values
+ .flatMap { metric =>
+ Seq(
+ ("MEMORY", metric.memoryByteSize, metric.memoryDurationMills),
+ ("LOCALFILE", metric.localfileByteSize,
metric.localfileDurationMillis),
+ ("HADOOP", metric.hadoopByteSize, metric.hadoopDurationMillis)
+ )
+ }
+ .groupBy(_._1)
+ .mapValues { values =>
+ val totalBytes = values.map(_._2).sum
+ val totalTime = values.map(_._3).sum
+ val speed = if (totalTime != 0) totalBytes.toDouble / totalTime / 1000 else 0L
+ (totalBytes, totalTime, speed)
+ }
+ .toSeq
+ val readTableUI = UIUtils.listingTable(
+ Seq("Storage Type", "Read Bytes", "Read Time", "Read Speed (MB/sec)"),
+ { row: (String, Long, Long, Double) =>
+ <tr>
+ <td>{row._1}</td>
+ <td>{Utils.bytesToString(row._2)}</td>
+ <td>{UIUtils.formatDuration(row._3)}</td>
+ <td>{roundToTwoDecimals(row._4)}</td>
+ </tr>
+ },
+ aggregatedByStorage.map { case (storageType, (bytes, time, speed)) =>
+ (storageType, bytes, time, speed)
+ },
+ fixedWidth = true
+ )
+
// render assignment info
val assignmentInfos = runtimeStatusStore.assignmentInfos
val assignmentTableUI = UIUtils.listingTable(
@@ -211,6 +245,19 @@ class ShufflePage(parent: ShuffleTab) extends WebUIPage("") with Logging {
</div>
</div>
+ <div>
+ <span class="collapse-read-throughput-properties collapse-table"
+ onClick="collapseTable('collapse-read-throughput-properties',
'read-statistics-table')">
+ <h4>
+ <span class="collapse-table-arrow arrow-closed"></span>
+ <a>Hybrid Storage Read Statistics</a>
+ </h4>
+ </span>
+ <div class="read-statistics-table collapsible-table collapsed">
+ {readTableUI}
+ </div>
+ </div>
+
<div>
<span class="collapse-server-properties collapse-table"
onClick="collapseTable('collapse-server-properties',
'all-servers-table')">
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index 6c6b3760c..a1c345e84 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -356,7 +356,13 @@ public class RssShuffleReader<K, C> implements ShuffleReader<K, C> {
x ->
new RssReportShuffleReadMetricRequest.TaskShuffleReadMetric(
x.getValue().getDurationMillis(),
- x.getValue().getReadBytes())))));
+ x.getValue().getReadBytes(),
+ x.getValue().getMemoryReadDurationMillis(),
+ x.getValue().getMemoryReadBytes(),
+ x.getValue().getLocalfileReadDurationMillis(),
+ x.getValue().getLocalfileReadBytes(),
+ x.getValue().getHadoopReadLocalFileDurationMillis(),
+ x.getValue().getHadoopReadLocalFileBytes())))));
if (response != null && response.getStatusCode() != StatusCode.SUCCESS) {
LOG.error("Errors on reporting shuffle read metrics to driver");
}
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java
index 17d95d0d2..f1faebe0e 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java
@@ -53,6 +53,13 @@ public class RssReportShuffleReadMetricRequest {
RssProtos.ShuffleReadMetric.newBuilder()
.setByteSize(x.getValue().getByteSize())
.setDurationMillis(x.getValue().getDurationMillis())
+ .setMemoryByteSize(x.getValue().getMemoryByteSize())
+ .setMemoryDurationMillis(x.getValue().getMemoryDurationMillis())
+ .setLocalfileByteSize(x.getValue().getLocalfileByteSize())
+ .setLocalfileDurationMillis(
+ x.getValue().getLocalfileDurationMillis())
+ .setHadoopByteSize(x.getValue().getHadoopByteSize())
+ .setHadoopDurationMillis(x.getValue().getHadoopDurationMillis())
.build())));
return builder.build();
}
@@ -61,9 +68,35 @@ public class RssReportShuffleReadMetricRequest {
private long durationMillis;
private long byteSize;
- public TaskShuffleReadMetric(long durationMillis, long byteSize) {
+ private long localfileByteSize;
+ private long localfileDurationMillis;
+
+ private long memoryByteSize;
+ private long memoryDurationMillis;
+
+ private long hadoopByteSize;
+ private long hadoopDurationMillis;
+
+ public TaskShuffleReadMetric(
+ long durationMillis,
+ long byteSize,
+ long memoryReadDurationMillis,
+ long memoryReadBytes,
+ long localfileReadDurationMillis,
+ long localfileReadBytes,
+ long hadoopReadLocalFileDurationMillis,
+ long hadoopReadLocalFileBytes) {
this.durationMillis = durationMillis;
this.byteSize = byteSize;
+
+ this.localfileByteSize = localfileReadBytes;
+ this.localfileDurationMillis = localfileReadDurationMillis;
+
+ this.memoryByteSize = memoryReadBytes;
+ this.memoryDurationMillis = memoryReadDurationMillis;
+
+ this.hadoopByteSize = hadoopReadLocalFileBytes;
+ this.hadoopDurationMillis = hadoopReadLocalFileDurationMillis;
}
public long getDurationMillis() {
@@ -73,5 +106,29 @@ public class RssReportShuffleReadMetricRequest {
public long getByteSize() {
return byteSize;
}
+
+ public long getLocalfileByteSize() {
+ return localfileByteSize;
+ }
+
+ public long getLocalfileDurationMillis() {
+ return localfileDurationMillis;
+ }
+
+ public long getMemoryByteSize() {
+ return memoryByteSize;
+ }
+
+ public long getMemoryDurationMillis() {
+ return memoryDurationMillis;
+ }
+
+ public long getHadoopByteSize() {
+ return hadoopByteSize;
+ }
+
+ public long getHadoopDurationMillis() {
+ return hadoopDurationMillis;
+ }
}
}
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index e47c51e13..4e5317587 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -604,6 +604,15 @@ message ShuffleWriteMetric {
message ShuffleReadMetric {
int64 durationMillis = 1;
int64 byteSize = 2;
+
+ int64 memoryDurationMillis = 3;
+ int64 memoryByteSize = 4;
+
+ int64 localfileDurationMillis = 5;
+ int64 localfileByteSize = 6;
+
+ int64 hadoopDurationMillis = 7;
+ int64 hadoopByteSize = 8;
}
message ReportShuffleWriteMetricResponse {
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ShuffleServerReadCost.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ShuffleServerReadCost.java
index e8920108c..2c5e7fc4e 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ShuffleServerReadCost.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ShuffleServerReadCost.java
@@ -81,4 +81,28 @@ public class ShuffleServerReadCost {
public long getReadBytes() {
return readBytes.get();
}
+
+ public long getMemoryReadBytes() {
+ return memoryReadBytes.get();
+ }
+
+ public long getMemoryReadDurationMillis() {
+ return memoryReadDurationMillis.get();
+ }
+
+ public long getLocalfileReadBytes() {
+ return localfileReadBytes.get();
+ }
+
+ public long getHadoopReadLocalFileBytes() {
+ return hadoopReadLocalFileBytes.get();
+ }
+
+ public long getLocalfileReadDurationMillis() {
+ return localfileReadDurationMillis.get();
+ }
+
+ public long getHadoopReadLocalFileDurationMillis() {
+ return hadoopReadLocalFileDurationMillis.get();
+ }
}