This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 7c90ec0  [SPARK-31271][UI] fix web ui for driver side SQL metrics
7c90ec0 is described below

commit 7c90ec065f81c3933eef1f0dd172f1a518b1232b
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Fri Mar 27 15:45:35 2020 -0700

    [SPARK-31271][UI] fix web ui for driver side SQL metrics

    ### What changes were proposed in this pull request?

    In https://github.com/apache/spark/pull/23551, we changed the type of driver-side SQL metrics to size/time etc., which come with max/min/median info. This doesn't make sense for driver-side SQL metrics, as they have only one value, and it makes the web UI hard to read:

    ![image](https://user-images.githubusercontent.com/3182036/77653892-42db9900-6fab-11ea-8e7f-92f763fa32ff.png)

    This PR updates the SQL metrics UI to display max/min/median only when there is more than one metric value:

    ![image](https://user-images.githubusercontent.com/3182036/77653975-5f77d100-6fab-11ea-849e-64c935377c8e.png)

    ### Why are the changes needed?

    Makes the UI easier to read.

    ### Does this PR introduce any user-facing change?

    no

    ### How was this patch tested?

    manual test

    Closes #28037 from cloud-fan/ui.

    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
    (cherry picked from commit c4e98c065c99d2cf840e6006ee5414fbaaba9937)
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../spark/sql/execution/metric/SQLMetrics.scala    | 60 +++++++++++-----------
 .../spark/sql/execution/ui/SparkPlanGraph.scala    |  7 ++-
 .../sql/execution/metric/SQLMetricsSuite.scala     | 33 +++++++-----
 .../sql/execution/metric/SQLMetricsTestUtils.scala | 12 ++---
 .../execution/ui/SQLAppStatusListenerSuite.scala   |  9 ++--
 5 files changed, 68 insertions(+), 53 deletions(-)
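The heart of the patch is the display rule in `SQLMetrics.stringValue` below. As a minimal standalone sketch of that rule (the `displayString` name and the `formatValue` parameter are stand-ins of this sketch, not Spark API; `formatValue` plays the role of type-specific formatters such as `Utils.bytesToString`, and the task-info suffix is omitted):

```scala
// Minimal sketch of the new display rule, not the patch itself.
def displayString(validValues: Seq[Long], formatValue: Long => String): String = {
  if (validValues.length <= 1) {
    // A single value (the common driver-side case): no min/med/max breakdown.
    formatValue(validValues.headOption.getOrElse(0L))
  } else {
    // Several task values: show the sum plus (min, med, max).
    val sorted = validValues.sorted
    val Seq(min, med, max) =
      Seq(sorted.head, sorted(sorted.length / 2), sorted.last).map(formatValue)
    s"total (min, med, max)\n${formatValue(validValues.sum)} ($min, $med, $max)"
  }
}
```

For example, `displayString(Seq(5L), v => s"$v ms")` yields just `5 ms`, while `displayString(Seq(800L, 1000L, 2000L), v => s"$v ms")` yields the two-line `total (min, med, max)` form.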
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 1394e0f..92d2179 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -116,26 +116,23 @@ object SQLMetrics {
     // data size total (min, med, max):
     // 100GB (100MB, 1GB, 10GB)
     val acc = new SQLMetric(SIZE_METRIC, -1)
-    acc.register(sc, name = Some(s"$name total (min, med, max (stageId: taskId))"),
-      countFailedValues = false)
+    acc.register(sc, name = Some(name), countFailedValues = false)
     acc
   }
 
   def createTimingMetric(sc: SparkContext, name: String): SQLMetric = {
     // The final result of this metric in physical operator UI may looks like:
-    // duration(min, med, max):
+    // duration total (min, med, max):
     // 5s (800ms, 1s, 2s)
     val acc = new SQLMetric(TIMING_METRIC, -1)
-    acc.register(sc, name = Some(s"$name total (min, med, max (stageId: taskId))"),
-      countFailedValues = false)
+    acc.register(sc, name = Some(name), countFailedValues = false)
     acc
   }
 
   def createNanoTimingMetric(sc: SparkContext, name: String): SQLMetric = {
     // Same with createTimingMetric, just normalize the unit of time to millisecond.
     val acc = new SQLMetric(NS_TIMING_METRIC, -1)
-    acc.register(sc, name = Some(s"$name total (min, med, max (stageId: taskId))"),
-      countFailedValues = false)
+    acc.register(sc, name = Some(name), countFailedValues = false)
     acc
   }
 
@@ -150,8 +147,7 @@ object SQLMetrics {
     // probe avg (min, med, max):
     // (1.2, 2.2, 6.3)
     val acc = new SQLMetric(AVERAGE_METRIC)
-    acc.register(sc, name = Some(s"$name (min, med, max (stageId: taskId))"),
-      countFailedValues = false)
+    acc.register(sc, name = Some(name), countFailedValues = false)
     acc
   }
 
@@ -164,13 +160,15 @@ object SQLMetrics {
     metricsType != SUM_METRIC
   }
 
+  private val METRICS_NAME_SUFFIX = "(min, med, max (stageId: taskId))"
+
   /**
    * A function that defines how we aggregate the final accumulator results among all tasks,
    * and represent it in string for a SQL physical operator.
    */
   def stringValue(metricsType: String, values: Array[Long], maxMetrics: Array[Long]): String = {
-    // stringMetric = "(driver)" OR (stage ${stageId}.${attemptId}: task $taskId)
-    val stringMetric = if (maxMetrics.isEmpty) {
+    // taskInfo = "(driver)" OR (stage ${stageId}.${attemptId}: task $taskId)
+    val taskInfo = if (maxMetrics.isEmpty) {
       "(driver)"
     } else {
       s"(stage ${maxMetrics(1)}.${maxMetrics(2)}: task ${maxMetrics(3)})"
@@ -180,18 +178,20 @@ object SQLMetrics {
       numberFormat.format(values.sum)
     } else if (metricsType == AVERAGE_METRIC) {
       val validValues = values.filter(_ > 0)
-      val Seq(min, med, max) = {
-        val metric = if (validValues.isEmpty) {
-          val zeros = Seq.fill(3)(0L)
-          zeros.map(v => toNumberFormat(v))
-        } else {
+      // When there are only 1 metrics value (or None), no need to display max/min/median. This is
+      // common for driver-side SQL metrics.
+      if (validValues.length <= 1) {
+        toNumberFormat(validValues.headOption.getOrElse(0))
+      } else {
+        val Seq(min, med, max) = {
           Arrays.sort(validValues)
-          Seq(toNumberFormat(validValues(0)), toNumberFormat(validValues(validValues.length / 2)),
-            s"${toNumberFormat(validValues(validValues.length - 1))} $stringMetric")
+          Seq(
+            toNumberFormat(validValues(0)),
+            toNumberFormat(validValues(validValues.length / 2)),
+            toNumberFormat(validValues(validValues.length - 1)))
         }
-        metric
+        s"$METRICS_NAME_SUFFIX:\n($min, $med, $max $taskInfo)"
       }
-      s"\n($min, $med, $max)"
     } else {
       val strFormat: Long => String = if (metricsType == SIZE_METRIC) {
         Utils.bytesToString
@@ -204,19 +204,21 @@ object SQLMetrics {
       }
 
       val validValues = values.filter(_ >= 0)
-      val Seq(sum, min, med, max) = {
-        val metric = if (validValues.isEmpty) {
-          val zeros = Seq.fill(4)(0L)
-          zeros.map(v => strFormat(v))
-        } else {
+      // When there are only 1 metrics value (or None), no need to display max/min/median. This is
+      // common for driver-side SQL metrics.
+      if (validValues.length <= 1) {
+        strFormat(validValues.headOption.getOrElse(0))
+      } else {
+        val Seq(sum, min, med, max) = {
           Arrays.sort(validValues)
-          Seq(strFormat(validValues.sum), strFormat(validValues(0)),
+          Seq(
+            strFormat(validValues.sum),
+            strFormat(validValues(0)),
             strFormat(validValues(validValues.length / 2)),
-            s"${strFormat(validValues(validValues.length - 1))} $stringMetric")
+            strFormat(validValues(validValues.length - 1)))
         }
-        metric
+        s"total $METRICS_NAME_SUFFIX\n$sum ($min, $med, $max $taskInfo)"
      }
-      s"\n$sum ($min, $med, $max)"
     }
   }
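Because the min/med/max suffix now travels with the value string rather than the metric name, the graph node label (next hunk) has to decide how to join the two. A minimal standalone sketch of that rule; `nodeLabel` is a hypothetical helper name of this sketch, not the class's API:

```scala
// Sketch of the label-joining rule from the SparkPlanGraph hunk below.
// A value that extends the name, e.g. "total (min, med, max (stageId: taskId))\n...",
// already contains ":", so it is appended with a space instead of ": ".
def nodeLabel(name: String, value: String): String =
  if (value.contains(":")) s"$name $value" else s"$name: $value"

// nodeLabel("number of output rows", "100") == "number of output rows: 100"
// nodeLabel("duration", "total (min, med, max (stageId: taskId))\n5 s (...)")
//   starts with "duration total (min, med, max"
```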
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
index 6762802..274a5a4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
@@ -166,7 +166,12 @@ private[ui] class SparkPlanGraphNode(
       metric <- metrics
       value <- metricsValue.get(metric.accumulatorId)
     } yield {
-      metric.name + ": " + value
+      // The value may contain ":" to extend the name, like `total (min, med, max): ...`
+      if (value.contains(":")) {
+        metric.name + " " + value
+      } else {
+        metric.name + ": " + value
+      }
     }
 
     if (values.nonEmpty) {
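The metrics tests below now have to cope with both shapes of the rendered string: a bare number for a single value, or a two-line form whose second line carries the numbers. A hedged standalone sketch of the parsing approach the updated suite uses (`probeValues` is a name invented for this sketch):

```scala
// Extract the avg-hash-probe numbers from either shape of the metric string.
def probeValues(rendered: String): Seq[Double] = {
  if (!rendered.contains("\n")) {
    // Single metrics value, e.g. "2.5".
    Seq(rendered.toDouble)
  } else {
    // Two-line form, e.g. "(min, med, max (stageId: taskId)):\n(1.2, 2.2, 6.3 (stage 1.0: task 8))".
    val mainValue = rendered.split("\n")(1).stripPrefix("(").stripSuffix(")")
    // Cut off the trailing " (stage ...: task ...)" info before splitting.
    val cut = mainValue.indexOf(" (")
    val nums = if (cut >= 0) mainValue.take(cut) else mainValue
    nums.split(", ").map(_.toDouble).toSeq
  }
}

// probeValues("2.5") == Seq(2.5)
// probeValues("(min, med, max (stageId: taskId)):\n(1.2, 2.2, 6.3 (stage 1.0: task 8))")
//   == Seq(1.2, 2.2, 6.3)
```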
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 11f93c8..a5b07d5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -98,7 +98,7 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
     val ds = spark.range(10).filter('id < 5)
     testSparkPlanMetricsWithPredicates(ds.toDF(), 1, Map(
       0L -> (("WholeStageCodegen (1)", Map(
-        "duration total (min, med, max (stageId: taskId))" -> {
+        "duration" -> {
           _.toString.matches(timingMetricPattern)
         })))), true)
   }
@@ -110,10 +110,10 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
     val df = testData2.groupBy().count() // 2 partitions
     val expected1 = Seq(
       Map("number of output rows" -> 2L,
-        "avg hash probe bucket list iters (min, med, max (stageId: taskId))" ->
+        "avg hash probe bucket list iters" ->
           aggregateMetricsPattern),
       Map("number of output rows" -> 1L,
-        "avg hash probe bucket list iters (min, med, max (stageId: taskId))" ->
+        "avg hash probe bucket list iters" ->
           aggregateMetricsPattern))
     val shuffleExpected1 = Map(
       "records read" -> 2L,
@@ -130,10 +130,10 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
     val df2 = testData2.groupBy('a).count()
     val expected2 = Seq(
       Map("number of output rows" -> 4L,
-        "avg hash probe bucket list iters (min, med, max (stageId: taskId))" ->
+        "avg hash probe bucket list iters" ->
          aggregateMetricsPattern),
       Map("number of output rows" -> 3L,
-        "avg hash probe bucket list iters (min, med, max (stageId: taskId))" ->
+        "avg hash probe bucket list iters" ->
          aggregateMetricsPattern))
 
     val shuffleExpected2 = Map(
@@ -181,12 +181,17 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
     }
     val metrics = getSparkPlanMetrics(df, 1, nodeIds, enableWholeStage).get
     nodeIds.foreach { nodeId =>
-      val probes =
-        metrics(nodeId)._2("avg hash probe bucket list iters (min, med, max (stageId: taskId))")
-      // Extract min, med, max from the string and strip off everthing else.
-      val index = probes.toString.stripPrefix("\n(").stripSuffix(")").indexOf(" (", 0)
-      probes.toString.stripPrefix("\n(").stripSuffix(")").slice(0, index).split(", ").foreach {
-        probe => assert(probe.toDouble > 1.0)
+      val probes = metrics(nodeId)._2("avg hash probe bucket list iters").toString
+      if (!probes.contains("\n")) {
+        // It's a single metrics value
+        assert(probes.toDouble > 1.0)
+      } else {
+        val mainValue = probes.split("\n").apply(1).stripPrefix("(").stripSuffix(")")
+        // Extract min, med, max from the string and strip off everthing else.
+        val index = mainValue.indexOf(" (", 0)
+        mainValue.slice(0, index).split(", ").foreach {
+          probe => assert(probe.toDouble > 1.0)
+        }
       }
     }
   }
@@ -231,13 +236,13 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
     val df = Seq(1, 3, 2).toDF("id").sort('id)
     testSparkPlanMetricsWithPredicates(df, 2, Map(
       0L -> (("Sort", Map(
-        "sort time total (min, med, max (stageId: taskId))" -> {
+        "sort time" -> {
           _.toString.matches(timingMetricPattern)
         },
-        "peak memory total (min, med, max (stageId: taskId))" -> {
+        "peak memory" -> {
           _.toString.matches(sizeMetricPattern)
         },
-        "spill size total (min, med, max (stageId: taskId))" -> {
+        "spill size" -> {
           _.toString.matches(sizeMetricPattern)
         })))
     ))
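The shared test-utility patterns in the next file are relaxed to accept either shape. A hedged sanity check of the updated timing pattern (the regex is copied from the hunk below; the sample strings are illustrative, not captured output):

```scala
// Copied from the updated SQLMetricsTestUtils hunk below.
val duration = "([0-9]+(\\.[0-9]+)?) (ms|s|m|h)"
val maxMetrics = "\\(stage ([0-9])+\\.([0-9])+\\: task ([0-9])+\\)"
val timingMetricPattern =
  s"(.*\\n$duration \\($duration, $duration, $duration( $maxMetrics)?\\))|($duration)"

// The new single-value (driver-side) form matches via the trailing alternative:
assert("2.0 ms".matches(timingMetricPattern))
// The multi-line total (min, med, max) form still matches via the first alternative:
assert("total (min, med, max (stageId: taskId))\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms (stage 3.0: task 217))"
  .matches(timingMetricPattern))
```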
- // e.g "\n(1, 1, 0.9 (stage 1.0: task 8)) OR "\n(1, 1, 0.9 )" + // e.g "\n(1, 1, 0.9 (stage 1.0: task 8)) OR "\n(1, 1, 0.9 )" OR "1" protected val aggregateMetricsPattern = { val iters = "([0-9]+(\\.[0-9]+)?)" val maxMetrics = "\\(stage ([0-9])+\\.([0-9])+\\: task ([0-9])+\\)" - s"\\n\\($iters, $iters, $iters( $maxMetrics)?\\)" + s"(.*\\n\\($iters, $iters, $iters( $maxMetrics)?\\))|($iters)" } /** @@ -98,7 +98,7 @@ trait SQLMetricsTestUtils extends SQLTestUtils { } val totalNumBytesMetric = executedNode.metrics.find( - _.name == "written output total (min, med, max (stageId: taskId))").get + _.name == "written output").get val totalNumBytes = metrics(totalNumBytesMetric.accumulatorId).replaceAll(",", "") .split(" ").head.trim.toDouble assert(totalNumBytes > 0) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala index 9f4a335..949924e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala @@ -152,11 +152,14 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils expected.foreach { case (id, value) => // The values in actual can be SQL metrics meaning that they contain additional formatting // when converted to string. Verify that they start with the expected value. - // TODO: this is brittle. There is no requirement that the actual string needs to start - // with the accumulator value. assert(actual.contains(id)) val v = actual(id).trim - assert(v.startsWith(value.toString), s"Wrong value for accumulator $id") + if (v.contains("\n")) { + // The actual value can be "total (max, ...)\n6 ms (5 ms, ...)". + assert(v.split("\n")(1).startsWith(value.toString), s"Wrong value for accumulator $id") + } else { + assert(v.startsWith(value.toString), s"Wrong value for accumulator $id") + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org