This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 2247239  [SPARK-31046][SQL] Make more efficient and clean up AQE update UI code
2247239 is described below

commit 2247239dd5fef9506f025c993c43fb8365e57ffd
Author: maryannxue <maryann...@apache.org>
AuthorDate: Thu Mar 5 18:53:01 2020 +0800

    [SPARK-31046][SQL] Make more efficient and clean up AQE update UI code
    
    ### What changes were proposed in this pull request?
    This PR avoids sending redundant metrics (those that have been included in a previous update) as well as useless metrics (those in future stages) to the Spark UI in AQE UI metrics updates.
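    
    A minimal, self-contained sketch of the idea (`PlanNode`, `collectAll`, and `collectNew` are hypothetical stand-ins for illustration, not the real Spark classes; only `SQLPlanMetric`'s shape follows the diff below):
    
    ```scala
    // Each plan node owns some named SQL metrics.
    case class SQLPlanMetric(name: String, accumulatorId: Long, metricType: String)
    case class PlanNode(metrics: Map[String, SQLPlanMetric], children: Seq[PlanNode])
    
    // Old approach (removed here): walk the entire current plan on every update,
    // re-sending metrics already posted and metrics of stages that have not run yet.
    def collectAll(plan: PlanNode): Seq[SQLPlanMetric] =
      plan.metrics.values.toSeq ++ plan.children.flatMap(collectAll)
    
    // New approach: callers pass only the sub-plans of stages created by the latest
    // re-optimization round, so each metric is sent to the UI exactly once.
    def collectNew(newSubPlans: Seq[PlanNode]): Seq[SQLPlanMetric] =
      newSubPlans.flatMap(collectAll)
    ```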
    
    ### Why are the changes needed?
    This change makes UI metrics updates more efficient.
    
    ### Does this PR introduce any user-facing change?
    No.
    
    ### How was this patch tested?
    Manually tested in the Spark UI.
    
    Closes #27799 from maryannxue/aqe-ui-cleanup.
    
    Authored-by: maryannxue <maryann...@apache.org>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 9b602e26d2a5252623a2ed66a1a2b382665fdab4)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../execution/adaptive/AdaptiveSparkPlanExec.scala | 36 +++++-----------------
 1 file changed, 8 insertions(+), 28 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index c018ca4..f2ebe1a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -37,7 +37,6 @@ import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec._
 import org.apache.spark.sql.execution.exchange._
-import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.ui.{SparkListenerSQLAdaptiveExecutionUpdate, SparkListenerSQLAdaptiveSQLMetricUpdates, SQLPlanMetric}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.ThreadUtils
@@ -133,21 +132,6 @@ case class AdaptiveSparkPlanExec(
     executedPlan.resetMetrics()
   }
 
-  private def collectSQLMetrics(plan: SparkPlan): Seq[SQLMetric] = {
-    val metrics = new mutable.ArrayBuffer[SQLMetric]()
-    plan.foreach {
-      case p: ShuffleQueryStageExec if (p.resultOption.isEmpty) =>
-        collectSQLMetrics(p.plan).foreach(metrics += _)
-      case p: BroadcastQueryStageExec if (p.resultOption.isEmpty) =>
-        collectSQLMetrics(p.plan).foreach(metrics += _)
-      case p: SparkPlan =>
-        p.metrics.foreach { case metric =>
-          metrics += metric._2
-        }
-    }
-    metrics
-  }
-
   private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized {
     if (!isFinalPlan) {
       // Subqueries do not have their own execution IDs and therefore rely on the main query to
@@ -163,7 +147,7 @@ case class AdaptiveSparkPlanExec(
         currentPhysicalPlan = result.newPlan
         if (result.newStages.nonEmpty) {
           stagesToReplace = result.newStages ++ stagesToReplace
-          executionId.foreach(onUpdatePlan)
+          executionId.foreach(onUpdatePlan(_, result.newStages.map(_.plan)))
 
           // Start materialization of all new stages and fail fast if any stages failed eagerly
           result.newStages.foreach { stage =>
@@ -232,7 +216,7 @@ case class AdaptiveSparkPlanExec(
       // Run the final plan when there's no more unfinished stages.
       currentPhysicalPlan = applyPhysicalRules(result.newPlan, queryStageOptimizerRules)
       isFinalPlan = true
-      executionId.foreach(onUpdatePlan)
+      executionId.foreach(onUpdatePlan(_, Seq(currentPhysicalPlan)))
       logDebug(s"Final plan: $currentPhysicalPlan")
     }
     currentPhysicalPlan
@@ -496,14 +480,18 @@ case class AdaptiveSparkPlanExec(
   /**
    * Notify the listeners of the physical plan change.
    */
-  private def onUpdatePlan(executionId: Long): Unit = {
+  private def onUpdatePlan(executionId: Long, newSubPlans: Seq[SparkPlan]): Unit = {
     if (isSubquery) {
       // When executing subqueries, we can't update the query plan in the UI as the
       // UI doesn't support partial update yet. However, the subquery may have been
       // optimized into a different plan and we must let the UI know the SQL metrics
       // of the new plan nodes, so that it can track the valid accumulator updates later
       // and display SQL metrics correctly.
-      onUpdateSQLMetrics(collectSQLMetrics(currentPhysicalPlan), executionId)
+      val newMetrics = newSubPlans.flatMap { p =>
+        p.flatMap(_.metrics.values.map(m => SQLPlanMetric(m.name.get, m.id, m.metricType)))
+      }
+      context.session.sparkContext.listenerBus.post(SparkListenerSQLAdaptiveSQLMetricUpdates(
+        executionId.toLong, newMetrics))
     } else {
       context.session.sparkContext.listenerBus.post(SparkListenerSQLAdaptiveExecutionUpdate(
         executionId,
@@ -512,14 +500,6 @@ case class AdaptiveSparkPlanExec(
     }
   }
 
-  private def onUpdateSQLMetrics(sqlMetrics: Seq[SQLMetric], executionId: Long): Unit = {
-    val sqlPlanMetrics = sqlMetrics.map { case sqlMetric =>
-      SQLPlanMetric(sqlMetric.name.get, sqlMetric.id, sqlMetric.metricType)
-    }
-    context.session.sparkContext.listenerBus.post(SparkListenerSQLAdaptiveSQLMetricUpdates(
-      executionId.toLong, sqlPlanMetrics))
-  }
-
   /**
    * Cancel all running stages with best effort and throw an Exception containing all stage
    * materialization errors and stage cancellation errors.
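
For context, a hedged sketch of how the event posted above can be observed from a SparkListener (the AqeMetricLogger class is hypothetical; SparkListenerSQLAdaptiveSQLMetricUpdates and SQLPlanMetric are the classes referenced in the diff, assuming a Spark 3.0 classpath):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveSQLMetricUpdates

// Hypothetical listener: logs the metric registrations that AQE now posts only
// for newly created query stages, rather than for the whole plan on each update.
class AqeMetricLogger extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case SparkListenerSQLAdaptiveSQLMetricUpdates(executionId, metrics) =>
      metrics.foreach { m =>
        println(s"execution $executionId: metric '${m.name}' " +
          s"(accumulator=${m.accumulatorId}, type=${m.metricType})")
      }
    case _ => // not an AQE metric update; ignore
  }
}

// Registration, e.g. in a debugging session:
//   spark.sparkContext.addSparkListener(new AqeMetricLogger)
```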


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
