This is an automated email from the ASF dual-hosted git repository. lixiao pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new ba43922 [SPARK-31658][SQL] Fix SQL UI not showing write commands of AQE plan ba43922 is described below commit ba4392217b461d20bfd10dbc00714dbb7268d71a Author: manuzhang <owenzhang1...@gmail.com> AuthorDate: Fri May 8 10:24:13 2020 -0700 [SPARK-31658][SQL] Fix SQL UI not showing write commands of AQE plan Show write commands on SQL UI of an AQE plan Currently the leaf node of an AQE plan is always an `AdaptiveSparkPlan` which is not true when it's a child of a write command. Hence, the node of the write command as well as its metrics are not shown on the SQL UI. ![image](https://user-images.githubusercontent.com/1191767/81288918-1893f580-9098-11ea-9771-e3d0820ba806.png) ![image](https://user-images.githubusercontent.com/1191767/81289008-3a8d7800-9098-11ea-93ec-516bbaf25d2d.png) User-facing change: No. Testing: Add UT. Closes #28474 from manuzhang/aqe-ui. Lead-authored-by: manuzhang <owenzhang1...@gmail.com> Co-authored-by: Xiao Li <gatorsm...@gmail.com> Signed-off-by: gatorsmile <gatorsm...@gmail.com> (cherry picked from commit 77c690a7252b22c9dd8f3cb7ac32f79fd6845cad) Signed-off-by: gatorsmile <gatorsm...@gmail.com> --- .../execution/adaptive/AdaptiveSparkPlanExec.scala | 4 +-- .../adaptive/AdaptiveQueryExecSuite.scala | 35 ++++++++++++++++++++-- 2 files changed, 34 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index cd6936b..90d1db9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -526,8 +526,8 @@ case class AdaptiveSparkPlanExec( } else { context.session.sparkContext.listenerBus.post(SparkListenerSQLAdaptiveExecutionUpdate( executionId, - 
SQLExecution.getQueryExecution(executionId).toString, - SparkPlanInfo.fromSparkPlan(this))) + context.qe.toString, + SparkPlanInfo.fromSparkPlan(context.qe.executedPlan))) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index f30d1e9..29b9755 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -805,9 +805,11 @@ class AdaptiveQueryExecSuite test("SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of write commands") { withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") { - val plan = sql("CREATE TABLE t1 AS SELECT 1 col").queryExecution.executedPlan - assert(plan.isInstanceOf[DataWritingCommandExec]) - assert(plan.asInstanceOf[DataWritingCommandExec].child.isInstanceOf[AdaptiveSparkPlanExec]) + withTable("t1") { + val plan = sql("CREATE TABLE t1 AS SELECT 1 col").queryExecution.executedPlan + assert(plan.isInstanceOf[DataWritingCommandExec]) + assert(plan.asInstanceOf[DataWritingCommandExec].child.isInstanceOf[AdaptiveSparkPlanExec]) + } } } @@ -847,4 +849,31 @@ class AdaptiveQueryExecSuite } } } + + test("SPARK-31658: SQL UI should show write commands") { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") { + withTable("t1") { + var checkDone = false + val listener = new SparkListener { + override def onOtherEvent(event: SparkListenerEvent): Unit = { + event match { + case SparkListenerSQLAdaptiveExecutionUpdate(_, _, planInfo) => + assert(planInfo.nodeName == "Execute CreateDataSourceTableAsSelectCommand") + checkDone = true + case _ => // ignore other events + } + } + } + 
spark.sparkContext.addSparkListener(listener) + try { + sql("CREATE TABLE t1 AS SELECT 1 col").collect() + spark.sparkContext.listenerBus.waitUntilEmpty() + assert(checkDone) + } finally { + spark.sparkContext.removeSparkListener(listener) + } + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org