This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 2247239  [SPARK-31046][SQL] Make more efficient and clean up AQE update UI code
2247239 is described below

commit 2247239dd5fef9506f025c993c43fb8365e57ffd
Author: maryannxue <maryann...@apache.org>
AuthorDate: Thu Mar 5 18:53:01 2020 +0800

    [SPARK-31046][SQL] Make more efficient and clean up AQE update UI code

    ### What changes were proposed in this pull request?
    This PR avoids sending redundant metrics (those already included in a previous update) as well as useless metrics (those belonging to future stages) to the Spark UI in AQE UI metrics updates.

    ### Why are the changes needed?
    This change makes UI metrics updates more efficient.

    ### Does this PR introduce any user-facing change?
    No.

    ### How was this patch tested?
    Manual test in the Spark UI.

    Closes #27799 from maryannxue/aqe-ui-cleanup.

    Authored-by: maryannxue <maryann...@apache.org>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 9b602e26d2a5252623a2ed66a1a2b382665fdab4)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../execution/adaptive/AdaptiveSparkPlanExec.scala | 36 +++++-----------------
 1 file changed, 8 insertions(+), 28 deletions(-)
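The core idea of the change, in miniature: instead of re-collecting every SQLMetric from the whole current plan on each update, only the metrics of the sub-plans created by the latest re-optimization are published. What follows is a minimal, self-contained Scala sketch using toy stand-in types (Metric, PlanNode, and the example metric names are illustrative only, not Spark's actual classes); the patched code itself appears in the diff below.

// Toy model of incremental metric reporting (illustrative names only).
case class Metric(id: Long, name: String, metricType: String)

case class PlanNode(metrics: Seq[Metric], children: Seq[PlanNode] = Nil) {
  // This node plus all of its descendants.
  def collectNodes: Seq[PlanNode] = this +: children.flatMap(_.collectNodes)
}

object MetricsUpdateSketch {
  // Old behaviour (simplified): walk the whole plan and re-send every metric,
  // including ones that were already reported in earlier updates.
  def allMetrics(root: PlanNode): Seq[Metric] =
    root.collectNodes.flatMap(_.metrics)

  // New behaviour (simplified): only the sub-plans added by the latest
  // re-optimization contribute metrics to this update.
  def incrementalMetrics(newSubPlans: Seq[PlanNode]): Seq[Metric] =
    newSubPlans.flatMap(_.collectNodes.flatMap(_.metrics))

  def main(args: Array[String]): Unit = {
    val oldStage = PlanNode(Seq(Metric(1L, "numOutputRows", "sum")))
    val newStage = PlanNode(Seq(Metric(2L, "dataSize", "size")))
    val root = PlanNode(Nil, Seq(oldStage, newStage))

    println(allMetrics(root).map(_.id))                   // List(1, 2): metric 1 is re-sent
    println(incrementalMetrics(Seq(newStage)).map(_.id))  // List(2): only the new metric
  }
}

Running the sketch prints List(1, 2) for the full re-collection and List(2) for the incremental update; that redundant resend is what the patch removes.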
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index c018ca4..f2ebe1a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -37,7 +37,6 @@ import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec._
 import org.apache.spark.sql.execution.exchange._
-import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.ui.{SparkListenerSQLAdaptiveExecutionUpdate, SparkListenerSQLAdaptiveSQLMetricUpdates, SQLPlanMetric}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.ThreadUtils
@@ -133,21 +132,6 @@ case class AdaptiveSparkPlanExec(
     executedPlan.resetMetrics()
   }
 
-  private def collectSQLMetrics(plan: SparkPlan): Seq[SQLMetric] = {
-    val metrics = new mutable.ArrayBuffer[SQLMetric]()
-    plan.foreach {
-      case p: ShuffleQueryStageExec if (p.resultOption.isEmpty) =>
-        collectSQLMetrics(p.plan).foreach(metrics += _)
-      case p: BroadcastQueryStageExec if (p.resultOption.isEmpty) =>
-        collectSQLMetrics(p.plan).foreach(metrics += _)
-      case p: SparkPlan =>
-        p.metrics.foreach { case metric =>
-          metrics += metric._2
-        }
-    }
-    metrics
-  }
-
   private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized {
     if (!isFinalPlan) {
       // Subqueries do not have their own execution IDs and therefore rely on the main query to
@@ -163,7 +147,7 @@ case class AdaptiveSparkPlanExec(
         currentPhysicalPlan = result.newPlan
         if (result.newStages.nonEmpty) {
           stagesToReplace = result.newStages ++ stagesToReplace
-          executionId.foreach(onUpdatePlan)
+          executionId.foreach(onUpdatePlan(_, result.newStages.map(_.plan)))
 
           // Start materialization of all new stages and fail fast if any stages failed eagerly
           result.newStages.foreach { stage =>
@@ -232,7 +216,7 @@ case class AdaptiveSparkPlanExec(
       // Run the final plan when there's no more unfinished stages.
       currentPhysicalPlan = applyPhysicalRules(result.newPlan, queryStageOptimizerRules)
       isFinalPlan = true
-      executionId.foreach(onUpdatePlan)
+      executionId.foreach(onUpdatePlan(_, Seq(currentPhysicalPlan)))
       logDebug(s"Final plan: $currentPhysicalPlan")
     }
     currentPhysicalPlan
@@ -496,14 +480,18 @@ case class AdaptiveSparkPlanExec(
   /**
    * Notify the listeners of the physical plan change.
    */
-  private def onUpdatePlan(executionId: Long): Unit = {
+  private def onUpdatePlan(executionId: Long, newSubPlans: Seq[SparkPlan]): Unit = {
     if (isSubquery) {
       // When executing subqueries, we can't update the query plan in the UI as the
       // UI doesn't support partial update yet. However, the subquery may have been
       // optimized into a different plan and we must let the UI know the SQL metrics
       // of the new plan nodes, so that it can track the valid accumulator updates later
       // and display SQL metrics correctly.
-      onUpdateSQLMetrics(collectSQLMetrics(currentPhysicalPlan), executionId)
+      val newMetrics = newSubPlans.flatMap { p =>
+        p.flatMap(_.metrics.values.map(m => SQLPlanMetric(m.name.get, m.id, m.metricType)))
+      }
+      context.session.sparkContext.listenerBus.post(SparkListenerSQLAdaptiveSQLMetricUpdates(
+        executionId.toLong, newMetrics))
     } else {
       context.session.sparkContext.listenerBus.post(SparkListenerSQLAdaptiveExecutionUpdate(
         executionId,
@@ -512,14 +500,6 @@ case class AdaptiveSparkPlanExec(
     }
   }
 
-  private def onUpdateSQLMetrics(sqlMetrics: Seq[SQLMetric], executionId: Long): Unit = {
-    val sqlPlanMetrics = sqlMetrics.map { case sqlMetric =>
-      SQLPlanMetric(sqlMetric.name.get, sqlMetric.id, sqlMetric.metricType)
-    }
-    context.session.sparkContext.listenerBus.post(SparkListenerSQLAdaptiveSQLMetricUpdates(
-      executionId.toLong, sqlPlanMetrics))
-  }
-
   /**
    * Cancel all running stages with best effort and throw an Exception containing all stage
    * materialization errors and stage cancellation errors.

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org