This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 575eee05c [#2460] feat(spark3): Add client/uniffle different observed 
shuffle speed (#2500)
575eee05c is described below

commit 575eee05cfc5d9d1e42d35fdee63e61efe878095
Author: Junfan Zhang <[email protected]>
AuthorDate: Fri Jun 13 11:55:28 2025 +0800

    [#2460] feat(spark3): Add client/uniffle different observed shuffle speed 
(#2500)
    
    ### What changes were proposed in this pull request?
    
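    Show two average shuffle speeds (write/read, in MB/s) on the Uniffle shuffle page:
    the speed observed by the client and the speed computed from Uniffle's shuffle metrics.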
    
    
![image](https://github.com/user-attachments/assets/1383eadd-0462-44fe-8fe6-8d1bf46fffa2)
    
    ### Why are the changes needed?
    
    Follow-up to #2460.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Not needed.
---
 .../scala/org/apache/spark/ui/ShufflePage.scala    | 63 +++++++++++++++++-----
 1 file changed, 51 insertions(+), 12 deletions(-)

diff --git 
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala 
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
index 22422d423..d53e0e8d0 100644
--- 
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
+++ 
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
@@ -24,7 +24,7 @@ import org.apache.spark.{AggregatedShuffleMetric, 
AggregatedShuffleReadMetric, A
 
 import java.util.concurrent.ConcurrentHashMap
 import javax.servlet.http.HttpServletRequest
-import scala.collection.JavaConverters.mapAsScalaMapConverter
+import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, 
mapAsScalaMapConverter}
 import scala.xml.{Node, NodeSeq}
 
 class ShufflePage(parent: ShuffleTab) extends WebUIPage("") with Logging {
@@ -102,11 +102,23 @@ class ShufflePage(parent: ShuffleTab) extends 
WebUIPage("") with Logging {
         AggregatedTaskInfoUIData(0, 0, 0, 0)
       else
         aggTaskInfo
-    val percent =
+    val percent = {
       if (taskInfo.cpuTimeMillis == 0)
         0
-      else
+      else {
         (taskInfo.shuffleWriteMillis + taskInfo.shuffleReadMillis).toDouble / 
taskInfo.cpuTimeMillis
+      }
+    }
+    // speed unit is MB/sec
+    val clientObservedWriteAvgSpeed = if (aggTaskInfo.shuffleWriteMillis == 0) 
0 else {
+      roundToTwoDecimals(aggTaskInfo.shuffleBytes.toDouble / 
aggTaskInfo.shuffleWriteMillis / 1000)
+    }
+    val clientObservedReadAvgSpeed = if (aggTaskInfo.shuffleReadMillis == 0) 0 
else {
+      roundToTwoDecimals(aggTaskInfo.shuffleBytes.toDouble / 
aggTaskInfo.shuffleReadMillis / 1000)
+    }
+
+    val uniffleWriteAvgSpeed = 
calculateSpeed(originWriteMetric.metrics.values().asScala.toSeq)
+    val uniffleReadAvgSpeed = 
calculateSpeed(originReadMetric.metrics.values().asScala.toSeq)
 
     // render build info
     val buildInfo = runtimeStatusStore.buildInfo()
@@ -244,19 +256,32 @@ class ShufflePage(parent: ShuffleTab) extends 
WebUIPage("") with Logging {
       <div>
         <div>
           <ul class="list-unstyled">
-            <li id="completed-summary" data-relingo-block="true">
+            <li>
               <a>
                 <strong>Total shuffle bytes:</strong>
               </a>
               {Utils.bytesToString(taskInfo.shuffleBytes)}
-            </li><li data-relingo-block="true">
-            <a>
-              <strong>Shuffle Duration (write+read) / Task Duration:</strong>
-            </a>
-            {UIUtils.formatDuration(taskInfo.shuffleWriteMillis + 
taskInfo.shuffleReadMillis)}
-            
({UIUtils.formatDuration(taskInfo.shuffleWriteMillis)}+{UIUtils.formatDuration(taskInfo.shuffleReadMillis)})
-            / {UIUtils.formatDuration(taskInfo.cpuTimeMillis)} = 
{roundToTwoDecimals(percent)}
-          </li>
+            </li>
+            <li>
+              <a>
+                <strong>Shuffle Duration (write+read) / Task Duration:</strong>
+              </a>
+              {UIUtils.formatDuration(taskInfo.shuffleWriteMillis + 
taskInfo.shuffleReadMillis)}
+              
({UIUtils.formatDuration(taskInfo.shuffleWriteMillis)}+{UIUtils.formatDuration(taskInfo.shuffleReadMillis)})
+              / {UIUtils.formatDuration(taskInfo.cpuTimeMillis)} = 
{roundToTwoDecimals(percent)}
+            </li>
+            <li>
+              <a>
+                <strong>Client Observed Speed (Write/Read) MB/s:</strong>
+              </a>
+              {clientObservedWriteAvgSpeed} / {clientObservedReadAvgSpeed}
+            </li>
+            <li>
+              <a>
+                <strong>Uniffle Speed (Write/Read) MB/s:</strong>
+              </a>
+              {uniffleWriteAvgSpeed} / {uniffleReadAvgSpeed}
+            </li>
           </ul>
         </div>
 
@@ -356,6 +381,20 @@ class ShufflePage(parent: ShuffleTab) extends 
WebUIPage("") with Logging {
     UIUtils.headerSparkPage(request, "Uniffle", summary, parent)
   }
 
+  private def calculateSpeed(metrics: Seq[AggregatedShuffleMetric]): Double = {
+    if (metrics == null || metrics.isEmpty) {
+      0.0
+    } else {
+      val totalBytes = metrics.map(_.byteSize).sum
+      val totalDuration = metrics.map(_.durationMillis).sum
+      if (totalDuration == 0) {
+        0.0
+      } else {
+        roundToTwoDecimals(totalBytes.toDouble / totalDuration / 1000)
+      }
+    }
+  }
+
   private def roundToTwoDecimals(value: Double): Double = {
     BigDecimal(value).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble
   }

Reply via email to