Repository: spark
Updated Branches:
  refs/heads/master 9fdc7a840 -> cb368f2c2


[SPARK-26142] followup: Move sql shuffle read metrics relatives to SQLShuffleMetricsReporter

## What changes were proposed in this pull request?

Follow-up for https://github.com/apache/spark/pull/23128: move the SQL shuffle
read metrics definitions into `SQLShuffleMetricsReporter`, so that everything
related to SQL shuffle read metrics lives in one place and it is harder to
forget to update `SQLShuffleMetricsReporter` when new metrics are added later.
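
For illustration, here is a minimal sketch (not part of the patch; the object and
method names `ShuffleReadMetricsExample` / `operatorMetrics` are made up, and the
code is assumed to live inside the Spark source tree because the helpers are
`private[spark]`) of how an operator now builds its metrics map through the
relocated helper:

```scala
// Hypothetical example, placed under org.apache.spark.sql.execution so the
// private[spark] helpers below are visible.
package org.apache.spark.sql.execution

import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleMetricsReporter}

object ShuffleReadMetricsExample {
  // Before this patch operators called SQLMetrics.getShuffleReadMetrics(sc);
  // after it they call SQLShuffleMetricsReporter.createShuffleReadMetrics(sc).
  def operatorMetrics(sc: SparkContext): Map[String, SQLMetric] =
    Map("dataSize" -> SQLMetrics.createSizeMetric(sc, "data size")) ++
      SQLShuffleMetricsReporter.createShuffleReadMetrics(sc)
}
```

The map keys returned by `createShuffleReadMetrics` (`remoteBlocksFetched`,
`localBlocksFetched`, etc.) are the same constants the reporter itself looks up,
which is the point of keeping both in one file.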

## How was this patch tested?

Existing tests.

Closes #23175 from xuanyuanking/SPARK-26142-follow.

Authored-by: Yuanjian Li <xyliyuanj...@gmail.com>
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb368f2c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb368f2c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb368f2c

Branch: refs/heads/master
Commit: cb368f2c2964797d7313d3a4151e2352ff7847a9
Parents: 9fdc7a8
Author: Yuanjian Li <xyliyuanj...@gmail.com>
Authored: Thu Nov 29 12:09:30 2018 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Thu Nov 29 12:09:30 2018 -0800

----------------------------------------------------------------------
 .../exchange/ShuffleExchangeExec.scala          |  4 +-
 .../org/apache/spark/sql/execution/limit.scala  |  6 +--
 .../spark/sql/execution/metric/SQLMetrics.scala | 20 --------
 .../metric/SQLShuffleMetricsReporter.scala      | 50 ++++++++++++++++----
 .../execution/UnsafeRowSerializerSuite.scala    |  4 +-
 5 files changed, 47 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cb368f2c/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index 8938d93..c9ca395 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, Uns
 import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.execution.metric.{SQLMetrics, SQLShuffleMetricsReporter}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.MutablePair
@@ -49,7 +49,7 @@ case class ShuffleExchangeExec(
 
   override lazy val metrics = Map(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size")
-  ) ++ SQLMetrics.getShuffleReadMetrics(sparkContext)
+  ) ++ SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
 
   override def nodeName: String = {
     val extraInfo = coordinator match {

http://git-wip-us.apache.org/repos/asf/spark/blob/cb368f2c/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index ea845da..e9ab7cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGe
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
-import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.execution.metric.SQLShuffleMetricsReporter
 
 /**
  * Take the first `limit` elements and collect them to a single partition.
@@ -38,7 +38,7 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode
   override def outputPartitioning: Partitioning = SinglePartition
   override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
   private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
-  override lazy val metrics = SQLMetrics.getShuffleReadMetrics(sparkContext)
+  override lazy val metrics = SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
   protected override def doExecute(): RDD[InternalRow] = {
     val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit))
     val shuffled = new ShuffledRowRDD(
@@ -154,7 +154,7 @@ case class TakeOrderedAndProjectExec(
 
   private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
 
-  override lazy val metrics = SQLMetrics.getShuffleReadMetrics(sparkContext)
+  override lazy val metrics = SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
 
   protected override def doExecute(): RDD[InternalRow] = {
     val ord = new LazilyGeneratedOrdering(sortOrder, child.output)

http://git-wip-us.apache.org/repos/asf/spark/blob/cb368f2c/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 0b5ee3a..cbf707f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -82,14 +82,6 @@ object SQLMetrics {
 
   private val baseForAvgMetric: Int = 10
 
-  val REMOTE_BLOCKS_FETCHED = "remoteBlocksFetched"
-  val LOCAL_BLOCKS_FETCHED = "localBlocksFetched"
-  val REMOTE_BYTES_READ = "remoteBytesRead"
-  val REMOTE_BYTES_READ_TO_DISK = "remoteBytesReadToDisk"
-  val LOCAL_BYTES_READ = "localBytesRead"
-  val FETCH_WAIT_TIME = "fetchWaitTime"
-  val RECORDS_READ = "recordsRead"
-
   /**
    * Converts a double value to long value by multiplying a base integer, so we can store it in
    * `SQLMetrics`. It only works for average metrics. When showing the metrics on UI, we restore
@@ -202,16 +194,4 @@ object SQLMetrics {
         SparkListenerDriverAccumUpdates(executionId.toLong, metrics.map(m => m.id -> m.value)))
     }
   }
-
-  /**
-   * Create all shuffle read relative metrics and return the Map.
-   */
-  def getShuffleReadMetrics(sc: SparkContext): Map[String, SQLMetric] = Map(
-    REMOTE_BLOCKS_FETCHED -> createMetric(sc, "remote blocks fetched"),
-    LOCAL_BLOCKS_FETCHED -> createMetric(sc, "local blocks fetched"),
-    REMOTE_BYTES_READ -> createSizeMetric(sc, "remote bytes read"),
-    REMOTE_BYTES_READ_TO_DISK -> createSizeMetric(sc, "remote bytes read to disk"),
-    LOCAL_BYTES_READ -> createSizeMetric(sc, "local bytes read"),
-    FETCH_WAIT_TIME -> createTimingMetric(sc, "fetch wait time"),
-    RECORDS_READ -> createMetric(sc, "records read"))
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cb368f2c/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
index 542141e..780f0d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
@@ -17,24 +17,32 @@
 
 package org.apache.spark.sql.execution.metric
 
+import org.apache.spark.SparkContext
 import org.apache.spark.executor.TempShuffleReadMetrics
 
 /**
  * A shuffle metrics reporter for SQL exchange operators.
  * @param tempMetrics [[TempShuffleReadMetrics]] created in TaskContext.
  * @param metrics All metrics in current SparkPlan. This param should not empty and
- *   contains all shuffle metrics defined in [[SQLMetrics.getShuffleReadMetrics]].
+ *   contains all shuffle metrics defined in createShuffleReadMetrics.
  */
 private[spark] class SQLShuffleMetricsReporter(
-  tempMetrics: TempShuffleReadMetrics,
-  metrics: Map[String, SQLMetric]) extends TempShuffleReadMetrics {
-  private[this] val _remoteBlocksFetched = metrics(SQLMetrics.REMOTE_BLOCKS_FETCHED)
-  private[this] val _localBlocksFetched = metrics(SQLMetrics.LOCAL_BLOCKS_FETCHED)
-  private[this] val _remoteBytesRead = metrics(SQLMetrics.REMOTE_BYTES_READ)
-  private[this] val _remoteBytesReadToDisk = metrics(SQLMetrics.REMOTE_BYTES_READ_TO_DISK)
-  private[this] val _localBytesRead = metrics(SQLMetrics.LOCAL_BYTES_READ)
-  private[this] val _fetchWaitTime = metrics(SQLMetrics.FETCH_WAIT_TIME)
-  private[this] val _recordsRead = metrics(SQLMetrics.RECORDS_READ)
+    tempMetrics: TempShuffleReadMetrics,
+    metrics: Map[String, SQLMetric]) extends TempShuffleReadMetrics {
+  private[this] val _remoteBlocksFetched =
+    metrics(SQLShuffleMetricsReporter.REMOTE_BLOCKS_FETCHED)
+  private[this] val _localBlocksFetched =
+    metrics(SQLShuffleMetricsReporter.LOCAL_BLOCKS_FETCHED)
+  private[this] val _remoteBytesRead =
+    metrics(SQLShuffleMetricsReporter.REMOTE_BYTES_READ)
+  private[this] val _remoteBytesReadToDisk =
+    metrics(SQLShuffleMetricsReporter.REMOTE_BYTES_READ_TO_DISK)
+  private[this] val _localBytesRead =
+    metrics(SQLShuffleMetricsReporter.LOCAL_BYTES_READ)
+  private[this] val _fetchWaitTime =
+    metrics(SQLShuffleMetricsReporter.FETCH_WAIT_TIME)
+  private[this] val _recordsRead =
+    metrics(SQLShuffleMetricsReporter.RECORDS_READ)
 
   override def incRemoteBlocksFetched(v: Long): Unit = {
     _remoteBlocksFetched.add(v)
@@ -65,3 +73,25 @@ private[spark] class SQLShuffleMetricsReporter(
     tempMetrics.incRecordsRead(v)
   }
 }
+
+private[spark] object SQLShuffleMetricsReporter {
+  val REMOTE_BLOCKS_FETCHED = "remoteBlocksFetched"
+  val LOCAL_BLOCKS_FETCHED = "localBlocksFetched"
+  val REMOTE_BYTES_READ = "remoteBytesRead"
+  val REMOTE_BYTES_READ_TO_DISK = "remoteBytesReadToDisk"
+  val LOCAL_BYTES_READ = "localBytesRead"
+  val FETCH_WAIT_TIME = "fetchWaitTime"
+  val RECORDS_READ = "recordsRead"
+
+  /**
+   * Create all shuffle read relative metrics and return the Map.
+   */
+  def createShuffleReadMetrics(sc: SparkContext): Map[String, SQLMetric] = Map(
+    REMOTE_BLOCKS_FETCHED -> SQLMetrics.createMetric(sc, "remote blocks fetched"),
+    LOCAL_BLOCKS_FETCHED -> SQLMetrics.createMetric(sc, "local blocks fetched"),
+    REMOTE_BYTES_READ -> SQLMetrics.createSizeMetric(sc, "remote bytes read"),
+    REMOTE_BYTES_READ_TO_DISK -> SQLMetrics.createSizeMetric(sc, "remote bytes read to disk"),
+    LOCAL_BYTES_READ -> SQLMetrics.createSizeMetric(sc, "local bytes read"),
+    FETCH_WAIT_TIME -> SQLMetrics.createTimingMetric(sc, "fetch wait time"),
+    RECORDS_READ -> SQLMetrics.createMetric(sc, "records read"))
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/cb368f2c/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
index 96b3aa5..1ad5713 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{LocalSparkSession, Row, SparkSession}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
-import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.execution.metric.SQLShuffleMetricsReporter
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.ShuffleBlockId
 import org.apache.spark.util.collection.ExternalSorter
@@ -140,7 +140,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkSession {
         new UnsafeRowSerializer(2))
     val shuffled = new ShuffledRowRDD(
       dependency,
-      SQLMetrics.getShuffleReadMetrics(spark.sparkContext))
+      SQLShuffleMetricsReporter.createShuffleReadMetrics(spark.sparkContext))
     shuffled.count()
   }
 }

