This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 5b4816c  [SPARK-34320][SQL] Migrate ALTER TABLE DROP COLUMNS commands to use UnresolvedTable to resolve the identifier
5b4816c is described below

commit 5b4816cfc8e290d6f56e57227cb397eebf6a030e
Author: Terry Kim <yumin...@gmail.com>
AuthorDate: Thu Jun 24 14:59:25 2021 +0000

    [SPARK-34320][SQL] Migrate ALTER TABLE DROP COLUMNS commands to use UnresolvedTable to resolve the identifier

    ### What changes were proposed in this pull request?

    This PR proposes to migrate the `ALTER TABLE ... DROP COLUMNS` command to use `UnresolvedTable` as a `child` to resolve the table identifier. This allows consistent resolution rules (temp view first, etc.) to be applied to both v1/v2 commands. More info about the consistent resolution rule proposal can be found in [JIRA](https://issues.apache.org/jira/browse/SPARK-29900) or [proposal doc](https://docs.google.com/document/d/1hvLjGA8y_W_hhilpngXVub1Ebv8RsMap986nENCFnrg/edit?u [...]

    ### Why are the changes needed?

    This is part of the effort to make the relation lookup behavior consistent: [SPARK-29900](https://issues.apache.org/jira/browse/SPARK-29900).

    ### Does this PR introduce _any_ user-facing change?

    After this PR, the `ALTER TABLE ... DROP COLUMNS` command will have consistent resolution behavior.

    ### How was this patch tested?

    Updated existing tests.

    Closes #32854 from imback82/alter_alternative.

    Authored-by: Terry Kim <yumin...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
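Concretely, after this change the parser emits an `AlterTableDropColumns` node whose child is an `UnresolvedTable`, and the analyzer (rather than the parser or a catalog-specific rule) resolves the table identifier and the field names. A minimal sketch of the new parse result, mirroring the `DDLParserSuite` expectation in the diff below (assumes the Spark catalyst module is on the classpath; `CatalystSqlParser` is used here only for illustration):

```scala
import org.apache.spark.sql.catalyst.analysis.{UnresolvedFieldName, UnresolvedTable}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.AlterTableDropColumns

// Parsing no longer resolves the table; it wraps the multi-part name in an
// UnresolvedTable child so the analyzer can apply the usual lookup rules
// (temp view first, then the session or other catalogs).
val plan = CatalystSqlParser.parsePlan("ALTER TABLE table_name DROP COLUMN a.b.c")

assert(plan == AlterTableDropColumns(
  UnresolvedTable(Seq("table_name"), "ALTER TABLE ... DROP COLUMNS", None),
  Seq(UnresolvedFieldName(Seq("a", "b", "c")))))
```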
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 28 ++++++++++++++++++++++
 .../sql/catalyst/analysis/CheckAnalysis.scala      |  9 +++++++
 .../sql/catalyst/analysis/ResolveCatalogs.scala    |  5 ----
 .../sql/catalyst/analysis/v2ResolutionPlans.scala  | 14 +++++++++++
 .../spark/sql/catalyst/parser/AstBuilder.scala     | 10 ++++----
 .../sql/catalyst/plans/logical/statements.scala    |  7 ------
 .../sql/catalyst/plans/logical/v2Commands.scala    | 28 +++++++++++++++++++++-
 .../spark/sql/catalyst/parser/DDLParserSuite.scala | 14 +++++++----
 .../catalyst/analysis/ResolveSessionCatalog.scala  | 11 ++-------
 .../datasources/v2/DataSourceV2Strategy.scala      |  4 ++++
 .../v2/jdbc/JDBCTableCatalogSuite.scala            |  4 ++--
 11 files changed, 101 insertions(+), 33 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 7cb270c..005784c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -299,6 +299,7 @@ class Analyzer(override val catalogManager: CatalogManager)
     Batch("Post-Hoc Resolution", Once,
       Seq(ResolveCommandsWithIfExists) ++
       postHocResolutionRules: _*),
+    Batch("Normalize Alter Table Field Names", Once, ResolveFieldNames),
     Batch("Normalize Alter Table", Once, ResolveAlterTableChanges),
     Batch("Remove Unresolved Hints", Once,
       new ResolveHints.RemoveAllHints),
@@ -3520,6 +3521,33 @@ class Analyzer(override val catalogManager: CatalogManager)
     }
   }
 
+  /**
+   * Rule to mostly resolve, normalize and rewrite column names based on case sensitivity
+   * for alter table commands.
+   */
+  object ResolveFieldNames extends Rule[LogicalPlan] {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+      case a: AlterTableCommand if a.table.resolved =>
+        a.transformExpressions {
+          case u: UnresolvedFieldName =>
+            val table = a.table.asInstanceOf[ResolvedTable]
+            resolveFieldNames(table.schema, u.name).map(ResolvedFieldName(_)).getOrElse(u)
+        }
+    }
+
+    /**
+     * Returns the resolved field name if the field can be resolved, returns None if the column is
+     * not found. An error will be thrown in CheckAnalysis for columns that can't be resolved.
+     */
+    private def resolveFieldNames(
+        schema: StructType,
+        fieldNames: Seq[String]): Option[Seq[String]] = {
+      val fieldOpt = schema.findNestedField(
+        fieldNames, includeCollections = true, conf.resolver)
+      fieldOpt.map { case (path, field) => path :+ field.name }
+    }
+  }
+
   /** Rule to mostly resolve, normalize and rewrite column names based on case sensitivity. */
   object ResolveAlterTableChanges extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 28b0f1f..7679a87 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -443,6 +443,15 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
       case write: V2WriteCommand if write.resolved =>
         write.query.schema.foreach(f => TypeUtils.failWithIntervalType(f.dataType))
 
+      case alter: AlterTableCommand if alter.table.resolved =>
+        alter.transformExpressions {
+          case u: UnresolvedFieldName =>
+            val table = alter.table.asInstanceOf[ResolvedTable]
+            alter.failAnalysis(
+              s"Cannot ${alter.operation} missing field ${u.name.quoted} in ${table.name} " +
+                s"schema: ${table.schema.treeString}")
+        }
+
       case alter: AlterTable if alter.table.resolved =>
         val table = alter.table
         def findField(operation: String, fieldName: Array[String]): StructField = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
index 3b87688..59a1c13 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
@@ -93,11 +93,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
       val changes = Seq(TableChange.renameColumn(col.toArray, newName))
       createAlterTable(nameParts, catalog, tbl, changes)
 
-    case AlterTableDropColumnsStatement(
-        nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
-      val changes = cols.map(col => TableChange.deleteColumn(col.toArray))
-      createAlterTable(nameParts, catalog, tbl, changes)
-
     case c @ CreateTableStatement(
         NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
       assertNoNullTypeInSchema(c.tableSchema)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
index 50ee9fb..7b85563 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
@@ -89,6 +89,18 @@ case class UnresolvedPartitionSpec(
   override lazy val resolved = false
 }
 
+sealed trait FieldName extends LeafExpression with Unevaluable {
+  def name: Seq[String]
+  override def dataType: DataType = throw new IllegalStateException(
+    "UnresolvedFieldName.dataType should not be called.")
+  override def nullable: Boolean = throw new IllegalStateException(
+    "UnresolvedFieldName.nullable should not be called.")
+}
+
+case class UnresolvedFieldName(name: Seq[String]) extends FieldName {
+  override lazy val resolved = false
+}
+
 /**
  * Holds the name of a function that has yet to be looked up in a catalog. It will be resolved to
  * [[ResolvedFunc]] during analysis.
@@ -138,6 +150,8 @@ case class ResolvedPartitionSpec(
     ident: InternalRow,
     location: Option[String] = None) extends PartitionSpec
 
+case class ResolvedFieldName(name: Seq[String]) extends FieldName
+
 /**
  * A plan containing resolved (temp) views.
  */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index bffa931..6a373ab 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -3661,7 +3661,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
   }
 
   /**
-   * Parse a [[AlterTableDropColumnsStatement]] command.
+   * Parse a [[AlterTableDropColumns]] command.
    *
    * For example:
    * {{{
@@ -3672,9 +3672,11 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
   override def visitDropTableColumns(
       ctx: DropTableColumnsContext): LogicalPlan = withOrigin(ctx) {
     val columnsToDrop = ctx.columns.multipartIdentifier.asScala.map(typedVisit[Seq[String]])
-    AlterTableDropColumnsStatement(
-      visitMultipartIdentifier(ctx.multipartIdentifier),
-      columnsToDrop.toSeq)
+    AlterTableDropColumns(
+      createUnresolvedTable(
+        ctx.multipartIdentifier,
+        "ALTER TABLE ... DROP COLUMNS"),
+      columnsToDrop.map(UnresolvedFieldName(_)).toSeq)
   }
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
index 80940e7..b87f65f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
@@ -266,13 +266,6 @@ case class AlterTableRenameColumnStatement(
     newName: String) extends LeafParsedStatement
 
 /**
- * ALTER TABLE ... DROP COLUMNS command, as parsed from SQL.
- */
-case class AlterTableDropColumnsStatement(
-    tableName: Seq[String],
-    columnsToDrop: Seq[Seq[String]]) extends LeafParsedStatement
-
-/**
  * An INSERT INTO statement, as parsed from SQL.
  *
  * @param table the logical plan representing the table.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 310a437..e131777 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.plans.logical
 
-import org.apache.spark.sql.catalyst.analysis.{NamedRelation, PartitionSpec, UnresolvedException}
+import org.apache.spark.sql.catalyst.analysis.{FieldName, NamedRelation, PartitionSpec, UnresolvedException}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, Unevaluable}
 import org.apache.spark.sql.catalyst.plans.DescribeCommandSchema
@@ -1098,3 +1098,29 @@ case class UnsetTableProperties(
   override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
     copy(table = newChild)
 }
+
+trait AlterTableCommand extends UnaryCommand {
+  def table: LogicalPlan
+  def operation: String
+  def changes: Seq[TableChange]
+  override def child: LogicalPlan = table
+}
+
+/**
+ * The logical plan of the ALTER TABLE ... DROP COLUMNS command.
+ */
+case class AlterTableDropColumns(
+    table: LogicalPlan,
+    columnsToDrop: Seq[FieldName]) extends AlterTableCommand {
+  override def operation: String = "delete"
+
+  override def changes: Seq[TableChange] = {
+    columnsToDrop.map { col =>
+      require(col.resolved, "FieldName should be resolved before it's converted to TableChange.")
+      TableChange.deleteColumn(col.name.toArray)
+    }
+  }
+
+  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
+    copy(table = newChild)
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index c064891..b0e0d58 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.parser
 import java.util.Locale
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, PersistedView, UnresolvedAttribute, UnresolvedFunc, UnresolvedInlineTable, UnresolvedNamespace, UnresolvedRelation, UnresolvedStar, UnresolvedTable, UnresolvedTableOrView, UnresolvedView}
+import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, PersistedView, UnresolvedAttribute, UnresolvedFieldName, UnresolvedFunc, UnresolvedInlineTable, UnresolvedNamespace, UnresolvedRelation, UnresolvedStar, UnresolvedTable, UnresolvedTableOrView, UnresolvedView}
 import org.apache.spark.sql.catalyst.catalog.{ArchiveResource, BucketSpec, FileResource, FunctionResource, JarResource}
 import org.apache.spark.sql.catalyst.expressions.{EqualTo, Hex, Literal}
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -992,7 +992,9 @@ class DDLParserSuite extends AnalysisTest {
   test("alter table: drop column") {
     comparePlans(
       parsePlan("ALTER TABLE table_name DROP COLUMN a.b.c"),
-      AlterTableDropColumnsStatement(Seq("table_name"), Seq(Seq("a", "b", "c"))))
+      AlterTableDropColumns(
+        UnresolvedTable(Seq("table_name"), "ALTER TABLE ... DROP COLUMNS", None),
+        Seq(UnresolvedFieldName(Seq("a", "b", "c")))))
   }
 
   test("alter table: drop multiple columns") {
@@ -1000,9 +1002,11 @@ class DDLParserSuite extends AnalysisTest {
     Seq(sql, sql.replace("COLUMN", "COLUMNS")).foreach { drop =>
       comparePlans(
         parsePlan(drop),
-        AlterTableDropColumnsStatement(
-          Seq("table_name"),
-          Seq(Seq("x"), Seq("y"), Seq("a", "b", "c"))))
+        AlterTableDropColumns(
+          UnresolvedTable(Seq("table_name"), "ALTER TABLE ... DROP COLUMNS", None),
+          Seq(UnresolvedFieldName(Seq("x")),
+            UnresolvedFieldName(Seq("y")),
+            UnresolvedFieldName(Seq("a", "b", "c")))))
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 7f3d0c6..427c570 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -157,15 +157,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
         createAlterTable(nameParts, catalog, tbl, changes)
       }
 
-    case AlterTableDropColumnsStatement(
-        nameParts @ SessionCatalogAndTable(catalog, tbl), cols) =>
-      loadTable(catalog, tbl.asIdentifier).collect {
-        case v1Table: V1Table =>
-          throw QueryCompilationErrors.dropColumnOnlySupportedWithV2TableError
-      }.getOrElse {
-        val changes = cols.map(col => TableChange.deleteColumn(col.toArray))
-        createAlterTable(nameParts, catalog, tbl, changes)
-      }
+    case AlterTableDropColumns(ResolvedV1TableIdentifier(_), _) =>
+      throw QueryCompilationErrors.dropColumnOnlySupportedWithV2TableError
 
     case SetTableProperties(ResolvedV1TableIdentifier(ident), props) =>
       AlterTableSetPropertiesCommand(ident.asTableIdentifier, props, isView = false)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 3e2214c..c5d3e59 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -437,6 +437,10 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       val changes = keys.map(key => TableChange.removeProperty(key))
       AlterTableExec(table.catalog, table.identifier, changes) :: Nil
 
+    case a: AlterTableCommand if a.table.resolved =>
+      val table = a.table.asInstanceOf[ResolvedTable]
+      AlterTableExec(table.catalog, table.identifier, a.changes) :: Nil
+
     case _ => Nil
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
index b94d868..dd170ef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
@@ -240,7 +240,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
       val msg = intercept[AnalysisException] {
         sql(s"ALTER TABLE $tableName DROP COLUMN bad_column")
       }.getMessage
-      assert(msg.contains("Cannot delete missing field bad_column in test.alt_table schema"))
+      assert(msg.contains("Cannot delete missing field bad_column in h2.test.alt_table schema"))
     }
     // Drop a column to not existing table and namespace
     Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table =>
@@ -362,7 +362,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
       val msg = intercept[AnalysisException] {
         sql(s"ALTER TABLE $tableName DROP COLUMN C3")
       }.getMessage
-      assert(msg.contains("Cannot delete missing field C3 in test.alt_table schema"))
+      assert(msg.contains("Cannot delete missing field C3 in h2.test.alt_table schema"))
     }
 
     withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org