This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new b27a287 [SPARK-33016][SQL] Potential SQLMetrics missed which might cause WEB UI display issue while AQE is on b27a287 is described below commit b27a287ff293c02dcad0c45cca71a5244664d7f5 Author: xuewei.linxuewei <xuewei.linxue...@alibaba-inc.com> AuthorDate: Mon Oct 12 14:48:40 2020 +0000 [SPARK-33016][SQL] Potential SQLMetrics missed which might cause WEB UI display issue while AQE is on ### What changes were proposed in this pull request? With the following scenario when AQE is on, SQLMetrics could be incorrect. 1. Stage A and B are created, and the UI is updated thru event onAdaptiveExecutionUpdate. 2. Stage A and B are running. A subquery in stage A keeps updating metrics thru event onAdaptiveSQLMetricUpdate. 3. Stage B completes, while stage A's subquery is still running, updating metrics. 4. Completion of stage B triggers new stage creation and a UI update thru event onAdaptiveExecutionUpdate again (just like step 1). So we decided to make a trade-off: keep the (possibly duplicate) SQLMetrics without deleting them when AQE updates the plan with a newPlan. ### Why are the changes needed? Make SQLMetrics behavior 100% correct. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Updated SQLAppStatusListenerSuite. Closes #29965 from leanken/leanken-SPARK-33016. 
Authored-by: xuewei.linxuewei <xuewei.linxue...@alibaba-inc.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../spark/sql/execution/ui/SQLAppStatusListener.scala | 4 ++-- .../sql/execution/ui/SQLAppStatusListenerSuite.scala | 16 ++++++++-------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index 175340d..963aec7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -341,7 +341,7 @@ class SQLAppStatusListener( val exec = getOrCreateExecution(executionId) exec.physicalPlanDescription = physicalPlanDescription - exec.metrics = sqlPlanMetrics + exec.metrics ++= sqlPlanMetrics update(exec) } @@ -349,7 +349,7 @@ class SQLAppStatusListener( val SparkListenerSQLAdaptiveSQLMetricUpdates(executionId, sqlPlanMetrics) = event val exec = getOrCreateExecution(executionId) - exec.metrics = exec.metrics ++ sqlPlanMetrics + exec.metrics ++= sqlPlanMetrics update(exec) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala index f49a3a3..00f2371 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala @@ -680,7 +680,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils assert(sparkPlanInfo.nodeName === "WholeStageCodegen (2)") } - test("SPARK-32615: SQLMetrics validation after sparkPlanInfo updated in AQE") { + test("SPARK-32615,SPARK-33016: SQLMetrics validation after sparkPlanInfo updated in AQE") { val statusStore = createStatusStore() 
val listener = statusStore.listener.get @@ -755,7 +755,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils .allNodes.flatMap(_.metrics.map(_.accumulatorId)) // Assume that AQE update sparkPlanInfo with newPlan - // ExecutionMetrics will be replaced using newPlan's SQLMetrics + // ExecutionMetrics will be appended using newPlan's SQLMetrics listener.onOtherEvent(SparkListenerSQLAdaptiveExecutionUpdate( executionId, "test", @@ -770,8 +770,8 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(1, 0))) listener.onTaskStart(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0))) - // live metrics will be override, and ExecutionMetrics should be empty as the newPlan updated. - assert(statusStore.executionMetrics(executionId).isEmpty) + // historical metrics will be kept despite of the newPlan updated. + assert(statusStore.executionMetrics(executionId).size == 2) // update new metrics with Id 4 & 5, since 3 is timing metrics, // timing metrics has a complicated string presentation so we don't test it here. 
@@ -780,9 +780,9 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils (0L, 1, 0, createAccumulatorInfos(newMetricsValueMap)) ))) - assert(statusStore.executionMetrics(executionId).size == 2) + assert(statusStore.executionMetrics(executionId).size == 4) statusStore.executionMetrics(executionId).foreach { m => - assert(m._2 == "500") + assert(m._2 == "100" || m._2 == "500") } listener.onTaskEnd(SparkListenerTaskEnd( @@ -802,10 +802,10 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils JobSucceeded )) - // aggregateMetrics should ignore metrics from job 0 + // aggregateMetrics should contains all metrics from job 0 and job 1 val aggregateMetrics = listener.liveExecutionMetrics(executionId) if (aggregateMetrics.isDefined) { - oldAccumulatorIds.foreach(id => assert(!aggregateMetrics.get.contains(id))) + assert(aggregateMetrics.get.keySet.size == 4) } listener.onOtherEvent(SparkListenerSQLExecutionEnd( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org