This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 7313d71438e4 [SPARK-48273][SQL] Fix late rewrite of PlanWithUnresolvedIdentifier

7313d71438e4 is described below

commit 7313d71438e4691f7c086e90ded4a6f644cdcdc5
Author: Nikola Mandic <nikola.man...@databricks.com>
AuthorDate: Tue May 28 09:59:53 2024 -0700

    [SPARK-48273][SQL] Fix late rewrite of PlanWithUnresolvedIdentifier

    ### What changes were proposed in this pull request?

    `PlanWithUnresolvedIdentifier` is rewritten later in analysis, which causes
    rules like `SubstituteUnresolvedOrdinals` to miss the plan it produces.
    This causes the following queries to fail:
    ```
    create temporary view identifier('v1') as (select my_col from (values (1), (2), (1) as (my_col)) group by 1);
    --
    cache table identifier('t1') as (select my_col from (values (1), (2), (1) as (my_col)) group by 1);
    --
    create table identifier('t2') as (select my_col from (values (1), (2), (1) as (my_col)) group by 1);
    insert into identifier('t2') select my_col from (values (3) as (my_col)) group by 1;
    ```
    Fix this by explicitly applying the early analyzer rule batches after the plan rewrite.

    ### Why are the changes needed?

    To fix the bug described above.

    ### Does this PR introduce _any_ user-facing change?

    Yes, it fixes the problematic queries listed above.

    ### How was this patch tested?

    Updated the existing `identifier-clause.sql` golden file.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #46580 from nikolamand-db/SPARK-48273.

    Authored-by: Nikola Mandic <nikola.man...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 731a2cfcffaeeeb1f1c107080ca77000330d79b5)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
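The mechanics are easier to see outside the analyzer, so before the diff, here is a minimal, self-contained Scala sketch of the pattern the patch applies. Every name in it (`PlanWithUnresolvedId`, `substituteOrdinals`, `AnalysisRule`, and so on) is an illustrative stand-in, not Spark's real API, and each rule is applied once rather than via Spark's fixed-point batches. The point it demonstrates: an early rule can only fire on plans that exist when its batch runs, so a late rule that materializes a new subtree must re-apply the early rules to it.

```
object LateRewriteSketch {
  // Toy stand-ins for logical plan nodes; none of these are Spark's real classes.
  sealed trait Plan
  case class PlanWithUnresolvedId(identifier: String, builder: String => Plan) extends Plan
  case class Aggregate(groupExprs: Seq[Any], child: Plan) extends Plan
  case class Relation(name: String, output: Seq[String]) extends Plan
  case class Ordinal(n: Int)

  type AnalysisRule = Plan => Plan

  // "Early" rule, playing the role of SubstituteUnresolvedOrdinals: it replaces
  // a GROUP BY ordinal at the root Aggregate with the column it refers to.
  val substituteOrdinals: AnalysisRule = {
    case Aggregate(exprs, child @ Relation(_, output)) =>
      Aggregate(exprs.map {
        case Ordinal(n) => output(n - 1)
        case e => e
      }, child)
    case p => p
  }

  // "Late" rule: the real plan only comes into existence here, once the
  // identifier is known. Re-running the early rules over the freshly built
  // subtree is the essence of the fix; without the foldLeft, the Aggregate
  // would keep its unresolved Ordinal(1).
  def resolveIdentifier(earlyRules: Seq[AnalysisRule]): AnalysisRule = {
    case PlanWithUnresolvedId(id, builder) =>
      earlyRules.foldLeft(builder(id))((plan, rule) => rule(plan))
    case p => p
  }

  def main(args: Array[String]): Unit = {
    // Models: insert into identifier('t2') select my_col from ... group by 1
    val query = PlanWithUnresolvedId("t2",
      name => Aggregate(Seq(Ordinal(1)), Relation(name, Seq("my_col"))))
    println(resolveIdentifier(Seq(substituteOrdinals))(query))
    // Prints: Aggregate(List(my_col),Relation(t2,List(my_col)))
  }
}
```

The actual patch does the same thing with Spark's `RuleExecutor`: `ResolveIdentifierClause` now receives the analyzer's early batches and runs them over the plan produced by `planBuilder`, as the diff below shows.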
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  9 ++--
 .../analysis/ResolveIdentifierClause.scala         | 11 ++--
 .../spark/sql/catalyst/rules/RuleExecutor.scala    |  2 +-
 .../analyzer-results/identifier-clause.sql.out     | 59 ++++++++++++++++++++++
 .../sql-tests/inputs/identifier-clause.sql         |  9 ++++
 .../sql-tests/results/identifier-clause.sql.out    | 56 ++++++++++++++++++++
 6 files changed, 139 insertions(+), 7 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index ed7b978045c7..5890a9692e20 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -255,7 +255,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
     TypeCoercion.typeCoercionRules
   }
 
-  override def batches: Seq[Batch] = Seq(
+  private def earlyBatches: Seq[Batch] = Seq(
     Batch("Substitution", fixedPoint,
       // This rule optimizes `UpdateFields` expression chains so looks more like optimization rule.
       // However, when manipulating deeply nested schema, `UpdateFields` expression tree could be
@@ -275,7 +275,10 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
     Batch("Simple Sanity Check", Once,
       LookupFunctions),
     Batch("Keep Legacy Outputs", Once,
-      KeepLegacyOutputs),
+      KeepLegacyOutputs)
+  )
+
+  override def batches: Seq[Batch] = earlyBatches ++ Seq(
     Batch("Resolution", fixedPoint,
       new ResolveCatalogs(catalogManager) ::
       ResolveInsertInto ::
@@ -319,7 +322,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
       ResolveTimeZone ::
       ResolveRandomSeed ::
       ResolveBinaryArithmetic ::
-      ResolveIdentifierClause ::
+      new ResolveIdentifierClause(earlyBatches) ::
       ResolveUnion ::
       ResolveRowLevelCommandAssignments ::
       RewriteDeleteFromTable ::
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala
index e0d3e5629ef8..422bad3d89e2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala
@@ -20,19 +20,24 @@ package org.apache.spark.sql.catalyst.analysis
 import org.apache.spark.sql.catalyst.expressions.{AliasHelper, EvalHelper, Expression}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
 import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_IDENTIFIER
 import org.apache.spark.sql.types.StringType
 
 /**
  * Resolves the identifier expressions and builds the original plans/expressions.
  */
-object ResolveIdentifierClause extends Rule[LogicalPlan] with AliasHelper with EvalHelper {
+class ResolveIdentifierClause(earlyBatches: Seq[RuleExecutor[LogicalPlan]#Batch])
+  extends Rule[LogicalPlan] with AliasHelper with EvalHelper {
+
+  private val executor = new RuleExecutor[LogicalPlan] {
+    override def batches: Seq[Batch] = earlyBatches.asInstanceOf[Seq[Batch]]
+  }
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning(
     _.containsAnyPattern(UNRESOLVED_IDENTIFIER)) {
     case p: PlanWithUnresolvedIdentifier if p.identifierExpr.resolved =>
-      p.planBuilder.apply(evalIdentifierExpr(p.identifierExpr))
+      executor.execute(p.planBuilder.apply(evalIdentifierExpr(p.identifierExpr)))
     case other =>
       other.transformExpressionsWithPruning(_.containsAnyPattern(UNRESOLVED_IDENTIFIER)) {
         case e: ExpressionWithUnresolvedIdentifier if e.identifierExpr.resolved =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index 9d29ca1f9c6e..c16b50a2b17a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -143,7 +143,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
     override val maxIterationsSetting: String = null) extends Strategy
 
   /** A batch of rules. */
-  protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)
+  protected[catalyst] case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)
 
   /** Defines a sequence of rule batches, to be overridden by the implementation. */
   protected def batches: Seq[Batch]
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/identifier-clause.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/identifier-clause.sql.out
index f91d0a26cf8a..823ce43247a7 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/identifier-clause.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/identifier-clause.sql.out
@@ -881,6 +881,65 @@ org.apache.spark.sql.catalyst.parser.ParseException
 }
 
 
+-- !query
+create temporary view identifier('v1') as (select my_col from (values (1), (2), (1) as (my_col)) group by 1)
+-- !query analysis
+CreateViewCommand `v1`, (select my_col from (values (1), (2), (1) as (my_col)) group by 1), false, false, LocalTempView, UNSUPPORTED, true
+   +- Aggregate [my_col#x], [my_col#x]
+      +- SubqueryAlias __auto_generated_subquery_name
+         +- SubqueryAlias as
+            +- LocalRelation [my_col#x]
+
+
+-- !query
+cache table identifier('t1') as (select my_col from (values (1), (2), (1) as (my_col)) group by 1)
+-- !query analysis
+CacheTableAsSelect t1, (select my_col from (values (1), (2), (1) as (my_col)) group by 1), false, true
+   +- Aggregate [my_col#x], [my_col#x]
+      +- SubqueryAlias __auto_generated_subquery_name
+         +- SubqueryAlias as
+            +- LocalRelation [my_col#x]
+
+
+-- !query
+create table identifier('t2') as (select my_col from (values (1), (2), (1) as (my_col)) group by 1)
+-- !query analysis
+CreateDataSourceTableAsSelectCommand `spark_catalog`.`default`.`t2`, ErrorIfExists, [my_col]
+   +- Aggregate [my_col#x], [my_col#x]
+      +- SubqueryAlias __auto_generated_subquery_name
+         +- SubqueryAlias as
+            +- LocalRelation [my_col#x]
+
+
+-- !query
+insert into identifier('t2') select my_col from (values (3) as (my_col)) group by 1
+-- !query analysis
+InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/t2, false, Parquet, [path=file:[not included in comparison]/{warehouse_dir}/t2], Append, `spark_catalog`.`default`.`t2`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/t2), [my_col]
++- Aggregate [my_col#x], [my_col#x]
+   +- SubqueryAlias __auto_generated_subquery_name
+      +- SubqueryAlias as
+         +- LocalRelation [my_col#x]
+
+
+-- !query
+drop view v1
+-- !query analysis
+DropTempViewCommand v1
+
+
+-- !query
+drop table t1
+-- !query analysis
+DropTempViewCommand t1
+
+
+-- !query
+drop table t2
+-- !query analysis
+DropTable false, false
++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.t2
+
+
 -- !query
 SELECT row_number() OVER IDENTIFIER('x.win') FROM VALUES(1) AS T(c1) WINDOW win AS (ORDER BY c1)
 -- !query analysis
diff --git a/sql/core/src/test/resources/sql-tests/inputs/identifier-clause.sql b/sql/core/src/test/resources/sql-tests/inputs/identifier-clause.sql
index 07ae15707293..9e6314202b5f 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/identifier-clause.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/identifier-clause.sql
@@ -122,6 +122,15 @@ CREATE TEMPORARY FUNCTION IDENTIFIER('default.my' || 'DoubleAvg') AS 'test.org.a
 DROP TEMPORARY FUNCTION IDENTIFIER('default.my' || 'DoubleAvg');
 CREATE TEMPORARY VIEW IDENTIFIER('default.v')(c1) AS VALUES(1);
 
+-- SPARK-48273: Aggregation operation in statements using identifier clause for table name
+create temporary view identifier('v1') as (select my_col from (values (1), (2), (1) as (my_col)) group by 1);
+cache table identifier('t1') as (select my_col from (values (1), (2), (1) as (my_col)) group by 1);
+create table identifier('t2') as (select my_col from (values (1), (2), (1) as (my_col)) group by 1);
+insert into identifier('t2') select my_col from (values (3) as (my_col)) group by 1;
+drop view v1;
+drop table t1;
+drop table t2;
+
 -- Not supported
 SELECT row_number() OVER IDENTIFIER('x.win') FROM VALUES(1) AS T(c1) WINDOW win AS (ORDER BY c1);
 SELECT T1.c1 FROM VALUES(1) AS T1(c1) JOIN VALUES(1) AS T2(c1) USING (IDENTIFIER('c1'));
diff --git a/sql/core/src/test/resources/sql-tests/results/identifier-clause.sql.out b/sql/core/src/test/resources/sql-tests/results/identifier-clause.sql.out
index ed87f69fc5e6..4d62c371a171 100644
--- a/sql/core/src/test/resources/sql-tests/results/identifier-clause.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/identifier-clause.sql.out
@@ -1010,6 +1010,62 @@ org.apache.spark.sql.catalyst.parser.ParseException
 }
 
 
+-- !query
+create temporary view identifier('v1') as (select my_col from (values (1), (2), (1) as (my_col)) group by 1)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+cache table identifier('t1') as (select my_col from (values (1), (2), (1) as (my_col)) group by 1)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+create table identifier('t2') as (select my_col from (values (1), (2), (1) as (my_col)) group by 1)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+insert into identifier('t2') select my_col from (values (3) as (my_col)) group by 1
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+drop view v1
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+drop table t1
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+drop table t2
+-- !query schema
+struct<>
+-- !query output
+
+
+
 -- !query
 SELECT row_number() OVER IDENTIFIER('x.win') FROM VALUES(1) AS T(c1) WINDOW win AS (ORDER BY c1)
 -- !query schema

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org