This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push: new 89cc547 [SPARK-35881][SQL][FOLLOWUP] Remove the AQE post stage creation extension 89cc547 is described below commit 89cc547afd9794b057da93a45a90da04b534ceac Author: Wenchen Fan <wenc...@databricks.com> AuthorDate: Thu Aug 12 21:35:28 2021 +0800 [SPARK-35881][SQL][FOLLOWUP] Remove the AQE post stage creation extension ### What changes were proposed in this pull request? This is a followup of #33140. It turns out that we may be able to complete the AQE and columnar execution integration without the AQE post stage creation extension. The rule `ApplyColumnarRulesAndInsertTransitions` can add a to-columnar transition if the shuffle/broadcast supports columnar. ### Why are the changes needed? Remove APIs that are not needed. ### Does this PR introduce _any_ user-facing change? No, the APIs are not released yet. ### How was this patch tested? Existing and manual tests. Closes #33701 from cloud-fan/aqe. Authored-by: Wenchen Fan <wenc...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit 124d011ee73f9805ac840aa5a6eddc27cd09b2e1) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../apache/spark/sql/SparkSessionExtensions.scala | 20 -------------------- .../execution/adaptive/AdaptiveSparkPlanExec.scala | 16 ++++++++++------ .../spark/sql/internal/BaseSessionStateBuilder.scala | 7 +------ .../org/apache/spark/sql/internal/SessionState.scala | 3 +-- 4 files changed, 12 insertions(+), 34 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala index 18ebae5..a4ec481 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala @@ -47,7 +47,6 @@ import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan} * <li>(External) Catalog 
listeners.</li> * <li>Columnar Rules.</li> * <li>Adaptive Query Stage Preparation Rules.</li> - * <li>Adaptive Query Post Stage Preparation Rules.</li> * </ul> * * The extensions can be used by calling `withExtensions` on the [[SparkSession.Builder]], for @@ -111,12 +110,9 @@ class SparkSessionExtensions { type TableFunctionDescription = (FunctionIdentifier, ExpressionInfo, TableFunctionBuilder) type ColumnarRuleBuilder = SparkSession => ColumnarRule type QueryStagePrepRuleBuilder = SparkSession => Rule[SparkPlan] - type PostStageCreationRuleBuilder = SparkSession => Rule[SparkPlan] private[this] val columnarRuleBuilders = mutable.Buffer.empty[ColumnarRuleBuilder] private[this] val queryStagePrepRuleBuilders = mutable.Buffer.empty[QueryStagePrepRuleBuilder] - private[this] val postStageCreationRuleBuilders = - mutable.Buffer.empty[PostStageCreationRuleBuilder] /** * Build the override rules for columnar execution. @@ -133,14 +129,6 @@ class SparkSessionExtensions { } /** - * Build the override rules for the final query stage preparation phase of adaptive query - * execution. - */ - private[sql] def buildPostStageCreationRules(session: SparkSession): Seq[Rule[SparkPlan]] = { - postStageCreationRuleBuilders.map(_.apply(session)).toSeq - } - - /** * Inject a rule that can override the columnar execution of an executor. */ def injectColumnar(builder: ColumnarRuleBuilder): Unit = { @@ -155,14 +143,6 @@ class SparkSessionExtensions { queryStagePrepRuleBuilders += builder } - /** - * Inject a rule that can override the final query stage preparation phase of adaptive query - * execution. 
- */ - def injectPostStageCreationRule(builder: PostStageCreationRuleBuilder): Unit = { - postStageCreationRuleBuilders += builder - } - private[this] val resolutionRuleBuilders = mutable.Buffer.empty[RuleBuilder] /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index 2c242d1..cd47fd0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -125,14 +125,18 @@ case class AdaptiveSparkPlanExec( OptimizeShuffleWithLocalRead ) - @transient private val staticPostStageCreationRules: Seq[Rule[SparkPlan]] = - CollapseCodegenStages() +: context.session.sessionState.postStageCreationRules + // This rule is stateful as it maintains the codegen stage ID. We can't create a fresh one every + // time and need to keep it in a variable. + @transient private val collapseCodegenStagesRule: Rule[SparkPlan] = + CollapseCodegenStages() // A list of physical optimizer rules to be applied right after a new stage is created. The input // plan to these rules has exchange as its root node. 
- private def postStageCreationRules(outputsColumnar: Boolean) = + private def postStageCreationRules(outputsColumnar: Boolean) = Seq( ApplyColumnarRulesAndInsertTransitions( - context.session.sessionState.columnarRules, outputsColumnar) +: staticPostStageCreationRules + context.session.sessionState.columnarRules, outputsColumnar), + collapseCodegenStagesRule + ) private def optimizeQueryStage(plan: SparkPlan, isFinalStage: Boolean): SparkPlan = { val optimized = queryStageOptimizerRules.foldLeft(plan) { case (latestPlan, rule) => @@ -522,7 +526,7 @@ case class AdaptiveSparkPlanExec( case s: ShuffleExchangeLike => val newShuffle = applyPhysicalRules( s.withNewChildren(Seq(optimizedPlan)), - postStageCreationRules(outputsColumnar = false), + postStageCreationRules(outputsColumnar = s.supportsColumnar), Some((planChangeLogger, "AQE Post Stage Creation"))) if (!newShuffle.isInstanceOf[ShuffleExchangeLike]) { throw new IllegalStateException( @@ -532,7 +536,7 @@ case class AdaptiveSparkPlanExec( case b: BroadcastExchangeLike => val newBroadcast = applyPhysicalRules( b.withNewChildren(Seq(optimizedPlan)), - postStageCreationRules(outputsColumnar = false), + postStageCreationRules(outputsColumnar = b.supportsColumnar), Some((planChangeLogger, "AQE Post Stage Creation"))) if (!newBroadcast.isInstanceOf[BroadcastExchangeLike]) { throw new IllegalStateException( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index 1c0c916..8289819 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -307,10 +307,6 @@ abstract class BaseSessionStateBuilder( extensions.buildQueryStagePrepRules(session) } - protected def postStageCreationRules: Seq[Rule[SparkPlan]] = { - extensions.buildPostStageCreationRules(session) - } - 
/** * Create a query execution object. */ @@ -364,8 +360,7 @@ abstract class BaseSessionStateBuilder( createQueryExecution, createClone, columnarRules, - queryStagePrepRules, - postStageCreationRules) + queryStagePrepRules) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index 7685e54..cdf764a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -79,8 +79,7 @@ private[sql] class SessionState( createQueryExecution: (LogicalPlan, CommandExecutionMode.Value) => QueryExecution, createClone: (SparkSession, SessionState) => SessionState, val columnarRules: Seq[ColumnarRule], - val queryStagePrepRules: Seq[Rule[SparkPlan]], - val postStageCreationRules: Seq[Rule[SparkPlan]]) { + val queryStagePrepRules: Seq[Rule[SparkPlan]]) { // The following fields are lazy to avoid creating the Hive client when creating SessionState. lazy val catalog: SessionCatalog = catalogBuilder() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org