This is an automated email from the ASF dual-hosted git repository.

jiangxb1987 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new db7b865  [SPARK-31253][SQL][FOLLOW-UP] simplify the code of calculating size metrics of AQE shuffle
db7b865 is described below

commit db7b8651a19d5a749a9f0b4e8fb517e6994921c2
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Fri Apr 17 13:20:34 2020 -0700

    [SPARK-31253][SQL][FOLLOW-UP] simplify the code of calculating size metrics of AQE shuffle
    
    ### What changes were proposed in this pull request?
    
    A follow-up of https://github.com/apache/spark/pull/28175:
    1. use a mutable collection to store the driver metrics
    2. don't send size metrics if there are no map stats, as the UI will display the size as 0 if there is no data
    3. calculate the partition data size separately, to make the code easier to read (a standalone sketch of the pattern follows this list).
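    
    For reference, here is a minimal, self-contained Scala sketch of the pattern the three points above describe. It is not the Spark code itself: `PartitionSpec`, `Metric`, and the helper names below are hypothetical stand-ins, and the final posting of the updates via `SQLMetrics.postDriverMetricsUpdatedByValue` is omitted.
    
    ```scala
    import scala.collection.mutable.ArrayBuffer
    
    object ShuffleMetricsSketch {
    
      // Simplified stand-ins for Spark's shuffle partition specs.
      sealed trait PartitionSpec
      final case class CoalescedSpec(startReducerIndex: Int, endReducerIndex: Int) extends PartitionSpec
      final case class PartialReducerSpec(reducerIndex: Int, dataSize: Long) extends PartitionSpec
    
      // Simplified stand-in for a driver-side SQL metric.
      final case class Metric(id: Long, name: String) {
        var current: Long = 0L
        def set(v: Long): Unit = current = v
      }
    
      // Point 3: compute per-partition data sizes separately; None models
      // "no map stats", in which case no size metrics are reported (point 2).
      def partitionDataSizes(
          specs: Seq[PartitionSpec],
          bytesByPartitionId: Option[Array[Long]]): Option[Seq[Long]] = {
        bytesByPartitionId.map { bytes =>
          specs.map {
            case CoalescedSpec(start, end) => start.until(end).map(i => bytes(i)).sum
            case p: PartialReducerSpec     => p.dataSize
          }
        }
      }
    
      // Point 1: collect (metric id, value) updates in a mutable buffer
      // instead of repeatedly rebuilding an immutable Seq with `:+`.
      def collectDriverMetricUpdates(
          specs: Seq[PartitionSpec],
          bytesByPartitionId: Option[Array[Long]],
          numPartitionsMetric: Metric,
          dataSizeMetric: Metric): Seq[(Long, Long)] = {
        val updates = ArrayBuffer.empty[(Long, Long)]
    
        numPartitionsMetric.set(specs.length)
        updates += (numPartitionsMetric.id -> specs.length.toLong)
    
        partitionDataSizes(specs, bytesByPartitionId).foreach { sizes =>
          updates ++= sizes.map(dataSizeMetric.id -> _)
          dataSizeMetric.set(sizes.sum)
        }
    
        updates.toSeq
      }
    
      def main(args: Array[String]): Unit = {
        val specs = Seq(CoalescedSpec(0, 2), PartialReducerSpec(reducerIndex = 2, dataSize = 30L))
        val updates = collectDriverMetricUpdates(
          specs,
          bytesByPartitionId = Some(Array(10L, 20L, 60L)),
          numPartitionsMetric = Metric(1L, "numPartitions"),
          dataSizeMetric = Metric(2L, "partitionDataSize"))
        println(updates)  // contains (1,2), (2,30), (2,30)
      }
    }
    ```
    
    Modeling the sizes as `Option[Seq[Long]]` keeps the "no map stats" case out of the metric-updating code entirely, which is what lets the second hunk below collapse the old if/else into a single `foreach`.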
    
    ### Why are the changes needed?
    
    code simplification
    
    ### Does this PR introduce any user-facing change?
    
    no
    
    ### How was this patch tested?
    
    existing tests
    
    Closes #28240 from cloud-fan/refactor.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Xingbo Jiang <xingbo.ji...@databricks.com>
---
 .../adaptive/CustomShuffleReaderExec.scala         | 50 ++++++++++------------
 1 file changed, 22 insertions(+), 28 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
index 68f20bc..6450d49 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
@@ -97,14 +97,27 @@ case class CustomShuffleReaderExec private(
     case _ => None
   }
 
+  @transient private lazy val partitionDataSizes: Option[Seq[Long]] = {
+    if (!isLocalReader && shuffleStage.get.mapStats.isDefined) {
+      val bytesByPartitionId = shuffleStage.get.mapStats.get.bytesByPartitionId
+      Some(partitionSpecs.map {
+        case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) =>
+          startReducerIndex.until(endReducerIndex).map(bytesByPartitionId).sum
+        case p: PartialReducerPartitionSpec => p.dataSize
+        case p => throw new IllegalStateException("unexpected " + p)
+      })
+    } else {
+      None
+    }
+  }
+
   private def sendDriverMetrics(): Unit = {
     val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    var driverAccumUpdates: Seq[(Long, Long)] = Seq.empty
+    val driverAccumUpdates = ArrayBuffer.empty[(Long, Long)]
 
     val numPartitionsMetric = metrics("numPartitions")
     numPartitionsMetric.set(partitionSpecs.length)
-    driverAccumUpdates = driverAccumUpdates :+
-      (numPartitionsMetric.id, partitionSpecs.length.toLong)
+    driverAccumUpdates += (numPartitionsMetric.id -> partitionSpecs.length.toLong)
 
     if (hasSkewedPartition) {
       val skewedMetric = metrics("numSkewedPartitions")
@@ -112,33 +125,14 @@ case class CustomShuffleReaderExec private(
         case p: PartialReducerPartitionSpec => p.reducerIndex
       }.distinct.length
       skewedMetric.set(numSkewedPartitions)
-      driverAccumUpdates = driverAccumUpdates :+ (skewedMetric.id, numSkewedPartitions.toLong)
+      driverAccumUpdates += (skewedMetric.id -> numSkewedPartitions.toLong)
     }
 
-    if(!isLocalReader) {
-      val partitionMetrics = metrics("partitionDataSize")
-      val mapStats = shuffleStage.get.mapStats
-
-      if (mapStats.isEmpty) {
-        partitionMetrics.set(0)
-        driverAccumUpdates = driverAccumUpdates :+ (partitionMetrics.id, 0L)
-      } else {
-        var sum = 0L
-        partitionSpecs.foreach {
-          case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) =>
-            val dataSize = startReducerIndex.until(endReducerIndex).map(
-              mapStats.get.bytesByPartitionId(_)).sum
-            driverAccumUpdates = driverAccumUpdates :+ (partitionMetrics.id, dataSize)
-            sum += dataSize
-          case p: PartialReducerPartitionSpec =>
-            driverAccumUpdates = driverAccumUpdates :+ (partitionMetrics.id, p.dataSize)
-            sum += p.dataSize
-          case p => throw new IllegalStateException("unexpected " + p)
-        }
-
-        // Set sum value to "partitionDataSize" metric.
-        partitionMetrics.set(sum)
-      }
+    partitionDataSizes.foreach { dataSizes =>
+      val partitionDataSizeMetrics = metrics("partitionDataSize")
+      driverAccumUpdates ++= dataSizes.map(partitionDataSizeMetrics.id -> _)
+      // Set sum value to "partitionDataSize" metric.
+      partitionDataSizeMetrics.set(dataSizes.sum)
     }
 
     SQLMetrics.postDriverMetricsUpdatedByValue(sparkContext, executionId, driverAccumUpdates)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
