[ https://issues.apache.org/jira/browse/SPARK-46240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
jiang13021 updated SPARK-46240:
-------------------------------
Description:
Some rules (Rule[SparkPlan]) are applied while preparing the executedPlan. However, users have no way to add their own rules at this stage.
{code:java}
// org.apache.spark.sql.execution.QueryExecution#preparations
private[execution] def preparations(
    sparkSession: SparkSession,
    adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None,
    subquery: Boolean): Seq[Rule[SparkPlan]] = {
  // `AdaptiveSparkPlanExec` is a leaf node. If inserted, all the following rules will be no-op
  // as the original plan is hidden behind `AdaptiveSparkPlanExec`.
  adaptiveExecutionRule.toSeq ++
  Seq(
    CoalesceBucketsInJoin,
    PlanDynamicPruningFilters(sparkSession),
    PlanSubqueries(sparkSession),
    RemoveRedundantProjects,
    EnsureRequirements(),
    // `ReplaceHashWithSortAgg` needs to be added after `EnsureRequirements` to guarantee the
    // sort order of each node is checked to be valid.
    ReplaceHashWithSortAgg,
    // `RemoveRedundantSorts` needs to be added after `EnsureRequirements` to guarantee the same
    // number of partitions when instantiating PartitioningCollection.
    RemoveRedundantSorts,
    DisableUnnecessaryBucketedScan,
    ApplyColumnarRulesAndInsertTransitions(
      sparkSession.sessionState.columnarRules, outputsColumnar = false),
    CollapseCodegenStages()) ++
  (if (subquery) {
    Nil
  } else {
    Seq(ReuseExchangeAndSubquery)
  })
}{code}
We need to be able to add custom Rule[SparkPlan]s at this point. Currently, all user-injectable rules of this kind belong to AQE, which requires AQE to be enabled and the query to qualify for AdaptiveSparkPlanExec. This makes it difficult to implement certain extensions for simple SQL statements.
For example, adding new datasource filters for external data sources is challenging. Modifying DataSourceStrategy directly makes it hard to stay in sync with future advancements in the community, and customizing the Strategy makes it difficult to add new functionality incrementally.
If we define AQE rules, they are not effective even for the simplest 'SELECT * FROM ... WHERE ...' statements. Therefore, it is necessary to introduce a customizable Rule[SparkPlan] between sparkPlan and executedPlan. We could add an extension point called "ExecutedPlanPrepRule" to SparkSessionExtensions, which would allow users to add their own rules.

was:
Some rules (Rule[SparkPlan]) are applied while preparing the executedPlan. However, users have no way to add their own rules at this stage.
{code:java}
// org.apache.spark.sql.execution.QueryExecution#preparations
private[execution] def preparations(
    sparkSession: SparkSession,
    adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None,
    subquery: Boolean): Seq[Rule[SparkPlan]] = {
  // `AdaptiveSparkPlanExec` is a leaf node. If inserted, all the following rules will be no-op
  // as the original plan is hidden behind `AdaptiveSparkPlanExec`.
  adaptiveExecutionRule.toSeq ++
  Seq(
    CoalesceBucketsInJoin,
    PlanDynamicPruningFilters(sparkSession),
    PlanSubqueries(sparkSession),
    RemoveRedundantProjects,
    EnsureRequirements(),
    // `ReplaceHashWithSortAgg` needs to be added after `EnsureRequirements` to guarantee the
    // sort order of each node is checked to be valid.
    ReplaceHashWithSortAgg,
    // `RemoveRedundantSorts` needs to be added after `EnsureRequirements` to guarantee the same
    // number of partitions when instantiating PartitioningCollection.
    RemoveRedundantSorts,
    DisableUnnecessaryBucketedScan,
    ApplyColumnarRulesAndInsertTransitions(
      sparkSession.sessionState.columnarRules, outputsColumnar = false),
    CollapseCodegenStages()) ++
  (if (subquery) {
    Nil
  } else {
    Seq(ReuseExchangeAndSubquery)
  })
}{code}
We could add an extension point called "PrepExecutedPlanRule" to SparkSessionExtensions, which would allow users to add their own rules.
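A rough usage sketch of the proposal. The method name injectExecutedPlanPrepRule and its signature are assumptions here, modeled on the existing SparkSessionExtensions.injectQueryStagePrepRule (which only fires inside AdaptiveSparkPlanExec); the actual API would be settled during review.
{code:java}
import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan

// Hypothetical rule that would run between sparkPlan and executedPlan,
// even for a plain "SELECT * FROM ... WHERE ..." and with AQE disabled.
case class PushExternalFilters(session: SparkSession) extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = {
    // e.g. rewrite scan nodes to push additional filters to an external source
    plan
  }
}

class MyExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(ext: SparkSessionExtensions): Unit = {
    // Hypothetical injection point proposed by this issue; today the closest
    // hook is ext.injectQueryStagePrepRule(...), which is AQE-only.
    ext.injectExecutedPlanPrepRule(session => PushExternalFilters(session))
  }
}
// Enabled via: spark.sql.extensions=MyExtensions
{code}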
Summary: Add ExecutedPlanPrepRules to SparkSessionExtensions  (was: Add PrepExecutedPlanRule to SparkSessionExtensions)

> Add ExecutedPlanPrepRules to SparkSessionExtensions
> ---------------------------------------------------
>
>                 Key: SPARK-46240
>                 URL: https://issues.apache.org/jira/browse/SPARK-46240
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.2.0, 3.3.0, 3.4.0
>            Reporter: jiang13021
>            Priority: Major
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org