Repository: spark
Updated Branches:
  refs/heads/master 09a91d98b -> 93112e693


[SPARK-26142][SQL] Implement shuffle read metrics in SQL

## What changes were proposed in this pull request?

Implement `SQLShuffleMetricsReporter` on the SQL side as a customized 
ShuffleMetricsReporter. It extends `TempShuffleReadMetrics` and updates the 
corresponding SQLMetrics, so that shuffle read metrics can be reported in the SQL UI.
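
In short, an exchange-style operator declares the shuffle read metrics via `SQLMetrics.getShuffleReadMetrics` and passes them to `ShuffledRowRDD`, whose `compute()` wraps the task's `TempShuffleReadMetrics` with the new reporter so every update reaches both the SQL UI metrics and the normal task-level metrics. A condensed sketch of that wiring, abridged from the diff below (the helper object name is hypothetical, and these classes are Spark-internal, so it only compiles inside an `org.apache.spark` package):

```scala
package org.apache.spark.sql.execution

import org.apache.spark.TaskContext
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleMetricsReporter}

// Hypothetical helper, for illustration only; the real wiring lives in
// ShuffleExchangeExec / limit.scala (operator side) and ShuffledRowRDD.compute
// (task side), as shown in the diff below.
object ShuffleReadMetricsWiring {

  // Operator side: a plan node declares the shuffle read SQLMetrics once, e.g.
  //   override lazy val metrics = SQLMetrics.getShuffleReadMetrics(sparkContext)
  // and passes that map into the ShuffledRowRDD it creates.

  // Task side: wrap the per-task TempShuffleReadMetrics with the SQL-aware reporter.
  def wrapForSql(
      context: TaskContext,
      metrics: Map[String, SQLMetric]): SQLShuffleMetricsReporter = {
    val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics()
    // Every inc*() call on the returned reporter adds to the SQLMetrics shown
    // in the SQL UI and also forwards the value to tempMetrics, so the regular
    // task-level shuffle read metrics are unchanged.
    new SQLShuffleMetricsReporter(tempMetrics, metrics)
  }
}
```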

## How was this patch tested?

Add a unit test in SQLMetricsSuite.
Manual test locally; before:
![image](https://user-images.githubusercontent.com/4833765/48960517-30f97880-efa8-11e8-982c-92d05938fd1d.png)
after:
![image](https://user-images.githubusercontent.com/4833765/48960587-b54bfb80-efa8-11e8-8e95-7a3c8c74cc5c.png)
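
For reference, a query like the following (a hypothetical snippet, not part of the patch, assuming a local Spark build of this era) produces a shuffle whose Exchange node exposes the new metrics in the SQL tab:

```scala
// Any query with a shuffle will do; this one forces an Exchange via an aggregation.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("shuffle-read-metrics")
  .getOrCreate()
import spark.implicits._

spark.range(0, 1000).groupBy(($"id" % 10).as("bucket")).count().collect()
// Then open the SQL tab of the web UI (http://localhost:4040/SQL/ by default)
// and expand the Exchange node to see "records read", "local blocks fetched",
// "remote blocks fetched", "fetch wait time", etc.
```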

Closes #23128 from xuanyuanking/SPARK-26142.

Lead-authored-by: Yuanjian Li <xyliyuanj...@gmail.com>
Co-authored-by: liyuanjian <liyuanj...@baidu.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/93112e69
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/93112e69
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/93112e69

Branch: refs/heads/master
Commit: 93112e693082f3fba24cebaf9a98dcf5c1eb84af
Parents: 09a91d9
Author: Yuanjian Li <xyliyuanj...@gmail.com>
Authored: Wed Nov 28 20:18:13 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Wed Nov 28 20:18:13 2018 +0800

----------------------------------------------------------------------
 .../spark/sql/execution/ShuffledRowRDD.scala    |  9 ++-
 .../exchange/ShuffleExchangeExec.scala          |  5 +-
 .../org/apache/spark/sql/execution/limit.scala  | 10 ++-
 .../spark/sql/execution/metric/SQLMetrics.scala | 20 ++++++
 .../metric/SQLShuffleMetricsReporter.scala      | 67 ++++++++++++++++++++
 .../execution/UnsafeRowSerializerSuite.scala    |  5 +-
 .../sql/execution/metric/SQLMetricsSuite.scala  | 21 ++++--
 7 files changed, 126 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/93112e69/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
index 542266b..9b05faa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
@@ -22,6 +22,7 @@ import java.util.Arrays
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleMetricsReporter}
 
 /**
  * The [[Partition]] used by [[ShuffledRowRDD]]. A post-shuffle partition
@@ -112,6 +113,7 @@ class CoalescedPartitioner(val parent: Partitioner, val partitionStartIndices: A
  */
 class ShuffledRowRDD(
     var dependency: ShuffleDependency[Int, InternalRow, InternalRow],
+    metrics: Map[String, SQLMetric],
     specifiedPartitionStartIndices: Option[Array[Int]] = None)
   extends RDD[InternalRow](dependency.rdd.context, Nil) {
 
@@ -154,7 +156,10 @@ class ShuffledRowRDD(
 
   override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
     val shuffledRowPartition = split.asInstanceOf[ShuffledRowRDDPartition]
-    val metrics = context.taskMetrics().createTempShuffleReadMetrics()
+    val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics()
+    // `SQLShuffleMetricsReporter` will update its own metrics for SQL exchange operator,
+    // as well as the `tempMetrics` for basic shuffle metrics.
+    val sqlMetricsReporter = new SQLShuffleMetricsReporter(tempMetrics, metrics)
     // The range of pre-shuffle partitions that we are fetching at here is
     // [startPreShufflePartitionIndex, endPreShufflePartitionIndex - 1].
     val reader =
@@ -163,7 +168,7 @@ class ShuffledRowRDD(
         shuffledRowPartition.startPreShufflePartitionIndex,
         shuffledRowPartition.endPreShufflePartitionIndex,
         context,
-        metrics)
+        sqlMetricsReporter)
     reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map(_._2)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/93112e69/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index d6742ab..8938d93 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -48,7 +48,8 @@ case class ShuffleExchangeExec(
   //       e.g. it can be null on the Executor side
 
   override lazy val metrics = Map(
-    "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"))
+    "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size")
+  ) ++ SQLMetrics.getShuffleReadMetrics(sparkContext)
 
   override def nodeName: String = {
     val extraInfo = coordinator match {
@@ -108,7 +109,7 @@ case class ShuffleExchangeExec(
       assert(newPartitioning.isInstanceOf[HashPartitioning])
       newPartitioning = UnknownPartitioning(indices.length)
     }
-    new ShuffledRowRDD(shuffleDependency, specifiedPartitionStartIndices)
+    new ShuffledRowRDD(shuffleDependency, metrics, specifiedPartitionStartIndices)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/93112e69/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index 90dafcf..ea845da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGe
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.sql.execution.metric.SQLMetrics
 
 /**
  * Take the first `limit` elements and collect them to a single partition.
@@ -37,11 +38,13 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode
   override def outputPartitioning: Partitioning = SinglePartition
   override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
   private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
+  override lazy val metrics = SQLMetrics.getShuffleReadMetrics(sparkContext)
   protected override def doExecute(): RDD[InternalRow] = {
     val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit))
     val shuffled = new ShuffledRowRDD(
       ShuffleExchangeExec.prepareShuffleDependency(
-        locallyLimited, child.output, SinglePartition, serializer))
+        locallyLimited, child.output, SinglePartition, serializer),
+      metrics)
     shuffled.mapPartitionsInternal(_.take(limit))
   }
 }
@@ -151,6 +154,8 @@ case class TakeOrderedAndProjectExec(
 
   private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
 
+  override lazy val metrics = SQLMetrics.getShuffleReadMetrics(sparkContext)
+
   protected override def doExecute(): RDD[InternalRow] = {
     val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
     val localTopK: RDD[InternalRow] = {
@@ -160,7 +165,8 @@ case class TakeOrderedAndProjectExec(
     }
     val shuffled = new ShuffledRowRDD(
       ShuffleExchangeExec.prepareShuffleDependency(
-        localTopK, child.output, SinglePartition, serializer))
+        localTopK, child.output, SinglePartition, serializer),
+      metrics)
     shuffled.mapPartitions { iter =>
       val topK = org.apache.spark.util.collection.Utils.takeOrdered(iter.map(_.copy()), limit)(ord)
       if (projectList != child.output) {

http://git-wip-us.apache.org/repos/asf/spark/blob/93112e69/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index cbf707f..0b5ee3a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -82,6 +82,14 @@ object SQLMetrics {
 
   private val baseForAvgMetric: Int = 10
 
+  val REMOTE_BLOCKS_FETCHED = "remoteBlocksFetched"
+  val LOCAL_BLOCKS_FETCHED = "localBlocksFetched"
+  val REMOTE_BYTES_READ = "remoteBytesRead"
+  val REMOTE_BYTES_READ_TO_DISK = "remoteBytesReadToDisk"
+  val LOCAL_BYTES_READ = "localBytesRead"
+  val FETCH_WAIT_TIME = "fetchWaitTime"
+  val RECORDS_READ = "recordsRead"
+
   /**
    * Converts a double value to long value by multiplying a base integer, so we can store it in
    * `SQLMetrics`. It only works for average metrics. When showing the metrics on UI, we restore
@@ -194,4 +202,16 @@ object SQLMetrics {
         SparkListenerDriverAccumUpdates(executionId.toLong, metrics.map(m => m.id -> m.value)))
     }
   }
+
+  /**
+   * Create all shuffle read related metrics and return the Map.
+   */
+  def getShuffleReadMetrics(sc: SparkContext): Map[String, SQLMetric] = Map(
+    REMOTE_BLOCKS_FETCHED -> createMetric(sc, "remote blocks fetched"),
+    LOCAL_BLOCKS_FETCHED -> createMetric(sc, "local blocks fetched"),
+    REMOTE_BYTES_READ -> createSizeMetric(sc, "remote bytes read"),
+    REMOTE_BYTES_READ_TO_DISK -> createSizeMetric(sc, "remote bytes read to disk"),
+    LOCAL_BYTES_READ -> createSizeMetric(sc, "local bytes read"),
+    FETCH_WAIT_TIME -> createTimingMetric(sc, "fetch wait time"),
+    RECORDS_READ -> createMetric(sc, "records read"))
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/93112e69/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
new file mode 100644
index 0000000..542141e
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.metric
+
+import org.apache.spark.executor.TempShuffleReadMetrics
+
+/**
+ * A shuffle metrics reporter for SQL exchange operators.
+ * @param tempMetrics [[TempShuffleReadMetrics]] created in TaskContext.
+ * @param metrics All metrics in the current SparkPlan. This param should not be empty and
+ *   should contain all shuffle metrics defined in [[SQLMetrics.getShuffleReadMetrics]].
+ */
+private[spark] class SQLShuffleMetricsReporter(
+  tempMetrics: TempShuffleReadMetrics,
+  metrics: Map[String, SQLMetric]) extends TempShuffleReadMetrics {
+  private[this] val _remoteBlocksFetched = metrics(SQLMetrics.REMOTE_BLOCKS_FETCHED)
+  private[this] val _localBlocksFetched = metrics(SQLMetrics.LOCAL_BLOCKS_FETCHED)
+  private[this] val _remoteBytesRead = metrics(SQLMetrics.REMOTE_BYTES_READ)
+  private[this] val _remoteBytesReadToDisk = metrics(SQLMetrics.REMOTE_BYTES_READ_TO_DISK)
+  private[this] val _localBytesRead = metrics(SQLMetrics.LOCAL_BYTES_READ)
+  private[this] val _fetchWaitTime = metrics(SQLMetrics.FETCH_WAIT_TIME)
+  private[this] val _recordsRead = metrics(SQLMetrics.RECORDS_READ)
+
+  override def incRemoteBlocksFetched(v: Long): Unit = {
+    _remoteBlocksFetched.add(v)
+    tempMetrics.incRemoteBlocksFetched(v)
+  }
+  override def incLocalBlocksFetched(v: Long): Unit = {
+    _localBlocksFetched.add(v)
+    tempMetrics.incLocalBlocksFetched(v)
+  }
+  override def incRemoteBytesRead(v: Long): Unit = {
+    _remoteBytesRead.add(v)
+    tempMetrics.incRemoteBytesRead(v)
+  }
+  override def incRemoteBytesReadToDisk(v: Long): Unit = {
+    _remoteBytesReadToDisk.add(v)
+    tempMetrics.incRemoteBytesReadToDisk(v)
+  }
+  override def incLocalBytesRead(v: Long): Unit = {
+    _localBytesRead.add(v)
+    tempMetrics.incLocalBytesRead(v)
+  }
+  override def incFetchWaitTime(v: Long): Unit = {
+    _fetchWaitTime.add(v)
+    tempMetrics.incFetchWaitTime(v)
+  }
+  override def incRecordsRead(v: Long): Unit = {
+    _recordsRead.add(v)
+    tempMetrics.incRecordsRead(v)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/93112e69/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
index d305ce3..96b3aa5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{LocalSparkSession, Row, SparkSession}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.ShuffleBlockId
 import org.apache.spark.util.collection.ExternalSorter
@@ -137,7 +138,9 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkSession {
         rowsRDD,
         new PartitionIdPassthrough(2),
         new UnsafeRowSerializer(2))
-    val shuffled = new ShuffledRowRDD(dependency)
+    val shuffled = new ShuffledRowRDD(
+      dependency,
+      SQLMetrics.getShuffleReadMetrics(spark.sparkContext))
     shuffled.count()
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/93112e69/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index b955c15..0f1d08b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -94,8 +94,13 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
         "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"),
       Map("number of output rows" -> 1L,
         "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"))
+    val shuffleExpected1 = Map(
+      "records read" -> 2L,
+      "local blocks fetched" -> 2L,
+      "remote blocks fetched" -> 0L)
     testSparkPlanMetrics(df, 1, Map(
       2L -> (("HashAggregate", expected1(0))),
+      1L -> (("Exchange", shuffleExpected1)),
       0L -> (("HashAggregate", expected1(1))))
     )
 
@@ -106,8 +111,13 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
         "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"),
       Map("number of output rows" -> 3L,
         "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"))
+    val shuffleExpected2 = Map(
+      "records read" -> 4L,
+      "local blocks fetched" -> 4L,
+      "remote blocks fetched" -> 0L)
     testSparkPlanMetrics(df2, 1, Map(
       2L -> (("HashAggregate", expected2(0))),
+      1L -> (("Exchange", shuffleExpected2)),
       0L -> (("HashAggregate", expected2(1))))
     )
   }
@@ -191,7 +201,11 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
       testSparkPlanMetrics(df, 1, Map(
         0L -> (("SortMergeJoin", Map(
-          // It's 4 because we only read 3 rows in the first partition and 1 row in the second one
-          "number of output rows" -> 4L))))
+          "number of output rows" -> 4L))),
+        2L -> (("Exchange", Map(
+          "records read" -> 4L,
+          "local blocks fetched" -> 2L,
+          "remote blocks fetched" -> 0L))))
       )
     }
   }
@@ -208,7 +222,7 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
         "SELECT * FROM testData2 left JOIN testDataForJoin ON testData2.a = 
testDataForJoin.a")
       testSparkPlanMetrics(df, 1, Map(
         0L -> (("SortMergeJoin", Map(
-          // It's 4 because we only read 3 rows in the first partition and 1 row in the second one
+          // It's 8 because we read 6 rows in the left and 2 rows in the right one
           "number of output rows" -> 8L))))
       )
 
@@ -216,7 +230,7 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
         "SELECT * FROM testDataForJoin right JOIN testData2 ON testData2.a = testDataForJoin.a")
       testSparkPlanMetrics(df2, 1, Map(
         0L -> (("SortMergeJoin", Map(
-          // It's 4 because we only read 3 rows in the first partition and 1 row in the second one
+          // It's 8 because we read 6 rows in the left and 2 rows in the right one
           "number of output rows" -> 8L))))
       )
     }
@@ -287,7 +301,6 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
       // Assume the execution plan is
       // ... -> ShuffledHashJoin(nodeId = 1) -> Project(nodeId = 0)
       val df = df1.join(df2, "key")
-      val metrics = getSparkPlanMetrics(df, 1, Set(1L))
       testSparkPlanMetrics(df, 1, Map(
         1L -> (("ShuffledHashJoin", Map(
           "number of output rows" -> 2L,

