Repository: spark
Updated Branches:
  refs/heads/master 09a91d98b -> 93112e693
[SPARK-26142][SQL] Implement shuffle read metrics in SQL

## What changes were proposed in this pull request?

Implement `SQLShuffleMetricsReporter` on the SQL side as a customized shuffle metrics reporter. It extends `TempShuffleReadMetrics` and additionally updates the corresponding `SQLMetrics`, so that shuffle read metrics can be reported in the SQL UI.

## How was this patch tested?

Added a unit test in SQLMetricsSuite and tested manually.

Before:
![image](https://user-images.githubusercontent.com/4833765/48960517-30f97880-efa8-11e8-982c-92d05938fd1d.png)

After:
![image](https://user-images.githubusercontent.com/4833765/48960587-b54bfb80-efa8-11e8-8e95-7a3c8c74cc5c.png)

Closes #23128 from xuanyuanking/SPARK-26142.

Lead-authored-by: Yuanjian Li <xyliyuanj...@gmail.com>
Co-authored-by: liyuanjian <liyuanj...@baidu.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/93112e69
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/93112e69
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/93112e69

Branch: refs/heads/master
Commit: 93112e693082f3fba24cebaf9a98dcf5c1eb84af
Parents: 09a91d9
Author: Yuanjian Li <xyliyuanj...@gmail.com>
Authored: Wed Nov 28 20:18:13 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Wed Nov 28 20:18:13 2018 +0800

----------------------------------------------------------------------
 .../spark/sql/execution/ShuffledRowRDD.scala    |  9 ++-
 .../exchange/ShuffleExchangeExec.scala          |  5 +-
 .../org/apache/spark/sql/execution/limit.scala  | 10 ++-
 .../spark/sql/execution/metric/SQLMetrics.scala | 20 ++++++
 .../metric/SQLShuffleMetricsReporter.scala      | 67 ++++++++++++++++++++
 .../execution/UnsafeRowSerializerSuite.scala    |  5 +-
 .../sql/execution/metric/SQLMetricsSuite.scala  | 21 ++--
 7 files changed, 126 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
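Not part of the patch itself, but a minimal sketch of how the new metrics surface: any query that shuffles plans an Exchange node, and after this change the node's shuffle read metrics appear in the SQL tab of the web UI. The local session below is only an assumption for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Assumed local session, just for trying the change out.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("sql-shuffle-read-metrics")
  .getOrCreate()

// A query with shuffles: its physical plan contains Exchange nodes.
val df = spark.range(0, 100).repartition(4).groupBy("id").count()
df.collect()

// In the web UI's SQL tab, the Exchange nodes should now report
// "records read", "local blocks fetched", "remote blocks fetched", etc.
```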
http://git-wip-us.apache.org/repos/asf/spark/blob/93112e69/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
index 542266b..9b05faa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
@@ -22,6 +22,7 @@ import java.util.Arrays
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleMetricsReporter}
 
 /**
  * The [[Partition]] used by [[ShuffledRowRDD]]. A post-shuffle partition
@@ -112,6 +113,7 @@ class CoalescedPartitioner(val parent: Partitioner, val partitionStartIndices: A
  */
 class ShuffledRowRDD(
     var dependency: ShuffleDependency[Int, InternalRow, InternalRow],
+    metrics: Map[String, SQLMetric],
     specifiedPartitionStartIndices: Option[Array[Int]] = None)
   extends RDD[InternalRow](dependency.rdd.context, Nil) {
 
@@ -154,7 +156,10 @@ class ShuffledRowRDD(
 
   override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
     val shuffledRowPartition = split.asInstanceOf[ShuffledRowRDDPartition]
-    val metrics = context.taskMetrics().createTempShuffleReadMetrics()
+    val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics()
+    // `SQLShuffleMetricsReporter` will update its own metrics for SQL exchange operator,
+    // as well as the `tempMetrics` for basic shuffle metrics.
+    val sqlMetricsReporter = new SQLShuffleMetricsReporter(tempMetrics, metrics)
     // The range of pre-shuffle partitions that we are fetching at here is
     // [startPreShufflePartitionIndex, endPreShufflePartitionIndex - 1].
     val reader =
@@ -163,7 +168,7 @@ class ShuffledRowRDD(
         shuffledRowPartition.startPreShufflePartitionIndex,
         shuffledRowPartition.endPreShufflePartitionIndex,
         context,
-        metrics)
+        sqlMetricsReporter)
     reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map(_._2)
   }
 
http://git-wip-us.apache.org/repos/asf/spark/blob/93112e69/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index d6742ab..8938d93 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -48,7 +48,8 @@ case class ShuffleExchangeExec(
   // e.g. it can be null on the Executor side
 
   override lazy val metrics = Map(
-    "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"))
+    "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size")
+  ) ++ SQLMetrics.getShuffleReadMetrics(sparkContext)
 
   override def nodeName: String = {
     val extraInfo = coordinator match {
@@ -108,7 +109,7 @@ case class ShuffleExchangeExec(
       assert(newPartitioning.isInstanceOf[HashPartitioning])
       newPartitioning = UnknownPartitioning(indices.length)
     }
-    new ShuffledRowRDD(shuffleDependency, specifiedPartitionStartIndices)
+    new ShuffledRowRDD(shuffleDependency, metrics, specifiedPartitionStartIndices)
   }
 
   /**
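The same metrics can also be read back programmatically. A hedged sketch (not part of the patch), using the metric keys registered by `SQLMetrics.getShuffleReadMetrics` (see SQLMetrics.scala below); treat the exact access pattern as an assumption:

```scala
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// Reuses the `spark` session from the earlier sketch.
val df = spark.range(0, 100).repartition(4).groupBy("id").count()
df.collect()

// Collect the "recordsRead" SQLMetric from every Exchange in the executed plan.
// SQLMetric accumulators are merged on the driver as tasks complete.
val recordsRead = df.queryExecution.executedPlan.collect {
  case e: ShuffleExchangeExec => e.metrics("recordsRead").value
}
println(s"records read per Exchange: $recordsRead")
```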
http://git-wip-us.apache.org/repos/asf/spark/blob/93112e69/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index 90dafcf..ea845da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGe
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.sql.execution.metric.SQLMetrics
 
 /**
  * Take the first `limit` elements and collect them to a single partition.
@@ -37,11 +38,13 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode
   override def outputPartitioning: Partitioning = SinglePartition
   override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
   private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
+  override lazy val metrics = SQLMetrics.getShuffleReadMetrics(sparkContext)
   protected override def doExecute(): RDD[InternalRow] = {
     val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit))
     val shuffled = new ShuffledRowRDD(
       ShuffleExchangeExec.prepareShuffleDependency(
-        locallyLimited, child.output, SinglePartition, serializer))
+        locallyLimited, child.output, SinglePartition, serializer),
+      metrics)
     shuffled.mapPartitionsInternal(_.take(limit))
   }
 }
@@ -151,6 +154,8 @@ case class TakeOrderedAndProjectExec(
 
   private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
 
+  override lazy val metrics = SQLMetrics.getShuffleReadMetrics(sparkContext)
+
   protected override def doExecute(): RDD[InternalRow] = {
     val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
     val localTopK: RDD[InternalRow] = {
@@ -160,7 +165,8 @@ case class TakeOrderedAndProjectExec(
     }
     val shuffled = new ShuffledRowRDD(
       ShuffleExchangeExec.prepareShuffleDependency(
-        localTopK, child.output, SinglePartition, serializer))
+        localTopK, child.output, SinglePartition, serializer),
+      metrics)
     shuffled.mapPartitions { iter =>
       val topK = org.apache.spark.util.collection.Utils.takeOrdered(iter.map(_.copy()), limit)(ord)
       if (projectList != child.output) {
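For context (not part of the patch): `CollectLimitExec` and `TakeOrderedAndProjectExec` also shuffle to a single partition in their `doExecute`, which is why they now declare the same shuffle read metrics. A hedged sketch of queries that are typically planned with these operators; `explain()` confirms which operator is actually chosen:

```scala
import org.apache.spark.sql.functions.desc

// Reuses the `spark` session from the earlier sketch.
val limited = spark.range(0, 100).repartition(4).limit(5)
limited.explain()   // typically shows a CollectLimit node
limited.collect()

val topK = spark.range(0, 100).repartition(4).orderBy(desc("id")).limit(5)
topK.explain()      // typically shows a TakeOrderedAndProject node
topK.collect()
```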
http://git-wip-us.apache.org/repos/asf/spark/blob/93112e69/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index cbf707f..0b5ee3a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -82,6 +82,14 @@ object SQLMetrics {
 
   private val baseForAvgMetric: Int = 10
 
+  val REMOTE_BLOCKS_FETCHED = "remoteBlocksFetched"
+  val LOCAL_BLOCKS_FETCHED = "localBlocksFetched"
+  val REMOTE_BYTES_READ = "remoteBytesRead"
+  val REMOTE_BYTES_READ_TO_DISK = "remoteBytesReadToDisk"
+  val LOCAL_BYTES_READ = "localBytesRead"
+  val FETCH_WAIT_TIME = "fetchWaitTime"
+  val RECORDS_READ = "recordsRead"
+
   /**
    * Converts a double value to long value by multiplying a base integer, so we can store it in
    * `SQLMetrics`. It only works for average metrics. When showing the metrics on UI, we restore
@@ -194,4 +202,16 @@ object SQLMetrics {
         SparkListenerDriverAccumUpdates(executionId.toLong, metrics.map(m => m.id -> m.value)))
     }
   }
+
+  /**
+   * Create all shuffle read relative metrics and return the Map.
+   */
+  def getShuffleReadMetrics(sc: SparkContext): Map[String, SQLMetric] = Map(
+    REMOTE_BLOCKS_FETCHED -> createMetric(sc, "remote blocks fetched"),
+    LOCAL_BLOCKS_FETCHED -> createMetric(sc, "local blocks fetched"),
+    REMOTE_BYTES_READ -> createSizeMetric(sc, "remote bytes read"),
+    REMOTE_BYTES_READ_TO_DISK -> createSizeMetric(sc, "remote bytes read to disk"),
+    LOCAL_BYTES_READ -> createSizeMetric(sc, "local bytes read"),
+    FETCH_WAIT_TIME -> createTimingMetric(sc, "fetch wait time"),
+    RECORDS_READ -> createMetric(sc, "records read"))
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/93112e69/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
new file mode 100644
index 0000000..542141e
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.metric
+
+import org.apache.spark.executor.TempShuffleReadMetrics
+
+/**
+ * A shuffle metrics reporter for SQL exchange operators.
+ * @param tempMetrics [[TempShuffleReadMetrics]] created in TaskContext.
+ * @param metrics All metrics in current SparkPlan. This param should not empty and
+ *   contains all shuffle metrics defined in [[SQLMetrics.getShuffleReadMetrics]].
+ */
+private[spark] class SQLShuffleMetricsReporter(
+    tempMetrics: TempShuffleReadMetrics,
+    metrics: Map[String, SQLMetric]) extends TempShuffleReadMetrics {
+  private[this] val _remoteBlocksFetched = metrics(SQLMetrics.REMOTE_BLOCKS_FETCHED)
+  private[this] val _localBlocksFetched = metrics(SQLMetrics.LOCAL_BLOCKS_FETCHED)
+  private[this] val _remoteBytesRead = metrics(SQLMetrics.REMOTE_BYTES_READ)
+  private[this] val _remoteBytesReadToDisk = metrics(SQLMetrics.REMOTE_BYTES_READ_TO_DISK)
+  private[this] val _localBytesRead = metrics(SQLMetrics.LOCAL_BYTES_READ)
+  private[this] val _fetchWaitTime = metrics(SQLMetrics.FETCH_WAIT_TIME)
+  private[this] val _recordsRead = metrics(SQLMetrics.RECORDS_READ)
+
+  override def incRemoteBlocksFetched(v: Long): Unit = {
+    _remoteBlocksFetched.add(v)
+    tempMetrics.incRemoteBlocksFetched(v)
+  }
+  override def incLocalBlocksFetched(v: Long): Unit = {
+    _localBlocksFetched.add(v)
+    tempMetrics.incLocalBlocksFetched(v)
+  }
+  override def incRemoteBytesRead(v: Long): Unit = {
+    _remoteBytesRead.add(v)
+    tempMetrics.incRemoteBytesRead(v)
+  }
+  override def incRemoteBytesReadToDisk(v: Long): Unit = {
+    _remoteBytesReadToDisk.add(v)
+    tempMetrics.incRemoteBytesReadToDisk(v)
+  }
+  override def incLocalBytesRead(v: Long): Unit = {
+    _localBytesRead.add(v)
+    tempMetrics.incLocalBytesRead(v)
+  }
+  override def incFetchWaitTime(v: Long): Unit = {
+    _fetchWaitTime.add(v)
+    tempMetrics.incFetchWaitTime(v)
+  }
+  override def incRecordsRead(v: Long): Unit = {
+    _recordsRead.add(v)
+    tempMetrics.incRecordsRead(v)
+  }
+}
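A hedged sketch (not part of the patch) of what `SQLMetrics.getShuffleReadMetrics` registers: a map of plain `SQLMetric` accumulators keyed by the constants above, which `SQLShuffleMetricsReporter` then updates alongside the task-level `TempShuffleReadMetrics`:

```scala
import org.apache.spark.sql.execution.metric.SQLMetrics

// Reuses the SparkContext of the `spark` session from the earlier sketch.
val shuffleReadMetrics = SQLMetrics.getShuffleReadMetrics(spark.sparkContext)
shuffleReadMetrics.keys.foreach(println)
// Prints the seven keys (order may vary): remoteBlocksFetched, localBlocksFetched,
// remoteBytesRead, remoteBytesReadToDisk, localBytesRead, fetchWaitTime, recordsRead

// Each value is a registered accumulator; the reporter simply calls add() on it.
shuffleReadMetrics("recordsRead").add(10L)
assert(shuffleReadMetrics("recordsRead").value == 10L)
```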
http://git-wip-us.apache.org/repos/asf/spark/blob/93112e69/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
index d305ce3..96b3aa5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{LocalSparkSession, Row, SparkSession}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.ShuffleBlockId
 import org.apache.spark.util.collection.ExternalSorter
@@ -137,7 +138,9 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkSession {
       rowsRDD,
       new PartitionIdPassthrough(2),
       new UnsafeRowSerializer(2))
-    val shuffled = new ShuffledRowRDD(dependency)
+    val shuffled = new ShuffledRowRDD(
+      dependency,
+      SQLMetrics.getShuffleReadMetrics(spark.sparkContext))
     shuffled.count()
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/93112e69/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index b955c15..0f1d08b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -94,8 +94,13 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
         "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"),
       Map("number of output rows" -> 1L,
         "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"))
+    val shuffleExpected1 = Map(
+      "records read" -> 2L,
+      "local blocks fetched" -> 2L,
+      "remote blocks fetched" -> 0L)
     testSparkPlanMetrics(df, 1, Map(
       2L -> (("HashAggregate", expected1(0))),
+      1L -> (("Exchange", shuffleExpected1)),
       0L -> (("HashAggregate", expected1(1))))
     )
 
@@ -106,8 +111,13 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
         "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"),
       Map("number of output rows" -> 3L,
         "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"))
+    val shuffleExpected2 = Map(
+      "records read" -> 4L,
+      "local blocks fetched" -> 4L,
+      "remote blocks fetched" -> 0L)
     testSparkPlanMetrics(df2, 1, Map(
       2L -> (("HashAggregate", expected2(0))),
+      1L -> (("Exchange", shuffleExpected2)),
       0L -> (("HashAggregate", expected2(1))))
     )
   }
@@ -191,7 +201,11 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
     testSparkPlanMetrics(df, 1, Map(
       0L -> (("SortMergeJoin", Map(
         // It's 4 because we only read 3 rows in the first partition and 1 row in the second one
-        "number of output rows" -> 4L))))
+        "number of output rows" -> 4L))),
+      2L -> (("Exchange", Map(
+        "records read" -> 4L,
+        "local blocks fetched" -> 2L,
+        "remote blocks fetched" -> 0L))))
     )
   }
 }
@@ -208,7 +222,7 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
       "SELECT * FROM testData2 left JOIN testDataForJoin ON testData2.a = testDataForJoin.a")
     testSparkPlanMetrics(df, 1, Map(
       0L -> (("SortMergeJoin", Map(
-        // It's 4 because we only read 3 rows in the first partition and 1 row in the second one
+        // It's 8 because we read 6 rows in the left and 2 row in the right one
         "number of output rows" -> 8L))))
     )
 
@@ -216,7 +230,7 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
       "SELECT * FROM testDataForJoin right JOIN testData2 ON testData2.a = testDataForJoin.a")
     testSparkPlanMetrics(df2, 1, Map(
       0L -> (("SortMergeJoin", Map(
-        // It's 4 because we only read 3 rows in the first partition and 1 row in the second one
+        // It's 8 because we read 6 rows in the left and 2 row in the right one
         "number of output rows" -> 8L))))
     )
   }
@@ -287,7 +301,6 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
     // Assume the execution plan is
     // ... -> ShuffledHashJoin(nodeId = 1) -> Project(nodeId = 0)
     val df = df1.join(df2, "key")
-    val metrics = getSparkPlanMetrics(df, 1, Set(1L))
     testSparkPlanMetrics(df, 1, Map(
       1L -> (("ShuffledHashJoin", Map(
         "number of output rows" -> 2L,

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org