This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b4816c  [SPARK-34320][SQL] Migrate ALTER TABLE DROP COLUMNS commands 
to use UnresolvedTable to resolve the identifier
5b4816c is described below

commit 5b4816cfc8e290d6f56e57227cb397eebf6a030e
Author: Terry Kim <yumin...@gmail.com>
AuthorDate: Thu Jun 24 14:59:25 2021 +0000

    [SPARK-34320][SQL] Migrate ALTER TABLE DROP COLUMNS commands to use 
UnresolvedTable to resolve the identifier
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to migrate the `ALTER TABLE ... DROP COLUMNS` 
command to use `UnresolvedTable` as its `child` to resolve the table identifier. 
This allows consistent resolution rules (temp view first, etc.) to be applied 
for both v1/v2 commands. More info about the consistent resolution rule 
proposal can be found in 
[JIRA](https://issues.apache.org/jira/browse/SPARK-29900) or [proposal 
doc](https://docs.google.com/document/d/1hvLjGA8y_W_hhilpngXVub1Ebv8RsMap986nENCFnrg/edit?u
 [...]
    
    ### Why are the changes needed?
    
    This is part of the effort to make relation lookup behavior consistent: 
[SPARK-29900](https://issues.apache.org/jira/browse/SPARK-29900).
    
    ### Does this PR introduce _any_ user-facing change?
    
    After this PR, the `ALTER TABLE ... DROP COLUMNS` command will have 
consistent resolution behavior.
    
    ### How was this patch tested?
    
    Updated existing tests.
    
    Closes #32854 from imback82/alter_alternative.
    
    Authored-by: Terry Kim <yumin...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 28 ++++++++++++++++++++++
 .../sql/catalyst/analysis/CheckAnalysis.scala      |  9 +++++++
 .../sql/catalyst/analysis/ResolveCatalogs.scala    |  5 ----
 .../sql/catalyst/analysis/v2ResolutionPlans.scala  | 14 +++++++++++
 .../spark/sql/catalyst/parser/AstBuilder.scala     | 10 ++++----
 .../sql/catalyst/plans/logical/statements.scala    |  7 ------
 .../sql/catalyst/plans/logical/v2Commands.scala    | 28 +++++++++++++++++++++-
 .../spark/sql/catalyst/parser/DDLParserSuite.scala | 14 +++++++----
 .../catalyst/analysis/ResolveSessionCatalog.scala  | 11 ++-------
 .../datasources/v2/DataSourceV2Strategy.scala      |  4 ++++
 .../v2/jdbc/JDBCTableCatalogSuite.scala            |  4 ++--
 11 files changed, 101 insertions(+), 33 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 7cb270c..005784c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -299,6 +299,7 @@ class Analyzer(override val catalogManager: CatalogManager)
     Batch("Post-Hoc Resolution", Once,
       Seq(ResolveCommandsWithIfExists) ++
       postHocResolutionRules: _*),
+    Batch("Normalize Alter Table Field Names", Once, ResolveFieldNames),
     Batch("Normalize Alter Table", Once, ResolveAlterTableChanges),
     Batch("Remove Unresolved Hints", Once,
       new ResolveHints.RemoveAllHints),
@@ -3520,6 +3521,33 @@ class Analyzer(override val catalogManager: 
CatalogManager)
     }
   }
 
