This is an automated email from the ASF dual-hosted git repository. lixiao pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new b29cb1a [SPARK-30719][SQL] do not log warning if AQE is intentionally skipped and add a config to force apply b29cb1a is described below commit b29cb1a82b1a1facf1dd040025db93d998dad4cd Author: Wenchen Fan <wenc...@databricks.com> AuthorDate: Thu Feb 6 09:16:14 2020 -0800 [SPARK-30719][SQL] do not log warning if AQE is intentionally skipped and add a config to force apply ### What changes were proposed in this pull request? Update `InsertAdaptiveSparkPlan` to not log warning if AQE is skipped intentionally. This PR also add a config to not skip AQE. ### Why are the changes needed? It's not a warning at all if we intentionally skip AQE. ### Does this PR introduce any user-facing change? no ### How was this patch tested? run `AdaptiveQueryExecSuite` locally and verify that there is no warning logs. Closes #27452 from cloud-fan/aqe. Authored-by: Wenchen Fan <wenc...@databricks.com> Signed-off-by: Xiao Li <gatorsm...@gmail.com> (cherry picked from commit 8ce58627ebe4f0372fba9a30d8cd4213611acd9b) Signed-off-by: Xiao Li <gatorsm...@gmail.com> --- .../org/apache/spark/sql/internal/SQLConf.scala | 9 +++ .../adaptive/InsertAdaptiveSparkPlan.scala | 83 ++++++++++++---------- .../adaptive/AdaptiveQueryExecSuite.scala | 9 +++ 3 files changed, 65 insertions(+), 36 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index acc0922..bed8410 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -358,6 +358,15 @@ object SQLConf { .booleanConf .createWithDefault(false) + val ADAPTIVE_EXECUTION_FORCE_APPLY = buildConf("spark.sql.adaptive.forceApply") + .internal() + .doc("Adaptive query execution is skipped when the query does not have exchanges or " + + "sub-queries. 
By setting this config to true (together with " + + s"'${ADAPTIVE_EXECUTION_ENABLED.key}' enabled), Spark will force apply adaptive query " + + "execution for all supported queries.") + .booleanConf + .createWithDefault(false) + val REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED = buildConf("spark.sql.adaptive.shuffle.reducePostShufflePartitions.enabled") .doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is enabled, this enables reducing " + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala index 9252827..621c063 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala @@ -40,49 +40,60 @@ case class InsertAdaptiveSparkPlan( private val conf = adaptiveExecutionContext.session.sessionState.conf - def containShuffle(plan: SparkPlan): Boolean = { - plan.find { - case _: Exchange => true - case s: SparkPlan => !s.requiredChildDistribution.forall(_ == UnspecifiedDistribution) - }.isDefined - } - - def containSubQuery(plan: SparkPlan): Boolean = { - plan.find(_.expressions.exists(_.find { - case _: SubqueryExpression => true - case _ => false - }.isDefined)).isDefined - } - override def apply(plan: SparkPlan): SparkPlan = applyInternal(plan, false) private def applyInternal(plan: SparkPlan, isSubquery: Boolean): SparkPlan = plan match { + case _ if !conf.adaptiveExecutionEnabled => plan case _: ExecutedCommandExec => plan - case _ if conf.adaptiveExecutionEnabled && supportAdaptive(plan) - && (isSubquery || containShuffle(plan) || containSubQuery(plan)) => - try { - // Plan sub-queries recursively and pass in the shared stage cache for exchange reuse. Fall - // back to non-adaptive mode if adaptive execution is supported in any of the sub-queries. 
- val subqueryMap = buildSubqueryMap(plan) - val planSubqueriesRule = PlanAdaptiveSubqueries(subqueryMap) - val preprocessingRules = Seq( - planSubqueriesRule) - // Run pre-processing rules. - val newPlan = AdaptiveSparkPlanExec.applyPhysicalRules(plan, preprocessingRules) - logDebug(s"Adaptive execution enabled for plan: $plan") - AdaptiveSparkPlanExec(newPlan, adaptiveExecutionContext, preprocessingRules, isSubquery) - } catch { - case SubqueryAdaptiveNotSupportedException(subquery) => - logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " + - s"but is not supported for sub-query: $subquery.") - plan - } - case _ => - if (conf.adaptiveExecutionEnabled) { + case _ if shouldApplyAQE(plan, isSubquery) => + if (supportAdaptive(plan)) { + try { + // Plan sub-queries recursively and pass in the shared stage cache for exchange reuse. + // Fall back to non-AQE mode if AQE is not supported in any of the sub-queries. + val subqueryMap = buildSubqueryMap(plan) + val planSubqueriesRule = PlanAdaptiveSubqueries(subqueryMap) + val preprocessingRules = Seq( + planSubqueriesRule) + // Run pre-processing rules. + val newPlan = AdaptiveSparkPlanExec.applyPhysicalRules(plan, preprocessingRules) + logDebug(s"Adaptive execution enabled for plan: $plan") + AdaptiveSparkPlanExec(newPlan, adaptiveExecutionContext, preprocessingRules, isSubquery) + } catch { + case SubqueryAdaptiveNotSupportedException(subquery) => + logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " + + s"but is not supported for sub-query: $subquery.") + plan + } + } else { logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " + s"but is not supported for query: $plan.") + plan } - plan + + case _ => plan + } + + // AQE is only useful when the query has exchanges or sub-queries. This method returns true if + // one of the following conditions is satisfied: + // - The config ADAPTIVE_EXECUTION_FORCE_APPLY is true. + // - The input query is from a sub-query. 
When this happens, it means we've already decided to + // apply AQE for the main query and we must continue to do it. + // - The query contains exchanges. + // - The query may need to add exchanges. It's an overkill to run `EnsureRequirements` here, so + // we just check `SparkPlan.requiredChildDistribution` and see if it's possible that the + // the query needs to add exchanges later. + // - The query contains sub-query. + private def shouldApplyAQE(plan: SparkPlan, isSubquery: Boolean): Boolean = { + conf.getConf(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY) || isSubquery || { + plan.find { + case _: Exchange => true + case p if !p.requiredChildDistribution.forall(_ == UnspecifiedDistribution) => true + case p => p.expressions.exists(_.find { + case _: SubqueryExpression => true + case _ => false + }.isDefined) + }.isDefined + } } private def supportAdaptive(plan: SparkPlan): Boolean = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 78a1183..96e9772 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -780,4 +780,13 @@ class AdaptiveQueryExecSuite ) } } + + test("force apply AQE") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") { + val plan = sql("SELECT * FROM testData").queryExecution.executedPlan + assert(plan.isInstanceOf[AdaptiveSparkPlanExec]) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org