This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new e1ed80a  [SPARK-31473][SQL] AQE should set active session during execution
e1ed80a is described below

commit e1ed80a134a2d613a9221a15847cad3274cd6f2b
Author: Maryann Xue <maryann....@gmail.com>
AuthorDate: Sat Apr 18 00:08:36 2020 -0700

    [SPARK-31473][SQL] AQE should set active session during execution
    
    ### What changes were proposed in this pull request?
    
    AQE creates new SparkPlan nodes during execution. This PR makes sure that the active session is set correctly during this process and that AQE execution is not disrupted by external session changes.
    
    ### Why are the changes needed?
    
    To prevent potential errors. Without this change, the physical plans generated by AQE would have the wrong SparkSession or even a null SparkSession, which could lead to an NPE.
    
    ### Does this PR introduce any user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added a unit test to AdaptiveQueryExecSuite.
    
    Closes #28247 from maryannxue/aqe-activesession.
    
    Authored-by: Maryann Xue <maryann....@gmail.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
    (cherry picked from commit 6198f384054e7f86521891ceeb1a231f449a16a8)
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../sql/execution/adaptive/AdaptiveSparkPlanExec.scala      |  9 +++++++--
 .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala     | 13 ++++++++++++-
 2 files changed, 19 insertions(+), 3 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index b54a32f..2b46724 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -139,7 +139,12 @@ case class AdaptiveSparkPlanExec(
   }
 
   private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized {
-    if (!isFinalPlan) {
+    if (isFinalPlan) return currentPhysicalPlan
+
+    // In case of this adaptive plan being executed out of `withActive` scoped 
functions, e.g.,
+    // `plan.queryExecution.rdd`, we need to set active session here as new 
plan nodes can be
+    // created in the middle of the execution.
+    context.session.withActive {
       // Subqueries do not have their own execution IDs and therefore rely on 
the main query to
       // update UI.
       val executionId = Option(context.session.sparkContext.getLocalProperty(
@@ -225,8 +230,8 @@ case class AdaptiveSparkPlanExec(
       isFinalPlan = true
       executionId.foreach(onUpdatePlan(_, Seq(currentPhysicalPlan)))
       logOnLevel(s"Final plan: $currentPhysicalPlan")
+      currentPhysicalPlan
     }
-    currentPhysicalPlan
   }
 
   override def executeCollect(): Array[InternalRow] = {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 64dd9aa..6da510f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -23,13 +23,14 @@ import java.net.URI
 import org.apache.log4j.Level
 
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, 
SparkListenerJobStart}
-import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.{QueryTest, Row, SparkSession}
 import org.apache.spark.sql.execution.{ReusedSubqueryExec, ShuffledRowRDD, 
SparkPlan}
 import 
org.apache.spark.sql.execution.adaptive.OptimizeLocalShuffleReader.LOCAL_SHUFFLE_READER_DESCRIPTION
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, 
Exchange, ReusedExchangeExec}
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, 
BuildRight, SortMergeJoinExec}
 import 
org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, StructType}
@@ -805,4 +806,14 @@ class AdaptiveQueryExecSuite
       
assert(plan.asInstanceOf[DataWritingCommandExec].child.isInstanceOf[AdaptiveSparkPlanExec])
     }
   }
+
+  test("AQE should set active session during execution") {
+    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+      val df = spark.range(10).select(sum('id))
+      
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
+      SparkSession.setActiveSession(null)
+      checkAnswer(df, Seq(Row(45)))
+      SparkSession.setActiveSession(spark) // recover the active session.
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to