This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 293c085  [SPARK-36895][SQL][FOLLOWUP] CREATE INDEX command should rely on the analyzer framework to resolve columns
293c085 is described below

commit 293c085d677220e71966d98c25cde8a06ae78468
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Tue Nov 2 14:39:42 2021 -0700

    [SPARK-36895][SQL][FOLLOWUP] CREATE INDEX command should rely on the analyzer framework to resolve columns

    ### What changes were proposed in this pull request?

    This PR leverages the existing analyzer framework to resolve columns in the CREATE INDEX command.

    ### Why are the changes needed?

    To fail earlier instead of passing invalid column names to v2 sources.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    A new test.

    Closes #34467 from cloud-fan/col.

    Lead-authored-by: Wenchen Fan <wenc...@databricks.com>
    Co-authored-by: Wenchen Fan <cloud0...@gmail.com>
    Signed-off-by: Huaxin Gao <huaxin_...@apple.com>
---
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala  | 15 +++++++++++----
 .../org/apache/spark/sql/catalyst/parser/AstBuilder.scala  |  4 ++--
 .../spark/sql/catalyst/plans/logical/v2Commands.scala      | 12 +++++++-----
 .../apache/spark/sql/catalyst/parser/DDLParserSuite.scala  |  6 +++---
 .../execution/datasources/v2/DataSourceV2Strategy.scala    |  5 ++++-
 .../apache/spark/sql/connector/DataSourceV2SQLSuite.scala  | 11 ++++++++---
 6 files changed, 35 insertions(+), 18 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index f0a1c8c..068886e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -272,7 +272,7 @@ class Analyzer(override val catalogManager: CatalogManager)
       ResolveInsertInto ::
       ResolveRelations ::
       ResolvePartitionSpec ::
-      ResolveAlterTableCommands ::
+      ResolveFieldNameAndPosition ::
       AddMetadataColumns ::
       DeduplicateRelations ::
       ResolveReferences ::
@@ -3529,11 +3529,18 @@ class Analyzer(override val catalogManager: CatalogManager)
   }

   /**
-   * Rule to mostly resolve, normalize and rewrite column names based on case sensitivity
-   * for alter table column commands.
+   * Rule to resolve, normalize and rewrite field names based on case sensitivity for commands.
    */
-  object ResolveAlterTableCommands extends Rule[LogicalPlan] {
+  object ResolveFieldNameAndPosition extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+      case cmd: CreateIndex if cmd.table.resolved &&
+          cmd.columns.exists(_._1.isInstanceOf[UnresolvedFieldName]) =>
+        val table = cmd.table.asInstanceOf[ResolvedTable]
+        cmd.copy(columns = cmd.columns.map {
+          case (u: UnresolvedFieldName, prop) => resolveFieldNames(table, u.name, u) -> prop
+          case other => other
+        })
+
       case a: AlterTableCommand if a.table.resolved && hasUnresolvedFieldName(a) =>
         val table = a.table.asInstanceOf[ResolvedTable]
         a.transformExpressions {
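For context: resolveFieldNames looks each multi-part name up in the resolved table's
schema and returns the case-normalized field, so a bad name fails during analysis.
A rough, self-contained sketch of that lookup, using simplified stand-in types
(Field and resolve below are invented for illustration, not Spark's StructType API):

    final case class Field(name: String, nested: Seq[Field] = Nil)

    // Resolve a (possibly nested) multi-part name against a schema, returning
    // the case-normalized path, or None if any part is missing.
    def resolve(schema: Seq[Field], path: Seq[String]): Option[Seq[String]] =
      path match {
        case Seq() => Some(Nil)
        case head +: rest =>
          schema.find(_.name.equalsIgnoreCase(head)).flatMap { f =>
            resolve(f.nested, rest).map(f.name +: _)
          }
      }

    val schema = Seq(Field("id"), Field("Data"))
    resolve(schema, Seq("data"))      // Some(List(Data)) -- casing normalized
    resolve(schema, Seq("non_exist")) // None -- analysis fails with "Missing field"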
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 722a055..a16674f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -4429,7 +4429,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
     }

     val columns = ctx.columns.multipartIdentifierProperty.asScala
-      .map(_.multipartIdentifier.getText).toSeq
+      .map(_.multipartIdentifier).map(typedVisit[Seq[String]]).toSeq
     val columnsProperties = ctx.columns.multipartIdentifierProperty.asScala
       .map(x => (Option(x.options).map(visitPropertyKeyValues).getOrElse(Map.empty))).toSeq
     val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
@@ -4439,7 +4439,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
       indexName,
       indexType,
       ctx.EXISTS != null,
-      columns.map(FieldReference(_).asInstanceOf[FieldReference]).zip(columnsProperties),
+      columns.map(UnresolvedFieldName(_)).zip(columnsProperties),
       options)
   }

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index e349822..f5ebae8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -17,7 +17,7 @@

 package org.apache.spark.sql.catalyst.plans.logical

-import org.apache.spark.sql.catalyst.analysis.{NamedRelation, PartitionSpec, UnresolvedException}
+import org.apache.spark.sql.catalyst.analysis.{FieldName, NamedRelation, PartitionSpec, UnresolvedException}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.catalog.FunctionResource
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, Unevaluable}
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.plans.DescribeCommandSchema
 import org.apache.spark.sql.catalyst.trees.BinaryLike
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.connector.catalog._
-import org.apache.spark.sql.connector.expressions.{NamedReference, Transform}
+import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.write.Write
 import org.apache.spark.sql.types.{BooleanType, DataType, MetadataBuilder, StringType, StructType}
@@ -1061,12 +1061,14 @@ case class UncacheTable(
  * The logical plan of the CREATE INDEX command.
  */
 case class CreateIndex(
-    child: LogicalPlan,
+    table: LogicalPlan,
     indexName: String,
     indexType: String,
     ignoreIfExists: Boolean,
-    columns: Seq[(NamedReference, Map[String, String])],
+    columns: Seq[(FieldName, Map[String, String])],
     properties: Map[String, String]) extends UnaryCommand {
+  override def child: LogicalPlan = table
+  override lazy val resolved: Boolean = table.resolved && columns.forall(_._1.resolved)
   override protected def withNewChildInternal(newChild: LogicalPlan): CreateIndex =
-    copy(child = newChild)
+    copy(table = newChild)
 }
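The key contract change here: CreateIndex now takes part in the standard resolution
lifecycle, staying unresolved until both the table and every index column resolve,
so CheckAnalysis can reject bad plans. A toy model of that contract (the Sketch
names are invented for this email, not Spark code):

    sealed trait FieldNameSketch { def resolved: Boolean }
    case class UnresolvedFieldNameSketch(parts: Seq[String]) extends FieldNameSketch {
      def resolved: Boolean = false
    }
    case class ResolvedFieldNameSketch(parts: Seq[String]) extends FieldNameSketch {
      def resolved: Boolean = true
    }

    case class CreateIndexSketch(tableResolved: Boolean, columns: Seq[FieldNameSketch]) {
      // Mirrors the new `resolved` definition: the command only leaves the
      // analyzer once the table AND all column references have been resolved.
      lazy val resolved: Boolean = tableResolved && columns.forall(_.resolved)
    }

    CreateIndexSketch(tableResolved = true,
      Seq(UnresolvedFieldNameSketch(Seq("col1")))).resolved // false -- rule must run
    CreateIndexSketch(tableResolved = true,
      Seq(ResolvedFieldNameSketch(Seq("col1")))).resolved   // true  -- ready to plan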
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 13588e9..bdb6a15 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -2276,18 +2276,18 @@ class DDLParserSuite extends AnalysisTest {
   test("CREATE INDEX") {
     parseCompare("CREATE index i1 ON a.b.c USING BTREE (col1)",
       CreateIndex(UnresolvedTable(Seq("a", "b", "c"), "CREATE INDEX", None), "i1", "BTREE", false,
-        Array(FieldReference("col1")).toSeq.zip(Seq(Map.empty[String, String])), Map.empty))
+        Seq(UnresolvedFieldName(Seq("col1"))).zip(Seq(Map.empty[String, String])), Map.empty))

     parseCompare("CREATE index IF NOT EXISTS i1 ON TABLE a.b.c USING BTREE" +
       " (col1 OPTIONS ('k1'='v1'), col2 OPTIONS ('k2'='v2')) ",
       CreateIndex(UnresolvedTable(Seq("a", "b", "c"), "CREATE INDEX", None), "i1", "BTREE", true,
-        Array(FieldReference("col1"), FieldReference("col2")).toSeq
+        Seq(UnresolvedFieldName(Seq("col1")), UnresolvedFieldName(Seq("col2")))
           .zip(Seq(Map("k1" -> "v1"), Map("k2" -> "v2"))), Map.empty))

     parseCompare("CREATE index i1 ON a.b.c" +
       " (col1 OPTIONS ('k1'='v1'), col2 OPTIONS ('k2'='v2')) OPTIONS ('k3'='v3', 'k4'='v4')",
       CreateIndex(UnresolvedTable(Seq("a", "b", "c"), "CREATE INDEX", None), "i1", "", false,
-        Array(FieldReference("col1"), FieldReference("col2")).toSeq
+        Seq(UnresolvedFieldName(Seq("col1")), UnresolvedFieldName(Seq("col2")))
           .zip(Seq(Map("k1" -> "v1"), Map("k2" -> "v2"))), Map("k3" -> "v3", "k4" -> "v4")))
   }

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index b688c32..f1513a2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -436,7 +436,10 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
         indexName, indexType, ifNotExists, columns, properties) =>
       table match {
         case s: SupportsIndex =>
-          CreateIndexExec(s, indexName, indexType, ifNotExists, columns, properties):: Nil
+          val namedRefs = columns.map { case (field, prop) =>
+            FieldReference(field.name) -> prop
+          }
+          CreateIndexExec(s, indexName, indexType, ifNotExists, namedRefs, properties) :: Nil
         case _ => throw QueryCompilationErrors.tableIndexNotSupportedError(
           s"CreateIndex is not supported in this table ${table.name}.")
       }
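On the physical side, the strategy now repackages the analyzer's resolved field
names into the NamedReference form the v2 SupportsIndex API expects. A simplified
sketch of that handoff (FieldRef and toNamedRefs below are stand-ins invented for
illustration, not connector.expressions.FieldReference itself):

    final case class FieldRef(parts: Seq[String]) {
      override def toString: String = parts.mkString(".")
    }

    // Each resolved column arrives as (name parts, per-column options); planning
    // just rebuilds a reference per column before calling into the data source.
    def toNamedRefs(
        columns: Seq[(Seq[String], Map[String, String])]
    ): Seq[(FieldRef, Map[String, String])] =
      columns.map { case (parts, opts) => FieldRef(parts) -> opts }

    toNamedRefs(Seq(Seq("col1") -> Map("k1" -> "v1"))) // List((col1, Map(k1 -> v1)))

The updated test below exercises the end-to-end effect: a nonexistent column now
fails analysis with "Missing field" before the command ever reaches the source.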
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index ee3a156..d2d46d6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -2937,10 +2937,15 @@ class DataSourceV2SQLSuite
     val t = "testcat.tbl"
     withTable(t) {
       sql(s"CREATE TABLE $t (id bigint, data string COMMENT 'hello') USING foo")
-      val ex = intercept[AnalysisException] {
-        sql(s"CREATE index i1 ON $t(col1)")
+      val e1 = intercept[AnalysisException] {
+        sql(s"CREATE index i1 ON $t(non_exist)")
+      }
+      assert(e1.getMessage.contains(s"Missing field non_exist in table $t"))
+
+      val e2 = intercept[AnalysisException] {
+        sql(s"CREATE index i1 ON $t(id)")
       }
-      assert(ex.getMessage.contains(s"CreateIndex is not supported in this table $t."))
+      assert(e2.getMessage.contains(s"CreateIndex is not supported in this table $t."))
     }
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org