This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new bc37fdc [SPARK-31275][WEBUI] Improve the metrics format in ExecutionPage for StageId bc37fdc is described below commit bc37fdc77130ce4f60806db0bb2b1b8914452040 Author: Kousuke Saruta <saru...@oss.nttdata.com> AuthorDate: Fri Mar 27 13:35:28 2020 +0800 [SPARK-31275][WEBUI] Improve the metrics format in ExecutionPage for StageId ### What changes were proposed in this pull request? In ExecutionPage, metrics format for stageId, attemptId and taskId are displayed like `(stageId (attemptId): taskId)` for now. I changed this format like `(stageId.attemptId taskId)`. ### Why are the changes needed? As cloud-fan suggested [here](https://github.com/apache/spark/pull/27927#discussion_r398591519), `stageId.attemptId` is more standard in Spark. ### Does this PR introduce any user-facing change? Yes. Before applying this change, we can see the UI as follows. ![with-checked](https://user-images.githubusercontent.com/4736016/77682421-42a6c200-6fda-11ea-92e4-e9f4554adb71.png) And after this change is applied, we can see it as follows. ![fix-merics-format-with-checked](https://user-images.githubusercontent.com/4736016/77682493-61a55400-6fda-11ea-801f-91a67da698fd.png) ### How was this patch tested? Modified `SQLMetricsSuite` and manual test. Closes #28039 from sarutak/improve-metrics-format. 
Authored-by: Kousuke Saruta <saru...@oss.nttdata.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../spark/sql/execution/ui/static/spark-sql-viz.js | 2 +- .../spark/sql/execution/metric/SQLMetrics.scala | 12 ++++++------ .../spark/sql/execution/ui/ExecutionPage.scala | 2 +- .../spark/sql/execution/metric/SQLMetricsSuite.scala | 20 ++++++++++---------- .../sql/execution/metric/SQLMetricsTestUtils.scala | 18 +++++++++--------- 5 files changed, 27 insertions(+), 27 deletions(-) diff --git a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.js b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.js index 0fb7dab..bb393d9 100644 --- a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.js +++ b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.js @@ -73,7 +73,7 @@ function setupTooltipForSparkPlanNode(nodeId) { // labelSeparator should be a non-graphical character in order not to affect the width of boxes. var labelSeparator = "\x01"; -var stageAndTaskMetricsPattern = "^(.*)(\\(stage.*attempt.*task[^)]*\\))(.*)$"; +var stageAndTaskMetricsPattern = "^(.*)(\\(stage.*task[^)]*\\))(.*)$"; /* * Helper function to pre-process the graph layout. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala index 65aabe0..1394e0f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala @@ -116,7 +116,7 @@ object SQLMetrics { // data size total (min, med, max): // 100GB (100MB, 1GB, 10GB) val acc = new SQLMetric(SIZE_METRIC, -1) - acc.register(sc, name = Some(s"$name total (min, med, max (stageId (attemptId): taskId))"), + acc.register(sc, name = Some(s"$name total (min, med, max (stageId: taskId))"), countFailedValues = false) acc } @@ -126,7 +126,7 @@ object SQLMetrics { // duration(min, med, max): // 5s (800ms, 1s, 2s) val acc = new SQLMetric(TIMING_METRIC, -1) - acc.register(sc, name = Some(s"$name total (min, med, max (stageId (attemptId): taskId))"), + acc.register(sc, name = Some(s"$name total (min, med, max (stageId: taskId))"), countFailedValues = false) acc } @@ -134,7 +134,7 @@ object SQLMetrics { def createNanoTimingMetric(sc: SparkContext, name: String): SQLMetric = { // Same with createTimingMetric, just normalize the unit of time to millisecond. val acc = new SQLMetric(NS_TIMING_METRIC, -1) - acc.register(sc, name = Some(s"$name total (min, med, max (stageId (attemptId): taskId))"), + acc.register(sc, name = Some(s"$name total (min, med, max (stageId: taskId))"), countFailedValues = false) acc } @@ -150,7 +150,7 @@ object SQLMetrics { // probe avg (min, med, max): // (1.2, 2.2, 6.3) val acc = new SQLMetric(AVERAGE_METRIC) - acc.register(sc, name = Some(s"$name (min, med, max (stageId (attemptId): taskId))"), + acc.register(sc, name = Some(s"$name (min, med, max (stageId: taskId))"), countFailedValues = false) acc } @@ -169,11 +169,11 @@ object SQLMetrics { * and represent it in string for a SQL physical operator. 
*/ def stringValue(metricsType: String, values: Array[Long], maxMetrics: Array[Long]): String = { - // stringMetric = "(driver)" OR (stage $stageId (attempt $attemptId): task $taskId)) + // stringMetric = "(driver)" OR (stage ${stageId}.${attemptId}: task $taskId) val stringMetric = if (maxMetrics.isEmpty) { "(driver)" } else { - s"(stage ${maxMetrics(1)} (attempt ${maxMetrics(2)}): task ${maxMetrics(3)})" + s"(stage ${maxMetrics(1)}.${maxMetrics(2)}: task ${maxMetrics(3)})" } if (metricsType == SUM_METRIC) { val numberFormat = NumberFormat.getIntegerInstance(Locale.US) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala index d304369..76bc7fa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala @@ -73,7 +73,7 @@ class ExecutionPage(parent: SQLTab) extends WebUIPage("execution") with Logging </div> <div> <input type="checkbox" id="stageId-and-taskId-checkbox"></input> - <span>Show the Stage (Stage Attempt): Task ID that corresponds to the max metric</span> + <span>Show the Stage ID and Task ID that corresponds to the max metric</span> </div> val metrics = sqlStore.executionMetrics(executionId) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index 7d09577..11f93c8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -98,7 +98,7 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils { val ds = spark.range(10).filter('id < 5) testSparkPlanMetricsWithPredicates(ds.toDF(), 1, Map( 0L -> (("WholeStageCodegen (1)", Map( - "duration total (min, 
med, max (stageId (attemptId): taskId))" -> { + "duration total (min, med, max (stageId: taskId))" -> { _.toString.matches(timingMetricPattern) })))), true) } @@ -110,10 +110,10 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils { val df = testData2.groupBy().count() // 2 partitions val expected1 = Seq( Map("number of output rows" -> 2L, - "avg hash probe bucket list iters (min, med, max (stageId (attemptId): taskId))" -> + "avg hash probe bucket list iters (min, med, max (stageId: taskId))" -> aggregateMetricsPattern), Map("number of output rows" -> 1L, - "avg hash probe bucket list iters (min, med, max (stageId (attemptId): taskId))" -> + "avg hash probe bucket list iters (min, med, max (stageId: taskId))" -> aggregateMetricsPattern)) val shuffleExpected1 = Map( "records read" -> 2L, @@ -130,10 +130,10 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils { val df2 = testData2.groupBy('a).count() val expected2 = Seq( Map("number of output rows" -> 4L, - "avg hash probe bucket list iters (min, med, max (stageId (attemptId): taskId))" -> + "avg hash probe bucket list iters (min, med, max (stageId: taskId))" -> aggregateMetricsPattern), Map("number of output rows" -> 3L, - "avg hash probe bucket list iters (min, med, max (stageId (attemptId): taskId))" -> + "avg hash probe bucket list iters (min, med, max (stageId: taskId))" -> aggregateMetricsPattern)) val shuffleExpected2 = Map( @@ -181,8 +181,8 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils { } val metrics = getSparkPlanMetrics(df, 1, nodeIds, enableWholeStage).get nodeIds.foreach { nodeId => - val probes = metrics(nodeId)._2("avg hash probe bucket list iters (min, med, max (stageId" + - " (attemptId): taskId))") + val probes = + metrics(nodeId)._2("avg hash probe bucket list iters (min, med, max (stageId: taskId))") // Extract min, med, max from the string and strip off everthing else. 
val index = probes.toString.stripPrefix("\n(").stripSuffix(")").indexOf(" (", 0) probes.toString.stripPrefix("\n(").stripSuffix(")").slice(0, index).split(", ").foreach { @@ -231,13 +231,13 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils { val df = Seq(1, 3, 2).toDF("id").sort('id) testSparkPlanMetricsWithPredicates(df, 2, Map( 0L -> (("Sort", Map( - "sort time total (min, med, max (stageId (attemptId): taskId))" -> { + "sort time total (min, med, max (stageId: taskId))" -> { _.toString.matches(timingMetricPattern) }, - "peak memory total (min, med, max (stageId (attemptId): taskId))" -> { + "peak memory total (min, med, max (stageId: taskId))" -> { _.toString.matches(sizeMetricPattern) }, - "spill size total (min, med, max (stageId (attemptId): taskId))" -> { + "spill size total (min, med, max (stageId: taskId))" -> { _.toString.matches(sizeMetricPattern) }))) )) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala index 0c1148f..766e7a9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala @@ -41,27 +41,27 @@ trait SQLMetricsTestUtils extends SQLTestUtils { protected def statusStore: SQLAppStatusStore = spark.sharedState.statusStore - // Pattern of size SQLMetric value, e.g. "\n96.2 MiB (32.1 MiB, 32.1 MiB, 32.1 MiB (stage 0 - // (attempt 0): task 4))" OR "\n96.2 MiB (32.1 MiB, 32.1 MiB, 32.1 MiB)" + // Pattern of size SQLMetric value, e.g. "\n96.2 MiB (32.1 MiB, 32.1 MiB, 32.1 MiB (stage 0.0: + // task 4))" OR "\n96.2 MiB (32.1 MiB, 32.1 MiB, 32.1 MiB)" protected val sizeMetricPattern = { val bytes = "([0-9]+(\\.[0-9]+)?) 
(EiB|PiB|TiB|GiB|MiB|KiB|B)" - val maxMetrics = "\\(stage ([0-9])+ \\(attempt ([0-9])+\\)\\: task ([0-9])+\\)" + val maxMetrics = "\\(stage ([0-9])+\\.([0-9])+\\: task ([0-9])+\\)" s"\\n$bytes \\($bytes, $bytes, $bytes( $maxMetrics)?\\)" } - // Pattern of timing SQLMetric value, e.g. "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms (stage 3 (attempt - // 0): task 217))" OR "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms)" + // Pattern of timing SQLMetric value, e.g. "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms (stage 3.0): + // task 217))" OR "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms)" protected val timingMetricPattern = { val duration = "([0-9]+(\\.[0-9]+)?) (ms|s|m|h)" - val maxMetrics = "\\(stage ([0-9])+ \\(attempt ([0-9])+\\)\\: task ([0-9])+\\)" + val maxMetrics = "\\(stage ([0-9])+\\.([0-9])+\\: task ([0-9])+\\)" s"\\n$duration \\($duration, $duration, $duration( $maxMetrics)?\\)" } // Pattern of size SQLMetric value for Aggregate tests. - // e.g "\n(1, 1, 0.9 (stage 1 (attempt 0): task 8)) OR "\n(1, 1, 0.9 )" + // e.g "\n(1, 1, 0.9 (stage 1.0: task 8)) OR "\n(1, 1, 0.9 )" protected val aggregateMetricsPattern = { val iters = "([0-9]+(\\.[0-9]+)?)" - val maxMetrics = "\\(stage ([0-9])+ \\(attempt ([0-9])+\\)\\: task ([0-9])+\\)" + val maxMetrics = "\\(stage ([0-9])+\\.([0-9])+\\: task ([0-9])+\\)" s"\\n\\($iters, $iters, $iters( $maxMetrics)?\\)" } @@ -98,7 +98,7 @@ trait SQLMetricsTestUtils extends SQLTestUtils { } val totalNumBytesMetric = executedNode.metrics.find( - _.name == "written output total (min, med, max (stageId (attemptId): taskId))").get + _.name == "written output total (min, med, max (stageId: taskId))").get val totalNumBytes = metrics(totalNumBytesMetric.accumulatorId).replaceAll(",", "") .split(" ").head.trim.toDouble assert(totalNumBytes > 0) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org