+  /**
+   * Rule to mostly resolve, normalize and rewrite column names based on case 
sensitivity
+   * for alter table commands.
+   */
+  object ResolveFieldNames extends Rule[LogicalPlan] {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+      case a: AlterTableCommand if a.table.resolved =>
+        a.transformExpressions {
+          case u: UnresolvedFieldName =>
+            val table = a.table.asInstanceOf[ResolvedTable]
+            resolveFieldNames(table.schema, 
u.name).map(ResolvedFieldName(_)).getOrElse(u)
+        }
+    }
+
+    /**
+     * Returns the resolved field name if the field can be resolved, returns 
None if the column is
+     * not found. An error will be thrown in CheckAnalysis for columns that 
can't be resolved.
+     */
+    private def resolveFieldNames(
+        schema: StructType,
+        fieldNames: Seq[String]): Option[Seq[String]] = {
+      val fieldOpt = schema.findNestedField(
+        fieldNames, includeCollections = true, conf.resolver)
+      fieldOpt.map { case (path, field) => path :+ field.name }
+    }
+  }
+
   /** Rule to mostly resolve, normalize and rewrite column names based on case 
sensitivity. */
   object ResolveAlterTableChanges extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 28b0f1f..7679a87 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -443,6 +443,15 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog {
           case write: V2WriteCommand if write.resolved =>
             write.query.schema.foreach(f => 
TypeUtils.failWithIntervalType(f.dataType))
 
+          case alter: AlterTableCommand if alter.table.resolved =>
+            alter.transformExpressions {
+              case u: UnresolvedFieldName =>
+                val table = alter.table.asInstanceOf[ResolvedTable]
+                alter.failAnalysis(
+                  s"Cannot ${alter.operation} missing field ${u.name.quoted} 
in ${table.name} " +
+                    s"schema: ${table.schema.treeString}")
+            }
+
           case alter: AlterTable if alter.table.resolved =>
             val table = alter.table
             def findField(operation: String, fieldName: Array[String]): 
StructField = {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
index 3b87688..59a1c13 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
@@ -93,11 +93,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
       val changes = Seq(TableChange.renameColumn(col.toArray, newName))
       createAlterTable(nameParts, catalog, tbl, changes)
 
-    case AlterTableDropColumnsStatement(
-         nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
-      val changes = cols.map(col => TableChange.deleteColumn(col.toArray))
-      createAlterTable(nameParts, catalog, tbl, changes)
-
     case c @ CreateTableStatement(
          NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, 
_, _) =>
       assertNoNullTypeInSchema(c.tableSchema)
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
index 50ee9fb..7b85563 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
@@ -89,6 +89,18 @@ case class UnresolvedPartitionSpec(
   override lazy val resolved = false
 }
 
+sealed trait FieldName extends LeafExpression with Unevaluable {
+  def name: Seq[String]
+  override def dataType: DataType = throw new IllegalStateException(
+    "UnresolvedFieldName.dataType should not be called.")
+  override def nullable: Boolean = throw new IllegalStateException(
+    "UnresolvedFieldName.nullable should not be called.")
+}
+
+case class UnresolvedFieldName(name: Seq[String]) extends FieldName {
+  override lazy val resolved = false
+}
+
 /**
  * Holds the name of a function that has yet to be looked up in a catalog. It 
will be resolved to
  * [[ResolvedFunc]] during analysis.
@@ -138,6 +150,8 @@ case class ResolvedPartitionSpec(
     ident: InternalRow,
     location: Option[String] = None) extends PartitionSpec
 
+case class ResolvedFieldName(name: Seq[String]) extends FieldName
+
 /**
  * A plan containing resolved (temp) views.
  */
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index bffa931..6a373ab 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -3661,7 +3661,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with 
SQLConfHelper with Logg
   }
 
   /**
-   * Parse a [[AlterTableDropColumnsStatement]] command.
+   * Parse a [[AlterTableDropColumns]] command.
    *
    * For example:
    * {{{
@@ -3672,9 +3672,11 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with 
SQLConfHelper with Logg
   override def visitDropTableColumns(
       ctx: DropTableColumnsContext): LogicalPlan = withOrigin(ctx) {
     val columnsToDrop = 
ctx.columns.multipartIdentifier.asScala.map(typedVisit[Seq[String]])
-    AlterTableDropColumnsStatement(
-      visitMultipartIdentifier(ctx.multipartIdentifier),
-      columnsToDrop.toSeq)
+    AlterTableDropColumns(
+      createUnresolvedTable(
+        ctx.multipartIdentifier,
+        "ALTER TABLE ... DROP COLUMNS"),
+      columnsToDrop.map(UnresolvedFieldName(_)).toSeq)
   }
 
   /**
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
index 80940e7..b87f65f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
@@ -266,13 +266,6 @@ case class AlterTableRenameColumnStatement(
     newName: String) extends LeafParsedStatement
 
 /**
- * ALTER TABLE ... DROP COLUMNS command, as parsed from SQL.
- */
-case class AlterTableDropColumnsStatement(
-    tableName: Seq[String],
-    columnsToDrop: Seq[Seq[String]]) extends LeafParsedStatement
-
-/**
  * An INSERT INTO statement, as parsed from SQL.
  *
  * @param table                the logical plan representing the table.
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 310a437..e131777 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.plans.logical
 
-import org.apache.spark.sql.catalyst.analysis.{NamedRelation, PartitionSpec, 
UnresolvedException}
+import org.apache.spark.sql.catalyst.analysis.{FieldName, NamedRelation, 
PartitionSpec, UnresolvedException}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, AttributeSet, Expression, Unevaluable}
 import org.apache.spark.sql.catalyst.plans.DescribeCommandSchema
@@ -1098,3 +1098,29 @@ case class UnsetTableProperties(
   override protected def withNewChildInternal(newChild: LogicalPlan): 
LogicalPlan =
     copy(table = newChild)
 }
+
+trait AlterTableCommand extends UnaryCommand {
+  def table: LogicalPlan
+  def operation: String
+  def changes: Seq[TableChange]
+  override def child: LogicalPlan = table
+}
+
+/**
+ * The logical plan of the ALTER TABLE ... DROP COLUMNS command.
+ */
+case class AlterTableDropColumns(
+    table: LogicalPlan,
+    columnsToDrop: Seq[FieldName]) extends AlterTableCommand {
+  override def operation: String = "delete"
+
+  override def changes: Seq[TableChange] = {
+    columnsToDrop.map { col =>
+      require(col.resolved, "FieldName should be resolved before it's 
converted to TableChange.")
+      TableChange.deleteColumn(col.name.toArray)
+    }
+  }
+
+  override protected def withNewChildInternal(newChild: LogicalPlan): 
LogicalPlan =
+    copy(table = newChild)
+}
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index c064891..b0e0d58 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.parser
 import java.util.Locale
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, 
LocalTempView, PersistedView, UnresolvedAttribute, UnresolvedFunc, 
UnresolvedInlineTable, UnresolvedNamespace, UnresolvedRelation, UnresolvedStar, 
UnresolvedTable, UnresolvedTableOrView, UnresolvedView}
+import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, 
LocalTempView, PersistedView, UnresolvedAttribute, UnresolvedFieldName, 
UnresolvedFunc, UnresolvedInlineTable, UnresolvedNamespace, UnresolvedRelation, 
UnresolvedStar, UnresolvedTable, UnresolvedTableOrView, UnresolvedView}
 import org.apache.spark.sql.catalyst.catalog.{ArchiveResource, BucketSpec, 
FileResource, FunctionResource, JarResource}
 import org.apache.spark.sql.catalyst.expressions.{EqualTo, Hex, Literal}
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -992,7 +992,9 @@ class DDLParserSuite extends AnalysisTest {
   test("alter table: drop column") {
     comparePlans(
       parsePlan("ALTER TABLE table_name DROP COLUMN a.b.c"),
-      AlterTableDropColumnsStatement(Seq("table_name"), Seq(Seq("a", "b", 
"c"))))
+      AlterTableDropColumns(
+        UnresolvedTable(Seq("table_name"), "ALTER TABLE ... DROP COLUMNS", 
None),
+        Seq(UnresolvedFieldName(Seq("a", "b", "c")))))
   }
 
   test("alter table: drop multiple columns") {
@@ -1000,9 +1002,11 @@ class DDLParserSuite extends AnalysisTest {
     Seq(sql, sql.replace("COLUMN", "COLUMNS")).foreach { drop =>
       comparePlans(
         parsePlan(drop),
-        AlterTableDropColumnsStatement(
-          Seq("table_name"),
-          Seq(Seq("x"), Seq("y"), Seq("a", "b", "c"))))
+        AlterTableDropColumns(
+          UnresolvedTable(Seq("table_name"), "ALTER TABLE ... DROP COLUMNS", 
None),
+          Seq(UnresolvedFieldName(Seq("x")),
+            UnresolvedFieldName(Seq("y")),
+            UnresolvedFieldName(Seq("a", "b", "c")))))
     }
   }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 7f3d0c6..427c570 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -157,15 +157,8 @@ class ResolveSessionCatalog(val catalogManager: 
CatalogManager)
         createAlterTable(nameParts, catalog, tbl, changes)
       }
 
