This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new b29cb1a  [SPARK-30719][SQL] do not log warning if AQE is intentionally 
skipped and add a config to force apply
b29cb1a is described below

commit b29cb1a82b1a1facf1dd040025db93d998dad4cd
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Thu Feb 6 09:16:14 2020 -0800

    [SPARK-30719][SQL] do not log warning if AQE is intentionally skipped and 
add a config to force apply
    
    ### What changes were proposed in this pull request?
    
    Update `InsertAdaptiveSparkPlan` to not log a warning if AQE is skipped 
intentionally.
    
    This PR also adds a config to not skip AQE.
    
    ### Why are the changes needed?
    
    It's not a warning at all if we intentionally skip AQE.
    
    ### Does this PR introduce any user-facing change?
    
    no
    
    ### How was this patch tested?
    
    run `AdaptiveQueryExecSuite` locally and verify that there are no warning 
logs.
    
    Closes #27452 from cloud-fan/aqe.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Xiao Li <gatorsm...@gmail.com>
    (cherry picked from commit 8ce58627ebe4f0372fba9a30d8cd4213611acd9b)
    Signed-off-by: Xiao Li <gatorsm...@gmail.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    |  9 +++
 .../adaptive/InsertAdaptiveSparkPlan.scala         | 83 ++++++++++++----------
 .../adaptive/AdaptiveQueryExecSuite.scala          |  9 +++
 3 files changed, 65 insertions(+), 36 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index acc0922..bed8410 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -358,6 +358,15 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val ADAPTIVE_EXECUTION_FORCE_APPLY = 
buildConf("spark.sql.adaptive.forceApply")
+    .internal()
+    .doc("Adaptive query execution is skipped when the query does not have 
exchanges or " +
+      "sub-queries. By setting this config to true (together with " +
+      s"'${ADAPTIVE_EXECUTION_ENABLED.key}' enabled), Spark will force apply 
adaptive query " +
+      "execution for all supported queries.")
+    .booleanConf
+    .createWithDefault(false)
+
   val REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED =
     buildConf("spark.sql.adaptive.shuffle.reducePostShufflePartitions.enabled")
       .doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is enabled, 
this enables reducing " +
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
index 9252827..621c063 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
@@ -40,49 +40,60 @@ case class InsertAdaptiveSparkPlan(
 
   private val conf = adaptiveExecutionContext.session.sessionState.conf
 
-  def containShuffle(plan: SparkPlan): Boolean = {
-    plan.find {
-      case _: Exchange => true
-      case s: SparkPlan => !s.requiredChildDistribution.forall(_ == 
UnspecifiedDistribution)
-    }.isDefined
-  }
-
-  def containSubQuery(plan: SparkPlan): Boolean = {
-    plan.find(_.expressions.exists(_.find {
-      case _: SubqueryExpression => true
-      case _ => false
-    }.isDefined)).isDefined
-  }
-
   override def apply(plan: SparkPlan): SparkPlan = applyInternal(plan, false)
 
   private def applyInternal(plan: SparkPlan, isSubquery: Boolean): SparkPlan = 
plan match {
+    case _ if !conf.adaptiveExecutionEnabled => plan
     case _: ExecutedCommandExec => plan
-    case _ if conf.adaptiveExecutionEnabled && supportAdaptive(plan)
-      && (isSubquery || containShuffle(plan) || containSubQuery(plan)) =>
-      try {
-        // Plan sub-queries recursively and pass in the shared stage cache for 
exchange reuse. Fall
-        // back to non-adaptive mode if adaptive execution is supported in any 
of the sub-queries.
-        val subqueryMap = buildSubqueryMap(plan)
-        val planSubqueriesRule = PlanAdaptiveSubqueries(subqueryMap)
-        val preprocessingRules = Seq(
-          planSubqueriesRule)
-        // Run pre-processing rules.
-        val newPlan = AdaptiveSparkPlanExec.applyPhysicalRules(plan, 
preprocessingRules)
-        logDebug(s"Adaptive execution enabled for plan: $plan")
-        AdaptiveSparkPlanExec(newPlan, adaptiveExecutionContext, 
preprocessingRules, isSubquery)
-      } catch {
-        case SubqueryAdaptiveNotSupportedException(subquery) =>
-          logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " +
-            s"but is not supported for sub-query: $subquery.")
-          plan
-      }
-    case _ =>
-      if (conf.adaptiveExecutionEnabled) {
+    case _ if shouldApplyAQE(plan, isSubquery) =>
+      if (supportAdaptive(plan)) {
+        try {
+          // Plan sub-queries recursively and pass in the shared stage cache 
for exchange reuse.
+          // Fall back to non-AQE mode if AQE is not supported in any of the 
sub-queries.
+          val subqueryMap = buildSubqueryMap(plan)
+          val planSubqueriesRule = PlanAdaptiveSubqueries(subqueryMap)
+          val preprocessingRules = Seq(
+            planSubqueriesRule)
+          // Run pre-processing rules.
+          val newPlan = AdaptiveSparkPlanExec.applyPhysicalRules(plan, 
preprocessingRules)
+          logDebug(s"Adaptive execution enabled for plan: $plan")
+          AdaptiveSparkPlanExec(newPlan, adaptiveExecutionContext, 
preprocessingRules, isSubquery)
+        } catch {
+          case SubqueryAdaptiveNotSupportedException(subquery) =>
+            logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled 
" +
+              s"but is not supported for sub-query: $subquery.")
+            plan
+        }
+      } else {
         logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " +
           s"but is not supported for query: $plan.")
+        plan
       }
-      plan
+
+    case _ => plan
+  }
+
+  // AQE is only useful when the query has exchanges or sub-queries. This 
method returns true if
+  // one of the following conditions is satisfied:
+  //   - The config ADAPTIVE_EXECUTION_FORCE_APPLY is true.
+  //   - The input query is from a sub-query. When this happens, it means 
we've already decided to
+  //     apply AQE for the main query and we must continue to do it.
+  //   - The query contains exchanges.
+  //   - The query may need to add exchanges. It's an overkill to run 
`EnsureRequirements` here, so
+  //     we just check `SparkPlan.requiredChildDistribution` and see if it's 
possible that the
+  //     the query needs to add exchanges later.
+  //   - The query contains sub-query.
+  private def shouldApplyAQE(plan: SparkPlan, isSubquery: Boolean): Boolean = {
+    conf.getConf(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY) || isSubquery || {
+      plan.find {
+        case _: Exchange => true
+        case p if !p.requiredChildDistribution.forall(_ == 
UnspecifiedDistribution) => true
+        case p => p.expressions.exists(_.find {
+          case _: SubqueryExpression => true
+          case _ => false
+        }.isDefined)
+      }.isDefined
+    }
   }
 
   private def supportAdaptive(plan: SparkPlan): Boolean = {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 78a1183..96e9772 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -780,4 +780,13 @@ class AdaptiveQueryExecSuite
       )
     }
   }
+
+  test("force apply AQE") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+      SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") {
+      val plan = sql("SELECT * FROM testData").queryExecution.executedPlan
+      assert(plan.isInstanceOf[AdaptiveSparkPlanExec])
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to