This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 87ae397  [SPARK-36372][SQL] v2 ALTER TABLE ADD COLUMNS should check 
duplicates for the user specified columns
87ae397 is described below

commit 87ae3978971c46a17f8a550f9d3e6934a74cc3a4
Author: Terry Kim <yumin...@gmail.com>
AuthorDate: Mon Aug 2 17:54:50 2021 +0800

    [SPARK-36372][SQL] v2 ALTER TABLE ADD COLUMNS should check duplicates for 
the user specified columns
    
    ### What changes were proposed in this pull request?
    
    Currently, v2 ALTER TABLE ADD COLUMNS does not check duplicates for the 
user specified columns. For example,
    ```
    spark.sql(s"CREATE TABLE $t (id int) USING $v2Format")
    spark.sql(s"ALTER TABLE $t ADD COLUMNS (data string, data string)")
    ```
    doesn't fail the analysis, and it's up to the catalog implementation to 
handle it. For v1 command, the duplication is checked before invoking the 
catalog.
    
    ### Why are the changes needed?
    
    To check the duplicate columns during analysis and be consistent with v1 
command.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, now the above command will print out the following:
    ```
    org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the 
user specified columns: `data`
    ```
    
    ### How was this patch tested?
    
    Added new unit tests
    
    Closes #33600 from imback82/alter_add_duplicate_columns.
    
    Authored-by: Terry Kim <yumin...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 3b713e7f6189dfe1c5bbb1a527bf1266bde69f69)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../sql/catalyst/analysis/CheckAnalysis.scala      |  5 ++++
 .../spark/sql/connector/AlterTableTests.scala      | 23 ++++++++++++++++
 .../connector/V2CommandsCaseSensitivitySuite.scala | 32 ++++++++++++++++++++--
 3 files changed, 57 insertions(+), 3 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 2d8ac64..77f721c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.connector.catalog.{LookupCatalog, 
SupportsPartitionM
 import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.SchemaUtils
 
 /**
  * Throws user facing errors when passed invalid queries that fail to analyze.
@@ -951,6 +952,10 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog {
         colsToAdd.foreach { colToAdd =>
           checkColumnNotExists("add", colToAdd.name, table.schema)
         }
+        SchemaUtils.checkColumnNameDuplication(
+          colsToAdd.map(_.name.quoted),
+          "in the user specified columns",
+          alter.conf.resolver)
 
       case AlterTableRenameColumn(table: ResolvedTable, col: 
ResolvedFieldName, newName) =>
         checkColumnNotExists("rename", col.path :+ newName, table.schema)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
index 004a64a..1bd45f5 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
@@ -384,6 +384,29 @@ trait AlterTableTests extends SharedSparkSession {
     }
   }
 
+  test("SPARK-36372: Adding duplicate columns should not be allowed") {
+    val t = s"${catalogAndNamespace}table_name"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id int) USING $v2Format")
+      val e = intercept[AnalysisException] {
+        sql(s"ALTER TABLE $t ADD COLUMNS (data string, data1 string, data 
string)")
+      }
+      assert(e.message.contains("Found duplicate column(s) in the user 
specified columns: `data`"))
+    }
+  }
+
+  test("SPARK-36372: Adding duplicate nested columns should not be allowed") {
+    val t = s"${catalogAndNamespace}table_name"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id int, point struct<x: double, y: double>) USING 
$v2Format")
+      val e = intercept[AnalysisException] {
+        sql(s"ALTER TABLE $t ADD COLUMNS (point.z double, point.z double, 
point.xx double)")
+      }
+      assert(e.message.contains(
+        "Found duplicate column(s) in the user specified columns: `point.z`"))
+    }
+  }
+
   test("AlterTable: update column type int -> long") {
     val t = s"${catalogAndNamespace}table_name"
     withTable(t) {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
index 6651576..763cd6a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
@@ -179,7 +179,7 @@ class V2CommandsCaseSensitivitySuite extends 
SharedSparkSession with AnalysisTes
           Some(UnresolvedFieldPosition(ColumnPosition.after("id")))),
         QualifiedColType(
           None,
-          "x",
+          "y",
           LongType,
           true,
           None,
@@ -227,6 +227,28 @@ class V2CommandsCaseSensitivitySuite extends 
SharedSparkSession with AnalysisTes
     )
   }
 
+  test("SPARK-36372: Adding duplicate columns should not be allowed") {
+    alterTableTest(
+      AlterTableAddColumns(
+        table,
+        Seq(QualifiedColType(
+          Some(UnresolvedFieldName(Seq("point"))),
+          "z",
+          LongType,
+          true,
+          None,
+          None),
+        QualifiedColType(
+          Some(UnresolvedFieldName(Seq("point"))),
+          "Z",
+          LongType,
+          true,
+          None,
+          None))),
+      Seq("Found duplicate column(s) in the user specified columns: 
`point.z`"),
+      expectErrorOnCaseSensitive = false)
+  }
+
   test("AlterTable: drop column resolution") {
     Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", 
"x")).foreach { ref =>
       alterTableTest(
@@ -272,10 +294,14 @@ class V2CommandsCaseSensitivitySuite extends 
SharedSparkSession with AnalysisTes
     }
   }
 
-  private def alterTableTest(alter: AlterTableColumnCommand, error: 
Seq[String]): Unit = {
+  private def alterTableTest(
+      alter: AlterTableColumnCommand,
+      error: Seq[String],
+      expectErrorOnCaseSensitive: Boolean = true): Unit = {
     Seq(true, false).foreach { caseSensitive =>
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
-        if (caseSensitive) {
+        val expectError = if (expectErrorOnCaseSensitive) caseSensitive else 
!caseSensitive
+        if (expectError) {
           assertAnalysisError(alter, error, caseSensitive)
         } else {
           assertAnalysisSuccess(alter, caseSensitive)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to