This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 3e681e394 [#2460] feat(spark)(part-2): Add support of history server
plugin (#2464)
3e681e394 is described below
commit 3e681e3949100aca9c6cbd1a732ea677783f366e
Author: Junfan Zhang <[email protected]>
AuthorDate: Sun Apr 27 13:57:43 2025 +0800
[#2460] feat(spark)(part-2): Add support of history server plugin (#2464)
### What changes were proposed in this pull request?
1. Add support of history server plugin
2. Optimize UI performance by more memory cache and less status store access
3. Add more information and correct the display of some metric units
4. Enable shuffle manager rpc when UI plugin is enabled
### Why are the changes needed?
Follow-up for #2459.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Internal tests.

---
.../apache/spark/shuffle/RssSparkShuffleUtils.java | 11 ++
.../shuffle/manager/RssShuffleManagerBase.java | 6 +-
client-spark/extension/pom.xml | 2 +-
.../org.apache.spark.status.AppHistoryServerPlugin | 1 +
.../apache/spark/UniffleHistoryServerPlugin.scala | 37 ++++
.../scala/org/apache/spark/UniffleListener.scala | 75 +++++--
.../scala/org/apache/spark/UnifflePlugin.scala | 3 +-
.../org/apache/spark/UniffleStatusStore.scala | 78 ++++++--
.../scala/org/apache/spark/ui/ShufflePage.scala | 217 +++++++++++++--------
pom.xml | 1 +
10 files changed, 309 insertions(+), 122 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
index 91ce4de50..c37a93623 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
@@ -393,4 +393,15 @@ public class RssSparkShuffleUtils {
}
return rssFetchFailedException;
}
+
+ public static boolean isSparkUIEnabled(SparkConf conf) {
+ String rawPlugins = conf.get("spark.plugins", null);
+ if (StringUtils.isEmpty(rawPlugins)) {
+ return false;
+ }
+ if (rawPlugins.contains("UnifflePlugin")) {
+ return true;
+ }
+ return false;
+ }
}
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index fcfc72818..cc3d90364 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -107,6 +107,7 @@ import static
org.apache.spark.shuffle.RssSparkConfig.RSS_BLOCK_ID_SELF_MANAGEME
import static
org.apache.spark.shuffle.RssSparkConfig.RSS_PARTITION_REASSIGN_MAX_REASSIGNMENT_SERVER_NUM;
import static
org.apache.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED;
import static
org.apache.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_WRITE_FAILURE_ENABLED;
+import static org.apache.spark.shuffle.RssSparkShuffleUtils.isSparkUIEnabled;
import static org.apache.uniffle.common.config.RssBaseConf.RPC_SERVER_PORT;
import static
org.apache.uniffle.common.config.RssClientConf.HADOOP_CONFIG_KEY_PREFIX;
import static
org.apache.uniffle.common.config.RssClientConf.MAX_CONCURRENCY_PER_PARTITION_TO_WRITE;
@@ -283,7 +284,10 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
}
this.blockIdSelfManagedEnabled =
rssConf.getBoolean(RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED);
this.shuffleManagerRpcServiceEnabled =
- partitionReassignEnabled || rssStageRetryEnabled ||
blockIdSelfManagedEnabled;
+ partitionReassignEnabled
+ || rssStageRetryEnabled
+ || blockIdSelfManagedEnabled
+ || isSparkUIEnabled(conf);
if (isDriver) {
heartBeatScheduledExecutorService =
diff --git a/client-spark/extension/pom.xml b/client-spark/extension/pom.xml
index 77c290484..ff0511eba 100644
--- a/client-spark/extension/pom.xml
+++ b/client-spark/extension/pom.xml
@@ -75,7 +75,7 @@
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
- <version>3.2.2</version>
+ <version>${scala.maven.plugin.version}</version>
<executions>
<execution>
<goals>
diff --git
a/client-spark/extension/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin
b/client-spark/extension/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin
new file mode 100644
index 000000000..1344947eb
--- /dev/null
+++
b/client-spark/extension/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin
@@ -0,0 +1 @@
+org.apache.spark.UniffleHistoryServerPlugin
\ No newline at end of file
diff --git
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleHistoryServerPlugin.scala
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleHistoryServerPlugin.scala
new file mode 100644
index 000000000..307096492
--- /dev/null
+++
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleHistoryServerPlugin.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.spark.scheduler.SparkListener
+import org.apache.spark.status.{AppHistoryServerPlugin, ElementTrackingStore}
+import org.apache.spark.ui.{ShuffleTab, SparkUI}
+
+class UniffleHistoryServerPlugin extends AppHistoryServerPlugin {
+
+ override def createListeners(conf: SparkConf, store: ElementTrackingStore):
Seq[SparkListener] = {
+ Seq(new UniffleListener(conf, store))
+ }
+
+ override def setupUI(ui: SparkUI): Unit = {
+ val store = new UniffleStatusStore(ui.store.store)
+ new ShuffleTab(
+ store,
+ ui
+ )
+ }
+}
diff --git
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
index e674439f1..5930a7c01 100644
---
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
+++
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
@@ -18,13 +18,55 @@
package org.apache.spark
import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent,
SparkListenerJobEnd, SparkListenerTaskEnd}
import org.apache.spark.shuffle.events.{ShuffleAssignmentInfoEvent,
TaskShuffleReadInfoEvent, TaskShuffleWriteInfoEvent}
import org.apache.spark.status.ElementTrackingStore
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicLong
+import scala.collection.JavaConverters.mapAsScalaMapConverter
+
class UniffleListener(conf: SparkConf, kvstore: ElementTrackingStore)
extends SparkListener with Logging {
+ private val aggregatedShuffleWriteMetric = new ConcurrentHashMap[String,
AggregatedShuffleWriteMetric]
+ private val aggregatedShuffleReadMetric = new ConcurrentHashMap[String,
AggregatedShuffleReadMetric]
+ private val totalTaskCpuTime = new AtomicLong(0)
+
+ private val updateIntervalMillis = 5000
+ private var updateLastTimeMillis: Long = -1
+
+ // Using the async interval update to reduce state store pressure
+ private def mayUpdate(force: Boolean): Unit = {
+ val now = System.currentTimeMillis()
+ if (force || now - updateLastTimeMillis > updateIntervalMillis) {
+ updateLastTimeMillis = now
+ kvstore.write(
+ new
AggregatedShuffleWriteMetricsUIData(this.aggregatedShuffleWriteMetric)
+ )
+ kvstore.write(
+ new
AggregatedShuffleReadMetricsUIData(this.aggregatedShuffleReadMetric)
+ )
+ kvstore.write(
+ TotalTaskCpuTime(totalTaskCpuTime.get())
+ )
+ }
+ }
+
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+ this.mayUpdate(false)
+ if (taskEnd.taskMetrics.shuffleReadMetrics.recordsRead > 0
+ || taskEnd.taskMetrics.shuffleWriteMetrics.recordsWritten > 0) {
+ totalTaskCpuTime.addAndGet(
+ taskEnd.taskInfo.duration
+ )
+ }
+ }
+
+ override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+ this.mayUpdate(true)
+ }
+
private def onBuildInfo(event: BuildInfoEvent): Unit = {
val uiData = new BuildInfoUIData(event.info.toSeq.sortBy(_._1))
kvstore.write(uiData)
@@ -40,25 +82,24 @@ class UniffleListener(conf: SparkConf, kvstore:
ElementTrackingStore)
}
private def onTaskShuffleWriteInfo(event: TaskShuffleWriteInfoEvent): Unit =
{
- kvstore.write(
- new TaskShuffleWriteMetricUIData(
- event.getStageId,
- event.getShuffleId,
- event.getTaskId,
- event.getMetrics
- )
- )
+ val metrics = event.getMetrics
+ for (metric <- metrics.asScala) {
+ val id = metric._1
+ val agg_metric = this.aggregatedShuffleWriteMetric.computeIfAbsent(id, _
=> new AggregatedShuffleWriteMetric(0, 0))
+ agg_metric.byteSize += metric._2.getByteSize
+ agg_metric.durationMillis += metric._2.getDurationMillis
+
+ }
}
private def onTaskShuffleReadInfo(event: TaskShuffleReadInfoEvent): Unit = {
- kvstore.write(
- new TaskShuffleReadMetricUIData(
- event.getStageId,
- event.getShuffleId,
- event.getTaskId,
- event.getMetrics
- )
- )
+ val metrics = event.getMetrics
+ for (metric <- metrics.asScala) {
+ val id = metric._1
+ val agg_metric = this.aggregatedShuffleReadMetric.computeIfAbsent(id, _
=> new AggregatedShuffleReadMetric(0, 0))
+ agg_metric.byteSize += metric._2.getByteSize
+ agg_metric.durationMillis += metric._2.getDurationMillis
+ }
}
override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
diff --git
a/client-spark/extension/src/main/scala/org/apache/spark/UnifflePlugin.scala
b/client-spark/extension/src/main/scala/org/apache/spark/UnifflePlugin.scala
index 8a59dbf47..dd6d91031 100644
--- a/client-spark/extension/src/main/scala/org/apache/spark/UnifflePlugin.scala
+++ b/client-spark/extension/src/main/scala/org/apache/spark/UnifflePlugin.scala
@@ -37,6 +37,7 @@ private class UniffleDriverPlugin extends DriverPlugin with
Logging {
private var _sc: Option[SparkContext] = None
override def init(sc: SparkContext, pluginContext: PluginContext):
java.util.Map[String, String] = {
+ logInfo("Initializing UniffleDriverPlugin...")
_sc = Some(sc)
UniffleListener.register(sc)
postBuildInfoEvent(sc)
@@ -56,7 +57,7 @@ private class UniffleDriverPlugin extends DriverPlugin with
Logging {
buildInfo.put("Commit Id", ProjectConstants.getGitCommitId)
buildInfo.put("Revision", ProjectConstants.REVISION)
- val event = new BuildInfoEvent(buildInfo.toMap)
+ val event = BuildInfoEvent(buildInfo.toMap)
context.listenerBus.post(event)
}
}
diff --git
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
index fd19ffb02..43c5341e9 100644
---
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
+++
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
@@ -18,11 +18,11 @@
package org.apache.spark
import com.fasterxml.jackson.annotation.JsonIgnore
-import org.apache.spark.shuffle.events.{ShuffleReadMetric, ShuffleWriteMetric}
import org.apache.spark.status.KVUtils.KVIndexParam
import org.apache.spark.util.Utils
import org.apache.spark.util.kvstore.{KVIndex, KVStore, KVStoreView}
+import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters.asScalaIteratorConverter
class UniffleStatusStore(store: KVStore) {
@@ -32,19 +32,44 @@ class UniffleStatusStore(store: KVStore) {
def buildInfo(): BuildInfoUIData = {
val kClass = classOf[BuildInfoUIData]
- store.read(kClass, kClass.getName)
+ try {
+ store.read(kClass, kClass.getName)
+ } catch {
+ case _: NoSuchElementException => new BuildInfoUIData(Seq.empty)
+ }
}
- def taskShuffleReadMetrics(): Seq[TaskShuffleReadMetricUIData] = {
- viewToSeq(store.view(classOf[TaskShuffleReadMetricUIData]))
+ def assignmentInfos(): Seq[ShuffleAssignmentUIData] = {
+ viewToSeq(store.view(classOf[ShuffleAssignmentUIData]))
}
- def taskShuffleWriteMetrics(): Seq[TaskShuffleWriteMetricUIData] = {
- viewToSeq(store.view(classOf[TaskShuffleWriteMetricUIData]))
+ def aggregatedShuffleWriteMetrics(): AggregatedShuffleWriteMetricsUIData = {
+ val kClass = classOf[AggregatedShuffleWriteMetricsUIData]
+ try {
+ store.read(kClass, kClass.getName)
+ } catch {
+ case _: NoSuchElementException =>
+ new AggregatedShuffleWriteMetricsUIData(new ConcurrentHashMap[String,
AggregatedShuffleWriteMetric]())
+ }
}
- def assignmentInfos(): Seq[ShuffleAssignmentUIData] = {
- viewToSeq(store.view(classOf[ShuffleAssignmentUIData]))
+ def aggregatedShuffleReadMetrics(): AggregatedShuffleReadMetricsUIData = {
+ val kClass = classOf[AggregatedShuffleReadMetricsUIData]
+ try {
+ store.read(kClass, kClass.getName)
+ } catch {
+ case _: NoSuchElementException =>
+ new AggregatedShuffleReadMetricsUIData(new ConcurrentHashMap[String,
AggregatedShuffleReadMetric]())
+ }
+ }
+
+ def totalTaskTime(): TotalTaskCpuTime = {
+ val kClass = classOf[TotalTaskCpuTime]
+ try {
+ store.read(kClass, kClass.getName)
+ } catch {
+ case _: Exception => TotalTaskCpuTime(0)
+ }
}
}
@@ -54,14 +79,31 @@ class BuildInfoUIData(val info: Seq[(String, String)]) {
def id: String = classOf[BuildInfoUIData].getName()
}
-class TaskShuffleWriteMetricUIData(val stageId: Int,
- val shuffleId: Int,
- @KVIndexParam val taskId: Long,
- val metrics: java.util.Map[String,
ShuffleWriteMetric])
-
-class TaskShuffleReadMetricUIData(val stageId: Int,
- val shuffleId: Int,
- @KVIndexParam val taskId: Long,
- val metrics: java.util.Map[String,
ShuffleReadMetric])
class ShuffleAssignmentUIData(@KVIndexParam val shuffleId: Int,
- val shuffleServerIdList: java.util.List[String])
\ No newline at end of file
+ val shuffleServerIdList: java.util.List[String])
+
+// Aggregated shuffle write/read metrics
+class AggregatedShuffleMetric(var durationMillis: Long, var byteSize: Long)
+
+class AggregatedShuffleWriteMetricsUIData(val metrics:
ConcurrentHashMap[String, AggregatedShuffleWriteMetric]) {
+ @JsonIgnore
+ @KVIndex
+ def id: String = classOf[AggregatedShuffleWriteMetricsUIData].getName()
+}
+class AggregatedShuffleWriteMetric(durationMillis: Long, byteSize: Long)
+ extends AggregatedShuffleMetric(durationMillis, byteSize)
+
+class AggregatedShuffleReadMetricsUIData(val metrics:
ConcurrentHashMap[String, AggregatedShuffleReadMetric]) {
+ @JsonIgnore
+ @KVIndex
+ def id: String = classOf[AggregatedShuffleReadMetricsUIData].getName()
+}
+class AggregatedShuffleReadMetric(durationMillis: Long, byteSize: Long)
+ extends AggregatedShuffleMetric(durationMillis, byteSize)
+
+// task total cpu time
+case class TotalTaskCpuTime(durationMillis: Long) {
+ @JsonIgnore
+ @KVIndex
+ def id: String = classOf[TotalTaskCpuTime].getName()
+}
\ No newline at end of file
diff --git
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
index 6b7826a69..422ec41dc 100644
---
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
+++
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
@@ -18,8 +18,10 @@
package org.apache.spark.ui
import org.apache.spark.internal.Logging
-import org.apache.spark.shuffle.events.ShuffleMetric
+import org.apache.spark.util.Utils
+import org.apache.spark.{AggregatedShuffleMetric, AggregatedShuffleReadMetric,
AggregatedShuffleWriteMetric}
+import java.util.concurrent.ConcurrentHashMap
import javax.servlet.http.HttpServletRequest
import scala.collection.JavaConverters.mapAsScalaMapConverter
import scala.xml.{Node, NodeSeq}
@@ -37,7 +39,7 @@ class ShufflePage(parent: ShuffleTab) extends WebUIPage("")
with Logging {
</td>
</tr>
- private def allServerRow(kv: (String, Long, Long, Long, Long, Long, Long)) =
<tr>
+ private def allServerRow(kv: (String, String, String, Double, String,
String, Double)) = <tr>
<td>{kv._1}</td>
<td>{kv._2}</td>
<td>{kv._3}</td>
@@ -46,40 +48,11 @@ class ShufflePage(parent: ShuffleTab) extends WebUIPage("")
with Logging {
<td>{kv._6}</td>
</tr>
- private def shuffleStatisticsCalculate(shuffleMetrics: Seq[(String,
ShuffleMetric)]): (Seq[Long], Seq[String]) = {
- if (shuffleMetrics.isEmpty) {
- return (Seq.empty[Long], Seq.empty[String])
- }
-
- val trackerData = shuffleMetrics
- val groupedAndSortedMetrics = trackerData
- .groupBy(_._1)
- .map {
- case (key, metrics) =>
- val totalByteSize = metrics.map(_._2.getByteSize).sum
- val totalDuration = metrics.map(_._2.getDurationMillis).sum
- (key, totalByteSize, totalDuration, totalByteSize / totalDuration)
- }
- .toSeq
- .sortBy(_._4)
-
- val minMetric = groupedAndSortedMetrics.head
- val maxMetric = groupedAndSortedMetrics.last
- val p25Metric = groupedAndSortedMetrics((groupedAndSortedMetrics.size *
0.25).toInt)
- val p50Metric = groupedAndSortedMetrics(groupedAndSortedMetrics.size / 2)
- val p75Metric = groupedAndSortedMetrics((groupedAndSortedMetrics.size *
0.75).toInt)
-
- val speeds = Seq(minMetric, p25Metric, p50Metric, p75Metric,
maxMetric).map(_._4)
- val shuffleServerIds = Seq(minMetric, p25Metric, p50Metric, p75Metric,
maxMetric).map(_._1)
-
- (speeds, shuffleServerIds)
- }
-
- private def createShuffleMetricsRows(shuffleWriteMetrics: (Seq[Long],
Seq[String]), shuffleReadMetrics: (Seq[Long], Seq[String])):
Seq[scala.xml.Elem] = {
+ private def createShuffleMetricsRows(shuffleWriteMetrics: (Seq[Double],
Seq[String]), shuffleReadMetrics: (Seq[Double], Seq[String])):
Seq[scala.xml.Elem] = {
val (writeSpeeds, writeServerIds) = if (shuffleWriteMetrics != null)
shuffleWriteMetrics else (Seq.empty, Seq.empty)
val (readSpeeds, readServerIds) = if (shuffleReadMetrics != null)
shuffleReadMetrics else (Seq.empty, Seq.empty)
- def createSpeedRow(metricType: String, speeds: Seq[Long]) = <tr>
+ def createSpeedRow(metricType: String, speeds: Seq[Double]) = <tr>
<td>
{metricType}
</td>{speeds.map(speed => <td>
@@ -95,7 +68,7 @@ class ShufflePage(parent: ShuffleTab) extends WebUIPage("")
with Logging {
</td>)}
</tr>
- val writeSpeedRow = if (writeSpeeds.nonEmpty) Some(createSpeedRow("Write
Speed (bytes/sec)", writeSpeeds)) else None
+ val writeSpeedRow = if (writeSpeeds.nonEmpty) Some(createSpeedRow("Write
Speed (MB/sec)", writeSpeeds)) else None
val writeServerIdRow = if (writeServerIds.nonEmpty)
Some(createServerIdRow("Shuffle Write Server ID", writeServerIds)) else None
val readSpeedRow = if (readSpeeds.nonEmpty) Some(createSpeedRow("Read
Speed (bytes/sec)", readSpeeds)) else None
val readServerIdRow = if (readServerIds.nonEmpty)
Some(createServerIdRow("Shuffle Read Server ID", readServerIds)) else None
@@ -103,35 +76,18 @@ class ShufflePage(parent: ShuffleTab) extends
WebUIPage("") with Logging {
Seq(writeSpeedRow, writeServerIdRow, readSpeedRow, readServerIdRow).flatten
}
- private def combineReadWriteByServerId(writeMetrics: Seq[(String,
ShuffleMetric)], readMetrics: Seq[(String, ShuffleMetric)]): Seq[(String, Long,
Long, Long, Long, Long, Long)] = {
- val write = groupByShuffleServer(writeMetrics)
- val read = groupByShuffleServer(readMetrics)
- val allServerIds = write.keySet ++ read.keySet
- val combinedMetrics = allServerIds.toSeq.map { serverId =>
- val writeMetric = write.getOrElse(serverId, (0L, 0L, 0L))
- val readMetric = read.getOrElse(serverId, (0L, 0L, 0L))
- (serverId, writeMetric._1, writeMetric._2, writeMetric._3,
readMetric._1, readMetric._2, readMetric._3)
- }
- combinedMetrics
- }
+ override def render(request: HttpServletRequest): Seq[Node] = {
+ val originWriteMetric = runtimeStatusStore.aggregatedShuffleWriteMetrics()
+ val originReadMetric = runtimeStatusStore.aggregatedShuffleReadMetrics()
- private def groupByShuffleServer(shuffleMetrics: Seq[(String,
ShuffleMetric)]): Map[String, (Long, Long, Long)] = {
- if (shuffleMetrics.isEmpty) {
- return Map.empty[String, (Long, Long, Long)]
- }
- val metrics = shuffleMetrics
- .groupBy(_._1)
- .mapValues {
- metrics =>
- val totalByteSize = metrics.map(_._2.getByteSize).sum
- val totalDuration = metrics.map(_._2.getDurationMillis).sum
- (totalByteSize, totalDuration, totalByteSize / totalDuration)
- }
- .toMap
- metrics
- }
+ // render header
+ val writeMetaInfo =
getShuffleMetaInfo(originWriteMetric.metrics.asScala.toSeq)
+ val readMetaInfo =
getShuffleMetaInfo(originReadMetric.metrics.asScala.toSeq)
+ val shuffleTotalSize = writeMetaInfo._1
+ val shuffleTotalTime = writeMetaInfo._2 + readMetaInfo._2
+ val taskCpuTime = if (runtimeStatusStore.totalTaskTime == null) 0 else
runtimeStatusStore.totalTaskTime.durationMillis
+ val percent = if (taskCpuTime == 0) 0 else shuffleTotalTime.toDouble /
taskCpuTime
- override def render(request: HttpServletRequest): Seq[Node] = {
// render build info
val buildInfo = runtimeStatusStore.buildInfo()
val buildInfoTableUI = UIUtils.listingTable(
@@ -142,12 +98,12 @@ class ShufflePage(parent: ShuffleTab) extends
WebUIPage("") with Logging {
)
// render shuffle-servers write+read statistics
- val shuffleWriteMetrics =
shuffleStatisticsCalculate(runtimeStatusStore.taskShuffleWriteMetrics().flatMap(x
=> x.metrics.asScala))
- val shuffleReadMetrics =
shuffleStatisticsCalculate(runtimeStatusStore.taskShuffleReadMetrics().flatMap(x
=> x.metrics.asScala))
- val shuffleHeader = Seq("Min", "P25", "P50", "P75", "Max")
+ val shuffleWriteMetrics =
shuffleSpeedStatistics(originWriteMetric.metrics.asScala.toSeq)
+ val shuffleReadMetrics =
shuffleSpeedStatistics(originReadMetric.metrics.asScala.toSeq)
+ val shuffleHeader = Seq("Avg", "Min", "P25", "P50", "P75", "Max")
val shuffleMetricsRows = createShuffleMetricsRows(shuffleWriteMetrics,
shuffleReadMetrics)
val shuffleMetricsTableUI =
- <table class="table table-bordered table-condensed table-striped
table-head-clickable">
+ <table class="table table-bordered table-sm table-striped sortable">
<thead>
<tr>
{("Metric" +: shuffleHeader).map(header => <th>
@@ -161,12 +117,12 @@ class ShufflePage(parent: ShuffleTab) extends
WebUIPage("") with Logging {
</table>
// render all assigned shuffle-servers
- val allServers = combineReadWriteByServerId(
- runtimeStatusStore.taskShuffleWriteMetrics().flatMap(x =>
x.metrics.asScala),
- runtimeStatusStore.taskShuffleReadMetrics().flatMap(x =>
x.metrics.asScala)
+ val allServers = unionByServerId(
+ originWriteMetric.metrics,
+ originReadMetric.metrics
)
val allServersTableUI = UIUtils.listingTable(
- Seq("Shuffle Server ID", "Write Bytes", "Write Duration", "Write Speed",
"Read Bytes", "Read Duration", "Read Speed"),
+ Seq("Shuffle Server ID", "Write Bytes", "Write Duration", "Write Speed
(MB/sec)", "Read Bytes", "Read Duration", "Read Speed"),
allServerRow,
allServers,
fixedWidth = true
@@ -181,60 +137,153 @@ class ShufflePage(parent: ShuffleTab) extends
WebUIPage("") with Logging {
fixedWidth = true
)
- val summary: NodeSeq =
+ val summary: NodeSeq = {
<div>
<div>
- <span class="collapse-sql-properties collapse-table"
- onClick="collapseTable('build-info-table')">
+ <ul class="list-unstyled">
+ <li id="completed-summary" data-relingo-block="true">
+ <a>
+ <strong>Total shuffle bytes:</strong>
+ </a>
+ {shuffleTotalSize} / {Utils.bytesToString(shuffleTotalSize)}
+ </li><li data-relingo-block="true">
+ <a>
+ <strong>Shuffle Duration / Task Duration:</strong>
+ </a>
+ {UIUtils.formatDuration(shuffleTotalTime)} /
{UIUtils.formatDuration(taskCpuTime)} = {roundToTwoDecimals(percent)}
+ </li>
+ </ul>
+ </div>
+
+ <div>
+ <span class="collapse-build-info-properties collapse-table"
+ onClick="collapseTable('collapse-build-info-properties',
'build-info-table')">
<h4>
<span class="collapse-table-arrow arrow-closed"></span>
<a>Uniffle Build Information</a>
</h4>
</span>
- <div class="build-info-table collapsible-table">
+ <div class="build-info-table collapsible-table collapsed">
{buildInfoTableUI}
</div>
</div>
<div>
- <span class="collapse-sql-properties collapse-table"
- onClick="collapseTable('statistics-table')">
+ <span class="collapse-throughput-properties collapse-table"
+ onClick="collapseTable('collapse-throughput-properties',
'statistics-table')">
<h4>
<span class="collapse-table-arrow arrow-closed"></span>
<a>Shuffle Throughput Statistics</a>
</h4>
- <div class="statistics-table collapsible-table">
+ <div class="statistics-table collapsible-table collapsed">
{shuffleMetricsTableUI}
</div>
</span>
</div>
<div>
- <span class="collapse-table"
onClick="collapseTable('all-servers-table')">
+ <span class="collapse-server-properties collapse-table"
+ onClick="collapseTable('collapse-server-properties',
'all-servers-table')">
<h4>
- <span class="collapse-table-arrow"></span>
- <a>Shuffle Server</a>
+ <span class="collapse-table-arrow arrow-closed"></span>
+ <a>Shuffle Server ({allServers.length})</a>
</h4>
- <div class="all-servers-table collapsed">
+ <div class="all-servers-table collapsible-table collapsed">
{allServersTableUI}
</div>
</span>
</div>
<div>
- <span class="collapse-sql-properties collapse-table"
- onClick="collapseTable('assignment-table')">
+ <span class="collapse-assignment-properties collapse-table"
+ onClick="collapseTable('collapse-assignment-properties',
'assignment-table')">
<h4>
<span class="collapse-table-arrow arrow-closed"></span>
- <a>Assignment</a>
+ <a>Assignment ({assignmentInfos.length})</a>
</h4>
</span>
- <div class="assignment-table collapsible-table">
+ <div class="assignment-table collapsible-table collapsed">
{assignmentTableUI}
</div>
</div>
</div>
+ }
UIUtils.headerSparkPage(request, "Uniffle", summary, parent)
}
+
+ private def getShuffleMetaInfo(metrics: Seq[(String,
AggregatedShuffleMetric)]) = {
+ (
+ metrics.map(x => x._2.byteSize).sum,
+ metrics.map(x => x._2.durationMillis).sum
+ )
+ }
+
+ private def roundToTwoDecimals(value: Double): Double = {
+ BigDecimal(value).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble
+ }
+
+ private def unionByServerId(write: ConcurrentHashMap[String,
AggregatedShuffleWriteMetric],
+ read: ConcurrentHashMap[String,
AggregatedShuffleReadMetric]): Seq[(String, String, String, Double, String,
String, Double)] = {
+ val writeMetrics = write.asScala
+ val readMetrics = read.asScala
+ val allServerIds = writeMetrics.keySet ++ readMetrics.keySet
+
+ val writeMetricsToMap =
+ writeMetrics
+ .mapValues {
+ metrics =>
+ (metrics.byteSize, metrics.durationMillis,
(metrics.byteSize.toDouble / metrics.durationMillis) / 1000.00)
+ }
+ .toMap
+ val readMetricsToMap =
+ readMetrics
+ .mapValues {
+ metrics =>
+ (metrics.byteSize, metrics.durationMillis,
(metrics.byteSize.toDouble / metrics.durationMillis) / 1000.00)
+ }
+ .toMap
+
+ val unionMetrics = allServerIds.toSeq.map { serverId =>
+ val writeMetric = writeMetricsToMap.getOrElse(serverId, (0L, 0L, 0.00))
+ val readMetric = readMetricsToMap.getOrElse(serverId, (0L, 0L, 0.00))
+ (
+ serverId,
+ Utils.bytesToString(writeMetric._1),
+ UIUtils.formatDuration(writeMetric._2),
+ roundToTwoDecimals(writeMetric._3),
+ Utils.bytesToString(readMetric._1),
+ UIUtils.formatDuration(readMetric._2),
+ roundToTwoDecimals(readMetric._3)
+ )
+ }
+ unionMetrics
+ }
+
+ private def shuffleSpeedStatistics(metrics: Seq[(String,
AggregatedShuffleMetric)]): (Seq[Double], Seq[String]) = {
+ if (metrics.isEmpty) {
+ return (Seq.empty, Seq.empty)
+ }
+ val sorted =
+ metrics
+ .map(x => {
+ (x._1, x._2.byteSize, x._2.durationMillis, x._2.byteSize.toDouble /
x._2.durationMillis / 1000.00)
+ })
+ .sortBy(_._4)
+
+ val totalBytes = sorted.map(_._2).sum.toDouble // Sum of all byte sizes
+ val totalDuration = sorted.map(_._3).sum.toDouble // Sum of all durations
in milliseconds
+ val avgMetric = if (totalDuration != 0) totalBytes / totalDuration /
1000.00 else 0.0
+
+ val minMetric = sorted.head
+ val maxMetric = sorted.last
+ val p25Metric = sorted((sorted.size * 0.25).toInt)
+ val p50Metric = sorted(sorted.size / 2)
+ val p75Metric = sorted((sorted.size * 0.75).toInt)
+
+ val speeds = avgMetric +: Seq(minMetric, p25Metric, p50Metric, p75Metric,
maxMetric).map(_._4)
+ val shuffleServerIds = "N/A" +: Seq(minMetric, p25Metric, p50Metric,
p75Metric, maxMetric).map(_._1)
+
+ (speeds, shuffleServerIds)
+ }
}
diff --git a/pom.xml b/pom.xml
index 284a7f23c..c6a4a4511 100644
--- a/pom.xml
+++ b/pom.xml
@@ -108,6 +108,7 @@
<snakeyaml.version>2.2</snakeyaml.version>
<kryo.version>4.0.2</kryo.version>
<scala.version>2.12.18</scala.version>
+ <scala.maven.plugin.version>3.2.2</scala.maven.plugin.version>
</properties>
<repositories>