This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 6198f38  [SPARK-31473][SQL] AQE should set active session during execution
6198f38 is described below

commit 6198f384054e7f86521891ceeb1a231f449a16a8
Author: Maryann Xue <maryann....@gmail.com>
AuthorDate: Sat Apr 18 00:08:36 2020 -0700

    [SPARK-31473][SQL] AQE should set active session during execution

    ### What changes were proposed in this pull request?

    AQE creates new SparkPlan nodes during execution. This PR makes sure that the active session is set correctly during this process and AQE execution is not disrupted by external session change.

    ### Why are the changes needed?

    To prevent potential errors. If not changed, the physical plans generated by AQE would have the wrong SparkSession or even null SparkSession, which could lead to NPE.

    ### Does this PR introduce any user-facing change?

    No.

    ### How was this patch tested?

    Added UT.

    Closes #28247 from maryannxue/aqe-activesession.

    Authored-by: Maryann Xue <maryann....@gmail.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../sql/execution/adaptive/AdaptiveSparkPlanExec.scala  |  9 +++++++--
 .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 13 ++++++++++++-
 2 files changed, 19 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 217817e..3ac4ea5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -139,7 +139,12 @@ case class AdaptiveSparkPlanExec(
   }
 
   private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized {
-    if (!isFinalPlan) {
+    if (isFinalPlan) return currentPhysicalPlan
+
+    // In case of this adaptive plan being executed out of `withActive` scoped functions, e.g.,
+    // `plan.queryExecution.rdd`, we need to set active session here as new plan nodes can be
+    // created in the middle of the execution.
+    context.session.withActive {
       // Subqueries do not have their own execution IDs and therefore rely on the main query to
       // update UI.
       val executionId = Option(context.session.sparkContext.getLocalProperty(
@@ -225,8 +230,8 @@ case class AdaptiveSparkPlanExec(
       isFinalPlan = true
       executionId.foreach(onUpdatePlan(_, Seq(currentPhysicalPlan)))
       logOnLevel(s"Final plan: $currentPhysicalPlan")
+      currentPhysicalPlan
     }
-    currentPhysicalPlan
   }
 
   override def executeCollect(): Array[InternalRow] = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 35dec44..694be98 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -23,12 +23,13 @@ import java.net.URI
 import org.apache.log4j.Level
 
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart}
-import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.{QueryTest, Row, SparkSession}
 import org.apache.spark.sql.execution.{ReusedSubqueryExec, ShuffledRowRDD, SparkPlan}
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ReusedExchangeExec}
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight, SortMergeJoinExec}
 import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, StructType}
@@ -907,4 +908,14 @@ class AdaptiveQueryExecSuite
       assert(plan.asInstanceOf[DataWritingCommandExec].child.isInstanceOf[AdaptiveSparkPlanExec])
     }
   }
+
+  test("AQE should set active session during execution") {
+    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+      val df = spark.range(10).select(sum('id))
+      assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
+      SparkSession.setActiveSession(null)
+      checkAnswer(df, Seq(Row(45)))
+      SparkSession.setActiveSession(spark) // recover the active session.
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org