This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 89397b5eecf [SPARK-40521][SQL] Return only exists partitions in 
`PartitionsAlreadyExistException` from Hive's create partition
89397b5eecf is described below

commit 89397b5eecf9bdb54e740c50dada5bd183e8d9fc
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Fri Oct 7 15:40:36 2022 +0300

    [SPARK-40521][SQL] Return only exists partitions in 
`PartitionsAlreadyExistException` from Hive's create partition
    
    ### What changes were proposed in this pull request?
    In this PR, I propose to modify the Hive catalog implementation of create 
partition for the case when Hive reports that some of the partitions already 
exist. In that case, Spark will check the existence of all input partitions one 
by one and return **only the existing** partitions in 
`PartitionsAlreadyExistException`.
    
    ### Why are the changes needed?
    1. To avoid confusing Spark SQL users about which partitions already exist.
    2. To be consistent with the other catalogs: V2 and V1 In-Memory.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, it changes user-facing error message.
    
    Before:
    ```
    spark-sql> CREATE TABLE t (id bigint, data string) USING HIVE PARTITIONED 
BY (id);
    spark-sql> ALTER TABLE t ADD PARTITION (id=2) LOCATION 'loc1';
    spark-sql> ALTER TABLE t ADD PARTITION (id=1) LOCATION 'loc' PARTITION 
(id=2) LOCATION 'loc1';
    The following partitions already exists in table 't' database 'default':
    Map(id -> 1)
    ===
    Map(id -> 2)
    ```
    The error lists both an existing partition (id=2) and a non-existing one (id=1).
    
    After:
    ```
    ...
    spark-sql> ALTER TABLE t ADD PARTITION (id=1) LOCATION 'loc' PARTITION 
(id=2) LOCATION 'loc1';
    The following partitions already exists in table 't' database 'default':
    Map(id -> 2)
    ```
    The error contains only the existing partitions that caused the failure.
    
    ### How was this patch tested?
    By running the modified test suites:
    ```
    $ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly 
*.AlterTableAddPartitionSuite"
    ```
    
    Closes #38134 from MaxGekk/fix-hive-partition-exists.
    
    Authored-by: Max Gekk <max.g...@gmail.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../command/AlterTableAddPartitionSuiteBase.scala   | 18 ------------------
 .../command/v1/AlterTableAddPartitionSuite.scala    | 21 +++++++++++++++++++++
 .../command/v2/AlterTableAddPartitionSuite.scala    | 19 +++++++++++++++++++
 .../spark/sql/hive/client/HiveClientImpl.scala      |  6 +++++-
 4 files changed, 45 insertions(+), 19 deletions(-)

diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala
index dee14953c09..8472a581fa7 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.command
 import java.time.{Duration, Period}
 
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
-import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException
 import org.apache.spark.sql.internal.SQLConf
 
 /**
@@ -167,23 +166,6 @@ trait AlterTableAddPartitionSuiteBase extends QueryTest 
with DDLCommandTestUtils
     }
   }
 
-  test("partition already exists") {
-    withNamespaceAndTable("ns", "tbl") { t =>
-      sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED 
BY (id)")
-      sql(s"ALTER TABLE $t ADD PARTITION (id=2) LOCATION 'loc1'")
-
-      val errMsg = intercept[PartitionsAlreadyExistException] {
-        sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'" +
-          " PARTITION (id=2) LOCATION 'loc1'")
-      }.getMessage
-      assert(errMsg.contains("The following partitions already exists"))
-
-      sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" +
-        " PARTITION (id=2) LOCATION 'loc1'")
-      checkPartitions(t, Map("id" -> "1"), Map("id" -> "2"))
-    }
-  }
-
   test("SPARK-33474: Support typed literals as partition spec values") {
     withNamespaceAndTable("ns", "tbl") { t =>
       sql(s"CREATE TABLE $t(name STRING, part DATE) USING PARQUET PARTITIONED 
BY (part)")
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala
index b2e626be1b1..6b2308766f6 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.command.v1
 
 import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException
 import org.apache.spark.sql.execution.command
 import org.apache.spark.sql.internal.SQLConf
 
@@ -135,6 +136,26 @@ trait AlterTableAddPartitionSuiteBase extends 
command.AlterTableAddPartitionSuit
       }
     }
   }
+
+  // TODO: Move this test to the common trait as soon as it is migrated on 
checkError()
+  test("partition already exists") {
+    withNamespaceAndTable("ns", "tbl") { t =>
+      sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED 
BY (id)")
+      sql(s"ALTER TABLE $t ADD PARTITION (id=2) LOCATION 'loc1'")
+
+      val errMsg = intercept[PartitionsAlreadyExistException] {
+        sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'" +
+          " PARTITION (id=2) LOCATION 'loc1'")
+      }.getMessage
+      assert(errMsg ===
+      """The following partitions already exists in table 'tbl' database 'ns':
+        |Map(id -> 2)""".stripMargin)
+
+      sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" +
+        " PARTITION (id=2) LOCATION 'loc1'")
+      checkPartitions(t, Map("id" -> "1"), Map("id" -> "2"))
+    }
+  }
 }
 
 /**
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala
index fabe399c340..2784f1e4bdd 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.command.v2
 
 import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException
 import org.apache.spark.sql.execution.command
 
 /**
@@ -99,4 +100,22 @@ class AlterTableAddPartitionSuite
       }
     }
   }
+
+  // TODO: Move this test to the common trait as soon as it is migrated on 
checkError()
+  test("partition already exists") {
+    withNamespaceAndTable("ns", "tbl") { t =>
+      sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED 
BY (id)")
+      sql(s"ALTER TABLE $t ADD PARTITION (id=2) LOCATION 'loc1'")
+
+      val errMsg = intercept[PartitionsAlreadyExistException] {
+        sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'" +
+          " PARTITION (id=2) LOCATION 'loc1'")
+      }.getMessage
+      assert(errMsg === s"The following partitions already exists in table 
$t:2 -> id")
+
+      sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" +
+        " PARTITION (id=2) LOCATION 'loc1'")
+      checkPartitions(t, Map("id" -> "1"), Map("id" -> "2"))
+    }
+  }
 }
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 61951cde8d0..bef320174ec 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -635,7 +635,11 @@ private[hive] class HiveClientImpl(
       ignoreIfExists: Boolean): Unit = withHiveState {
     def replaceExistException(e: Throwable): Unit = e match {
       case _: HiveException if e.getCause.isInstanceOf[AlreadyExistsException] 
=>
-        throw new PartitionsAlreadyExistException(db, table, parts.map(_.spec))
+        val t = shim.getTable(client, db, table)
+        val exists = parts.filter { part =>
+          shim.getPartition(client, t, part.spec.asJava, forceCreate = false) 
!= null
+        }
+        throw new PartitionsAlreadyExistException(db, table, 
exists.map(_.spec))
       case _ => throw e
     }
     try {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to