Repository: spark
Updated Branches:
  refs/heads/master 9fdc7a840 -> cb368f2c2

[SPARK-26142] followup: Move sql shuffle read metrics relatives to SQLShuffleMetricsReporter

## What changes were proposed in this pull request?

Follow-up for https://github.com/apache/spark/pull/23128: move the SQL shuffle read metrics definitions into `SQLShuffleMetricsReporter`, so that all shuffle-read-metrics code lives in one place and there is no risk of forgetting to update `SQLShuffleMetricsReporter` when others add new metrics.

## How was this patch tested?

Existing tests.

Closes #23175 from xuanyuanking/SPARK-26142-follow.

Authored-by: Yuanjian Li <xyliyuanj...@gmail.com>
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb368f2c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb368f2c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb368f2c

Branch: refs/heads/master
Commit: cb368f2c2964797d7313d3a4151e2352ff7847a9
Parents: 9fdc7a8
Author: Yuanjian Li <xyliyuanj...@gmail.com>
Authored: Thu Nov 29 12:09:30 2018 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Thu Nov 29 12:09:30 2018 -0800

----------------------------------------------------------------------
 .../exchange/ShuffleExchangeExec.scala          |  4 +-
 .../org/apache/spark/sql/execution/limit.scala  |  6 +--
 .../spark/sql/execution/metric/SQLMetrics.scala | 20 --------
 .../metric/SQLShuffleMetricsReporter.scala      | 50 ++++++++++++++++----
 .../execution/UnsafeRowSerializerSuite.scala    |  4 +-
 5 files changed, 47 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
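
Before the per-file hunks, the net effect of the change in one place: operators now obtain the shuffle read metric map from the companion object of SQLShuffleMetricsReporter instead of from SQLMetrics. The sketch below is assembled from the ShuffleExchangeExec hunk further down (only the "dataSize" metric is specific to that operator); it is illustrative, not an additional change in this commit.

    import org.apache.spark.sql.execution.metric.{SQLMetrics, SQLShuffleMetricsReporter}

    // A shuffle-producing SparkPlan combines its operator-specific metrics with the
    // shared shuffle read metrics created by the reporter's companion object.
    override lazy val metrics = Map(
      "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size")
    ) ++ SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
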

http://git-wip-us.apache.org/repos/asf/spark/blob/cb368f2c/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index 8938d93..c9ca395 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, Uns
 import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.execution.metric.{SQLMetrics, SQLShuffleMetricsReporter}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.MutablePair
@@ -49,7 +49,7 @@ case class ShuffleExchangeExec(

   override lazy val metrics = Map(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size")
-  ) ++ SQLMetrics.getShuffleReadMetrics(sparkContext)
+  ) ++ SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)

   override def nodeName: String = {
     val extraInfo = coordinator match {

http://git-wip-us.apache.org/repos/asf/spark/blob/cb368f2c/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index ea845da..e9ab7cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGe
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
-import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.execution.metric.SQLShuffleMetricsReporter

 /**
  * Take the first `limit` elements and collect them to a single partition.
@@ -38,7 +38,7 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode
   override def outputPartitioning: Partitioning = SinglePartition
   override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
   private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
-  override lazy val metrics = SQLMetrics.getShuffleReadMetrics(sparkContext)
+  override lazy val metrics = SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
   protected override def doExecute(): RDD[InternalRow] = {
     val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit))
     val shuffled = new ShuffledRowRDD(
@@ -154,7 +154,7 @@ case class TakeOrderedAndProjectExec(

   private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)

-  override lazy val metrics = SQLMetrics.getShuffleReadMetrics(sparkContext)
+  override lazy val metrics = SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)

   protected override def doExecute(): RDD[InternalRow] = {
     val ord = new LazilyGeneratedOrdering(sortOrder, child.output)

http://git-wip-us.apache.org/repos/asf/spark/blob/cb368f2c/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 0b5ee3a..cbf707f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -82,14 +82,6 @@ object SQLMetrics {

   private val baseForAvgMetric: Int = 10

-  val REMOTE_BLOCKS_FETCHED = "remoteBlocksFetched"
-  val LOCAL_BLOCKS_FETCHED = "localBlocksFetched"
-  val REMOTE_BYTES_READ = "remoteBytesRead"
-  val REMOTE_BYTES_READ_TO_DISK = "remoteBytesReadToDisk"
-  val LOCAL_BYTES_READ = "localBytesRead"
-  val FETCH_WAIT_TIME = "fetchWaitTime"
-  val RECORDS_READ = "recordsRead"
-
   /**
    * Converts a double value to long value by multiplying a base integer, so we can store it in
    * `SQLMetrics`. It only works for average metrics. When showing the metrics on UI, we restore
@@ -202,16 +194,4 @@ object SQLMetrics {
         SparkListenerDriverAccumUpdates(executionId.toLong, metrics.map(m => m.id -> m.value)))
     }
   }
-
-  /**
-   * Create all shuffle read relative metrics and return the Map.
-   */
-  def getShuffleReadMetrics(sc: SparkContext): Map[String, SQLMetric] = Map(
-    REMOTE_BLOCKS_FETCHED -> createMetric(sc, "remote blocks fetched"),
-    LOCAL_BLOCKS_FETCHED -> createMetric(sc, "local blocks fetched"),
-    REMOTE_BYTES_READ -> createSizeMetric(sc, "remote bytes read"),
-    REMOTE_BYTES_READ_TO_DISK -> createSizeMetric(sc, "remote bytes read to disk"),
-    LOCAL_BYTES_READ -> createSizeMetric(sc, "local bytes read"),
-    FETCH_WAIT_TIME -> createTimingMetric(sc, "fetch wait time"),
-    RECORDS_READ -> createMetric(sc, "records read"))
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cb368f2c/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
index 542141e..780f0d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
@@ -17,24 +17,32 @@

 package org.apache.spark.sql.execution.metric

+import org.apache.spark.SparkContext
 import org.apache.spark.executor.TempShuffleReadMetrics

 /**
  * A shuffle metrics reporter for SQL exchange operators.
  * @param tempMetrics [[TempShuffleReadMetrics]] created in TaskContext.
  * @param metrics All metrics in current SparkPlan. This param should not empty and
- *   contains all shuffle metrics defined in [[SQLMetrics.getShuffleReadMetrics]].
+ *   contains all shuffle metrics defined in createShuffleReadMetrics.
  */
 private[spark] class SQLShuffleMetricsReporter(
-  tempMetrics: TempShuffleReadMetrics,
-  metrics: Map[String, SQLMetric]) extends TempShuffleReadMetrics {
-  private[this] val _remoteBlocksFetched = metrics(SQLMetrics.REMOTE_BLOCKS_FETCHED)
-  private[this] val _localBlocksFetched = metrics(SQLMetrics.LOCAL_BLOCKS_FETCHED)
-  private[this] val _remoteBytesRead = metrics(SQLMetrics.REMOTE_BYTES_READ)
-  private[this] val _remoteBytesReadToDisk = metrics(SQLMetrics.REMOTE_BYTES_READ_TO_DISK)
-  private[this] val _localBytesRead = metrics(SQLMetrics.LOCAL_BYTES_READ)
-  private[this] val _fetchWaitTime = metrics(SQLMetrics.FETCH_WAIT_TIME)
-  private[this] val _recordsRead = metrics(SQLMetrics.RECORDS_READ)
+    tempMetrics: TempShuffleReadMetrics,
+    metrics: Map[String, SQLMetric]) extends TempShuffleReadMetrics {
+  private[this] val _remoteBlocksFetched =
+    metrics(SQLShuffleMetricsReporter.REMOTE_BLOCKS_FETCHED)
+  private[this] val _localBlocksFetched =
+    metrics(SQLShuffleMetricsReporter.LOCAL_BLOCKS_FETCHED)
+  private[this] val _remoteBytesRead =
+    metrics(SQLShuffleMetricsReporter.REMOTE_BYTES_READ)
+  private[this] val _remoteBytesReadToDisk =
+    metrics(SQLShuffleMetricsReporter.REMOTE_BYTES_READ_TO_DISK)
+  private[this] val _localBytesRead =
+    metrics(SQLShuffleMetricsReporter.LOCAL_BYTES_READ)
+  private[this] val _fetchWaitTime =
+    metrics(SQLShuffleMetricsReporter.FETCH_WAIT_TIME)
+  private[this] val _recordsRead =
+    metrics(SQLShuffleMetricsReporter.RECORDS_READ)

   override def incRemoteBlocksFetched(v: Long): Unit = {
     _remoteBlocksFetched.add(v)
@@ -65,3 +73,25 @@ private[spark] class SQLShuffleMetricsReporter(
     tempMetrics.incRecordsRead(v)
   }
 }
+
+private[spark] object SQLShuffleMetricsReporter {
+  val REMOTE_BLOCKS_FETCHED = "remoteBlocksFetched"
+  val LOCAL_BLOCKS_FETCHED = "localBlocksFetched"
+  val REMOTE_BYTES_READ = "remoteBytesRead"
+  val REMOTE_BYTES_READ_TO_DISK = "remoteBytesReadToDisk"
+  val LOCAL_BYTES_READ = "localBytesRead"
+  val FETCH_WAIT_TIME = "fetchWaitTime"
+  val RECORDS_READ = "recordsRead"
+
+  /**
+   * Create all shuffle read relative metrics and return the Map.
+   */
+  def createShuffleReadMetrics(sc: SparkContext): Map[String, SQLMetric] = Map(
+    REMOTE_BLOCKS_FETCHED -> SQLMetrics.createMetric(sc, "remote blocks fetched"),
+    LOCAL_BLOCKS_FETCHED -> SQLMetrics.createMetric(sc, "local blocks fetched"),
+    REMOTE_BYTES_READ -> SQLMetrics.createSizeMetric(sc, "remote bytes read"),
+    REMOTE_BYTES_READ_TO_DISK -> SQLMetrics.createSizeMetric(sc, "remote bytes read to disk"),
+    LOCAL_BYTES_READ -> SQLMetrics.createSizeMetric(sc, "local bytes read"),
+    FETCH_WAIT_TIME -> SQLMetrics.createTimingMetric(sc, "fetch wait time"),
+    RECORDS_READ -> SQLMetrics.createMetric(sc, "records read"))
+}
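
To connect the two sides: the reporter defined above wraps the task-level TempShuffleReadMetrics and forwards every increment both to it and to the SQLMetric map built by createShuffleReadMetrics. A rough sketch of that consumption path follows; the ShuffledRowRDD wiring is assumed here rather than shown in this diff, and makeReporter, taskContext and metrics are illustrative stand-ins for values available at shuffle-read time.

    import org.apache.spark.TaskContext
    import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleMetricsReporter}

    // `metrics` is the map created by SQLShuffleMetricsReporter.createShuffleReadMetrics
    // on the driver; `taskContext` is the running task's TaskContext on the executor.
    def makeReporter(
        taskContext: TaskContext,
        metrics: Map[String, SQLMetric]): SQLShuffleMetricsReporter = {
      // TempShuffleReadMetrics accumulates shuffle reads for this task attempt.
      val tempMetrics = taskContext.taskMetrics().createTempShuffleReadMetrics()
      // Each incXxx call on the returned reporter updates both the SQL metric
      // (shown in the SQL UI) and the task-level shuffle read metrics.
      new SQLShuffleMetricsReporter(tempMetrics, metrics)
    }
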
"localBlocksFetched" + val REMOTE_BYTES_READ = "remoteBytesRead" + val REMOTE_BYTES_READ_TO_DISK = "remoteBytesReadToDisk" + val LOCAL_BYTES_READ = "localBytesRead" + val FETCH_WAIT_TIME = "fetchWaitTime" + val RECORDS_READ = "recordsRead" + + /** + * Create all shuffle read relative metrics and return the Map. + */ + def createShuffleReadMetrics(sc: SparkContext): Map[String, SQLMetric] = Map( + REMOTE_BLOCKS_FETCHED -> SQLMetrics.createMetric(sc, "remote blocks fetched"), + LOCAL_BLOCKS_FETCHED -> SQLMetrics.createMetric(sc, "local blocks fetched"), + REMOTE_BYTES_READ -> SQLMetrics.createSizeMetric(sc, "remote bytes read"), + REMOTE_BYTES_READ_TO_DISK -> SQLMetrics.createSizeMetric(sc, "remote bytes read to disk"), + LOCAL_BYTES_READ -> SQLMetrics.createSizeMetric(sc, "local bytes read"), + FETCH_WAIT_TIME -> SQLMetrics.createTimingMetric(sc, "fetch wait time"), + RECORDS_READ -> SQLMetrics.createMetric(sc, "records read")) +} http://git-wip-us.apache.org/repos/asf/spark/blob/cb368f2c/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala index 96b3aa5..1ad5713 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala @@ -26,7 +26,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{LocalSparkSession, Row, SparkSession} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow} -import org.apache.spark.sql.execution.metric.SQLMetrics +import org.apache.spark.sql.execution.metric.SQLShuffleMetricsReporter import org.apache.spark.sql.types._ import org.apache.spark.storage.ShuffleBlockId import org.apache.spark.util.collection.ExternalSorter @@ -140,7 +140,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkSession { new UnsafeRowSerializer(2)) val shuffled = new ShuffledRowRDD( dependency, - SQLMetrics.getShuffleReadMetrics(spark.sparkContext)) + SQLShuffleMetricsReporter.createShuffleReadMetrics(spark.sparkContext)) shuffled.count() } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org