This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 288bdd28635 [SPARK-40714][SQL] Remove `PartitionAlreadyExistsException` 288bdd28635 is described below commit 288bdd28635d71623dfc6fe82dc1e3aab5fa6c63 Author: Max Gekk <max.g...@gmail.com> AuthorDate: Mon Oct 10 19:54:17 2022 +0300 [SPARK-40714][SQL] Remove `PartitionAlreadyExistsException` ### What changes were proposed in this pull request? In the PR, I propose to remove `PartitionAlreadyExistsException` and use `PartitionsAlreadyExistException` instead. ### Why are the changes needed? 1. To simplify user apps. After the changes, users no longer need to catch both `PartitionsAlreadyExistException` and `PartitionAlreadyExistsException`. 2. To improve code maintainability, since we no longer need to maintain two almost identical exception classes. 3. To avoid errors like the one in PR https://github.com/apache/spark/pull/38152, which fixed `PartitionsAlreadyExistException` but not `PartitionAlreadyExistsException`. ### Does this PR introduce _any_ user-facing change? Yes. ### How was this patch tested? By running the affected test suites: ``` $ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *SupportsPartitionManagementSuite" $ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *.AlterTableAddPartitionSuite" ``` Closes #38161 from MaxGekk/remove-PartitionAlreadyExistsException. 
Authored-by: Max Gekk <max.g...@gmail.com> Signed-off-by: Max Gekk <max.g...@gmail.com> --- docs/sql-migration-guide.md | 1 + .../catalog/SupportsAtomicPartitionManagement.java | 5 ++--- .../catalog/SupportsPartitionManagement.java | 10 +++++----- .../catalyst/analysis/AlreadyExistException.scala | 22 ++++++++-------------- .../sql/catalyst/catalog/InMemoryCatalog.scala | 2 +- .../catalog/InMemoryAtomicPartitionTable.scala | 4 ++-- .../connector/catalog/InMemoryPartitionTable.scala | 6 +++--- .../catalog/SupportsPartitionManagementSuite.scala | 6 +++--- .../AlterTableRenamePartitionSuiteBase.scala | 8 ++++---- .../command/v1/AlterTableAddPartitionSuite.scala | 2 +- .../command/v2/AlterTableAddPartitionSuite.scala | 2 +- .../spark/sql/hive/client/HiveClientImpl.scala | 4 ++-- .../spark/sql/hive/client/HiveClientSuite.scala | 2 +- 13 files changed, 34 insertions(+), 40 deletions(-) diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md index bc7f17fd5cb..18cc579e4f9 100644 --- a/docs/sql-migration-guide.md +++ b/docs/sql-migration-guide.md @@ -33,6 +33,7 @@ license: | - Valid Base64 string should include symbols from in base64 alphabet (A-Za-z0-9+/), optional padding (`=`), and optional whitespaces. Whitespaces are skipped in conversion except when they are preceded by padding symbol(s). If padding is present it should conclude the string and follow rules described in RFC 4648 § 4. - Valid hexadecimal strings should include only allowed symbols (0-9A-Fa-f). - Valid values for `fmt` are case-insensitive `hex`, `base64`, `utf-8`, `utf8`. + - Since Spark 3.4, Spark throws only `PartitionsAlreadyExistException` when it creates partitions but some of them exist already. In Spark 3.3 or earlier, Spark can throw either `PartitionsAlreadyExistException` or `PartitionAlreadyExistsException`. 
## Upgrading from Spark SQL 3.2 to 3.3 diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagement.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagement.java index e2c693f2d0a..09b26d8f793 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagement.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagement.java @@ -22,7 +22,6 @@ import java.util.Map; import org.apache.spark.annotation.Experimental; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException; -import org.apache.spark.sql.catalyst.analysis.PartitionAlreadyExistsException; import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException; /** @@ -50,11 +49,11 @@ public interface SupportsAtomicPartitionManagement extends SupportsPartitionMana default void createPartition( InternalRow ident, Map<String, String> properties) - throws PartitionAlreadyExistsException, UnsupportedOperationException { + throws PartitionsAlreadyExistException, UnsupportedOperationException { try { createPartitions(new InternalRow[]{ident}, new Map[]{properties}); } catch (PartitionsAlreadyExistException e) { - throw new PartitionAlreadyExistsException(e.getMessage()); + throw new PartitionsAlreadyExistException(e.getMessage()); } } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java index ec2b61a7664..4830e193222 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java @@ -22,7 +22,7 @@ import java.util.Map; import 
org.apache.spark.annotation.Experimental; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException; -import org.apache.spark.sql.catalyst.analysis.PartitionAlreadyExistsException; +import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException; import org.apache.spark.sql.types.StructType; /** @@ -59,13 +59,13 @@ public interface SupportsPartitionManagement extends Table { * * @param ident a new partition identifier * @param properties the metadata of a partition - * @throws PartitionAlreadyExistsException If a partition already exists for the identifier + * @throws PartitionsAlreadyExistException If a partition already exists for the identifier * @throws UnsupportedOperationException If partition property is not supported */ void createPartition( InternalRow ident, Map<String, String> properties) - throws PartitionAlreadyExistsException, UnsupportedOperationException; + throws PartitionsAlreadyExistException, UnsupportedOperationException; /** * Drop a partition from table. 
@@ -147,14 +147,14 @@ public interface SupportsPartitionManagement extends Table { * @param to new partition identifier * @return true if renaming completes successfully otherwise false * @throws UnsupportedOperationException If partition renaming is not supported - * @throws PartitionAlreadyExistsException If the `to` partition exists already + * @throws PartitionsAlreadyExistException If the `to` partition exists already * @throws NoSuchPartitionException If the `from` partition does not exist * * @since 3.2.0 */ default boolean renamePartition(InternalRow from, InternalRow to) throws UnsupportedOperationException, - PartitionAlreadyExistsException, + PartitionsAlreadyExistException, NoSuchPartitionException { throw new UnsupportedOperationException("Partition renaming is not supported"); } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala index f65c29a06cc..c1dd80e3f77 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala @@ -59,29 +59,23 @@ class TableAlreadyExistsException(message: String, cause: Option[Throwable] = No class TempTableAlreadyExistsException(table: String) extends TableAlreadyExistsException(s"Temporary view '$table' already exists") -class PartitionAlreadyExistsException(message: String) extends AnalysisException(message) { - def this(db: String, table: String, spec: TablePartitionSpec) = { - this(s"Partition already exists in table '$table' database '$db':\n" + spec.mkString("\n")) - } - - def this(tableName: String, partitionIdent: InternalRow, partitionSchema: StructType) = { - this(s"Partition already exists in table $tableName:" + - partitionIdent.toSeq(partitionSchema).zip(partitionSchema.map(_.name)) - .map( kv => s"${kv._1} -> 
${kv._2}").mkString(",")) - } -} - class PartitionsAlreadyExistException(message: String) extends AnalysisException(message) { def this(db: String, table: String, specs: Seq[TablePartitionSpec]) = { - this(s"The following partitions already exists in table '$table' database '$db':\n" + this(s"The following partitions already exist in table '$table' database '$db':\n" + specs.mkString("\n===\n")) } + def this(db: String, table: String, spec: TablePartitionSpec) = + this(db, table, Seq(spec)) + def this(tableName: String, partitionIdents: Seq[InternalRow], partitionSchema: StructType) = { - this(s"The following partitions already exists in table $tableName:" + + this(s"The following partitions already exist in table $tableName:" + partitionIdents.map(id => partitionSchema.map(_.name).zip(id.toSeq(partitionSchema)) .map( kv => s"${kv._1} -> ${kv._2}").mkString(",")).mkString("\n===\n")) } + + def this(tableName: String, partitionIdent: InternalRow, partitionSchema: StructType) = + this(tableName, Seq(partitionIdent), partitionSchema) } class FunctionAlreadyExistsException(db: String, func: String) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index 218a342e669..90e824284bd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -90,7 +90,7 @@ class InMemoryCatalog( specs: Seq[TablePartitionSpec]): Unit = { specs.foreach { s => if (partitionExists(db, table, s)) { - throw new PartitionAlreadyExistsException(db = db, table = table, spec = s) + throw new PartitionsAlreadyExistException(db = db, table = table, spec = s) } } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryAtomicPartitionTable.scala 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryAtomicPartitionTable.scala index a48eb04a988..dd3d77f26cd 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryAtomicPartitionTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryAtomicPartitionTable.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.connector.catalog import java.util import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionAlreadyExistsException, PartitionsAlreadyExistException} +import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionsAlreadyExistException} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.types.StructType @@ -39,7 +39,7 @@ class InMemoryAtomicPartitionTable ( ident: InternalRow, properties: util.Map[String, String]): Unit = { if (memoryTablePartitions.containsKey(ident)) { - throw new PartitionAlreadyExistsException(name, ident, partitionSchema) + throw new PartitionsAlreadyExistException(name, ident, partitionSchema) } else { createPartitionKey(ident.toSeq(schema)) memoryTablePartitions.put(ident, properties) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryPartitionTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryPartitionTable.scala index 660140e282e..7280d6a5b07 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryPartitionTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryPartitionTable.scala @@ -23,7 +23,7 @@ import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionAlreadyExistsException} +import 
org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionsAlreadyExistException} import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.types.StructType @@ -51,7 +51,7 @@ class InMemoryPartitionTable( ident: InternalRow, properties: util.Map[String, String]): Unit = { if (memoryTablePartitions.containsKey(ident)) { - throw new PartitionAlreadyExistsException(name, ident, partitionSchema) + throw new PartitionsAlreadyExistException(name, ident, partitionSchema) } else { createPartitionKey(ident.toSeq(schema)) memoryTablePartitions.put(ident, properties) @@ -111,7 +111,7 @@ class InMemoryPartitionTable( override def renamePartition(from: InternalRow, to: InternalRow): Boolean = { if (memoryTablePartitions.containsKey(to)) { - throw new PartitionAlreadyExistsException(name, to, partitionSchema) + throw new PartitionsAlreadyExistException(name, to, partitionSchema) } else { val partValue = memoryTablePartitions.remove(from) if (partValue == null) { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala index e5aeb90b841..7f7c5299445 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala @@ -23,7 +23,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionAlreadyExistsException} +import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionsAlreadyExistException} import org.apache.spark.sql.connector.expressions.{LogicalExpressions, 
NamedReference} import org.apache.spark.sql.types.{IntegerType, StringType, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -218,10 +218,10 @@ class SupportsPartitionManagementSuite extends SparkFunSuite { test("renamePartition") { val partTable = createMultiPartTable() - val errMsg1 = intercept[PartitionAlreadyExistsException] { + val errMsg1 = intercept[PartitionsAlreadyExistException] { partTable.renamePartition(InternalRow(0, "abc"), InternalRow(1, "abc")) }.getMessage - assert(errMsg1.contains("Partition already exists")) + assert(errMsg1.contains("partitions already exist")) val newPart = InternalRow(2, "xyz") val errMsg2 = intercept[NoSuchPartitionException] { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableRenamePartitionSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableRenamePartitionSuiteBase.scala index 080cd89c4a2..6e67946a557 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableRenamePartitionSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableRenamePartitionSuiteBase.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.command import org.apache.spark.sql.{AnalysisException, QueryTest, Row} -import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionAlreadyExistsException} +import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionsAlreadyExistException} import org.apache.spark.sql.internal.SQLConf /** @@ -75,15 +75,15 @@ trait AlterTableRenamePartitionSuiteBase extends QueryTest with DDLCommandTestUt } } - test("target partition exists") { + test("target partitions exist") { withNamespaceAndTable("ns", "tbl") { t => createSinglePartTable(t) sql(s"INSERT INTO $t PARTITION (id = 2) SELECT 'def'") checkPartitions(t, Map("id" -> "1"), Map("id" -> "2")) - val errMsg = intercept[PartitionAlreadyExistsException] { + val 
errMsg = intercept[PartitionsAlreadyExistException] { sql(s"ALTER TABLE $t PARTITION (id = 1) RENAME TO PARTITION (id = 2)") }.getMessage - assert(errMsg.contains("Partition already exists")) + assert(errMsg.contains("partitions already exist")) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala index 6b2308766f6..54287cc6a47 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala @@ -148,7 +148,7 @@ trait AlterTableAddPartitionSuiteBase extends command.AlterTableAddPartitionSuit " PARTITION (id=2) LOCATION 'loc1'") }.getMessage assert(errMsg === - """The following partitions already exists in table 'tbl' database 'ns': + """The following partitions already exist in table 'tbl' database 'ns': |Map(id -> 2)""".stripMargin) sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala index a238dfcf2dd..dc6e5a2909d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala @@ -111,7 +111,7 @@ class AlterTableAddPartitionSuite sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'" + " PARTITION (id=2) LOCATION 'loc1'") }.getMessage - assert(errMsg === s"The following partitions already exists in table $t:id -> 2") + assert(errMsg === s"The following partitions already exist in table $t:id -> 2") sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 
'loc'" + " PARTITION (id=2) LOCATION 'loc1'") diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index bef320174ec..db600bcd3d4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -50,7 +50,7 @@ import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.internal.Logging import org.apache.spark.metrics.source.HiveCatalogMetrics import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, NoSuchDatabaseException, NoSuchPartitionException, NoSuchPartitionsException, NoSuchTableException, PartitionAlreadyExistsException, PartitionsAlreadyExistException} +import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, NoSuchDatabaseException, NoSuchPartitionException, NoSuchPartitionsException, NoSuchTableException, PartitionsAlreadyExistException} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.catalog.CatalogUtils.stringToURI @@ -707,7 +707,7 @@ private[hive] class HiveClientImpl( hiveTable.setOwner(userName) specs.zip(newSpecs).foreach { case (oldSpec, newSpec) => if (shim.getPartition(client, hiveTable, newSpec.asJava, false) != null) { - throw new PartitionAlreadyExistsException(db, table, newSpec) + throw new PartitionsAlreadyExistException(db, table, newSpec) } val hivePart = getPartitionOption(rawHiveTable, oldSpec) .map { p => toHivePartition(p.copy(spec = newSpec), hiveTable) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala index 184e03d088c..e6abc7b96c1 100644 --- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala @@ -528,7 +528,7 @@ class HiveClientSuite(version: String, allVersions: Seq[String]) val errMsg = intercept[PartitionsAlreadyExistException] { client.createPartitions("default", "src_part", partitions, ignoreIfExists = false) }.getMessage - assert(errMsg.contains("partitions already exists")) + assert(errMsg.contains("partitions already exist")) } finally { client.dropPartitions( "default", --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org