This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 575eee05c [#2460] feat(spark3): Add client/uniffle different observed
shuffle speed (#2500)
575eee05c is described below
commit 575eee05cfc5d9d1e42d35fdee63e61efe878095
Author: Junfan Zhang <[email protected]>
AuthorDate: Fri Jun 13 11:55:28 2025 +0800
[#2460] feat(spark3): Add client/uniffle different observed shuffle speed
(#2500)
### What changes were proposed in this pull request?
Add client/uniffle different observed shuffle speed

### Why are the changes needed?
followup #2460
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Neen't
---
.../scala/org/apache/spark/ui/ShufflePage.scala | 63 +++++++++++++++++-----
1 file changed, 51 insertions(+), 12 deletions(-)
diff --git
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
index 22422d423..d53e0e8d0 100644
---
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
+++
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
@@ -24,7 +24,7 @@ import org.apache.spark.{AggregatedShuffleMetric,
AggregatedShuffleReadMetric, A
import java.util.concurrent.ConcurrentHashMap
import javax.servlet.http.HttpServletRequest
-import scala.collection.JavaConverters.mapAsScalaMapConverter
+import scala.collection.JavaConverters.{collectionAsScalaIterableConverter,
mapAsScalaMapConverter}
import scala.xml.{Node, NodeSeq}
class ShufflePage(parent: ShuffleTab) extends WebUIPage("") with Logging {
@@ -102,11 +102,23 @@ class ShufflePage(parent: ShuffleTab) extends
WebUIPage("") with Logging {
AggregatedTaskInfoUIData(0, 0, 0, 0)
else
aggTaskInfo
- val percent =
+ val percent = {
if (taskInfo.cpuTimeMillis == 0)
0
- else
+ else {
(taskInfo.shuffleWriteMillis + taskInfo.shuffleReadMillis).toDouble /
taskInfo.cpuTimeMillis
+ }
+ }
+ // speed unit is MB/sec
+ val clientObservedWriteAvgSpeed = if (aggTaskInfo.shuffleWriteMillis == 0)
0 else {
+ roundToTwoDecimals(aggTaskInfo.shuffleBytes.toDouble /
aggTaskInfo.shuffleWriteMillis / 1000)
+ }
+ val clientObservedReadAvgSpeed = if (aggTaskInfo.shuffleReadMillis == 0) 0
else {
+ roundToTwoDecimals(aggTaskInfo.shuffleBytes.toDouble /
aggTaskInfo.shuffleReadMillis / 1000)
+ }
+
+ val uniffleWriteAvgSpeed =
calculateSpeed(originWriteMetric.metrics.values().asScala.toSeq)
+ val uniffleReadAvgSpeed =
calculateSpeed(originReadMetric.metrics.values().asScala.toSeq)
// render build info
val buildInfo = runtimeStatusStore.buildInfo()
@@ -244,19 +256,32 @@ class ShufflePage(parent: ShuffleTab) extends
WebUIPage("") with Logging {
<div>
<div>
<ul class="list-unstyled">
- <li id="completed-summary" data-relingo-block="true">
+ <li>
<a>
<strong>Total shuffle bytes:</strong>
</a>
{Utils.bytesToString(taskInfo.shuffleBytes)}
- </li><li data-relingo-block="true">
- <a>
- <strong>Shuffle Duration (write+read) / Task Duration:</strong>
- </a>
- {UIUtils.formatDuration(taskInfo.shuffleWriteMillis +
taskInfo.shuffleReadMillis)}
-
({UIUtils.formatDuration(taskInfo.shuffleWriteMillis)}+{UIUtils.formatDuration(taskInfo.shuffleReadMillis)})
- / {UIUtils.formatDuration(taskInfo.cpuTimeMillis)} =
{roundToTwoDecimals(percent)}
- </li>
+ </li>
+ <li>
+ <a>
+ <strong>Shuffle Duration (write+read) / Task Duration:</strong>
+ </a>
+ {UIUtils.formatDuration(taskInfo.shuffleWriteMillis +
taskInfo.shuffleReadMillis)}
+
({UIUtils.formatDuration(taskInfo.shuffleWriteMillis)}+{UIUtils.formatDuration(taskInfo.shuffleReadMillis)})
+ / {UIUtils.formatDuration(taskInfo.cpuTimeMillis)} =
{roundToTwoDecimals(percent)}
+ </li>
+ <li>
+ <a>
+ <strong>Client Observed Speed (Write/Read) MB/s:</strong>
+ </a>
+ {clientObservedWriteAvgSpeed} / {clientObservedReadAvgSpeed}
+ </li>
+ <li>
+ <a>
+ <strong>Uniffle Speed (Write/Read) MB/s:</strong>
+ </a>
+ {uniffleWriteAvgSpeed} / {uniffleReadAvgSpeed}
+ </li>
</ul>
</div>
@@ -356,6 +381,20 @@ class ShufflePage(parent: ShuffleTab) extends
WebUIPage("") with Logging {
UIUtils.headerSparkPage(request, "Uniffle", summary, parent)
}
+ private def calculateSpeed(metrics: Seq[AggregatedShuffleMetric]): Double = {
+ if (metrics == null || metrics.isEmpty) {
+ 0.0
+ } else {
+ val totalBytes = metrics.map(_.byteSize).sum
+ val totalDuration = metrics.map(_.durationMillis).sum
+ if (totalDuration == 0) {
+ 0.0
+ } else {
+ roundToTwoDecimals(totalBytes.toDouble / totalDuration / 1000)
+ }
+ }
+ }
+
private def roundToTwoDecimals(value: Double): Double = {
BigDecimal(value).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble
}