-    case AlterTableDropColumnsStatement(
-         nameParts @ SessionCatalogAndTable(catalog, tbl), cols) =>
-      loadTable(catalog, tbl.asIdentifier).collect {
-        case v1Table: V1Table =>
-          throw QueryCompilationErrors.dropColumnOnlySupportedWithV2TableError
-      }.getOrElse {
-        val changes = cols.map(col => TableChange.deleteColumn(col.toArray))
-        createAlterTable(nameParts, catalog, tbl, changes)
-      }
+    case AlterTableDropColumns(ResolvedV1TableIdentifier(_), _) =>
+      throw QueryCompilationErrors.dropColumnOnlySupportedWithV2TableError
 
     case SetTableProperties(ResolvedV1TableIdentifier(ident), props) =>
       AlterTableSetPropertiesCommand(ident.asTableIdentifier, props, isView = 
false)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 3e2214c..c5d3e59 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -437,6 +437,10 @@ class DataSourceV2Strategy(session: SparkSession) extends 
Strategy with Predicat
       val changes = keys.map(key => TableChange.removeProperty(key))
       AlterTableExec(table.catalog, table.identifier, changes) :: Nil
 
+    case a: AlterTableCommand if a.table.resolved =>
+      val table = a.table.asInstanceOf[ResolvedTable]
+      AlterTableExec(table.catalog, table.identifier, a.changes) :: Nil
+
     case _ => Nil
   }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
index b94d868..dd170ef 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
@@ -240,7 +240,7 @@ class JDBCTableCatalogSuite extends QueryTest with 
SharedSparkSession {
       val msg = intercept[AnalysisException] {
         sql(s"ALTER TABLE $tableName DROP COLUMN bad_column")
       }.getMessage
-      assert(msg.contains("Cannot delete missing field bad_column in 
test.alt_table schema"))
+      assert(msg.contains("Cannot delete missing field bad_column in 
h2.test.alt_table schema"))
     }
     // Drop a column to not existing table and namespace
     Seq("h2.test.not_existing_table", 
"h2.bad_test.not_existing_table").foreach { table =>
@@ -362,7 +362,7 @@ class JDBCTableCatalogSuite extends QueryTest with 
SharedSparkSession {
         val msg = intercept[AnalysisException] {
           sql(s"ALTER TABLE $tableName DROP COLUMN C3")
         }.getMessage
-        assert(msg.contains("Cannot delete missing field C3 in test.alt_table 
schema"))
+        assert(msg.contains("Cannot delete missing field C3 in 
h2.test.alt_table schema"))
       }
 
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to