This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new a630e8d [SPARK-33676][SQL] Require exact matching of partition spec to the schema in V2 `ALTER TABLE .. ADD/DROP PARTITION`

a630e8d is described below

commit a630e8d14bd36ede97ba1469a6da148464c90ee3
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Mon Dec 7 08:14:36 2020 +0000

[SPARK-33676][SQL] Require exact matching of partition spec to the schema in V2 `ALTER TABLE .. ADD/DROP PARTITION`

### What changes were proposed in this pull request?
Check that partition specs passed to v2 `ALTER TABLE .. ADD/DROP PARTITION` exactly match the partition schema (all partition fields from the schema are specified in the partition specs).

### Why are the changes needed?
1. To have the same behavior as V1 `ALTER TABLE .. ADD/DROP PARTITION`, which outputs the error:
```sql
spark-sql> create table tab1 (id int, a int, b int) using parquet partitioned by (a, b);
spark-sql> ALTER TABLE tab1 ADD PARTITION (A='9');
Error in query: Partition spec is invalid. The spec (a) must match the partition spec (a, b) defined in table '`default`.`tab1`';
```
2. To prevent future errors caused by partially specified partition specs.

### Does this PR introduce _any_ user-facing change?
Yes. The V2 implementation of `ALTER TABLE .. ADD/DROP PARTITION` now outputs the same error as the V1 commands.

### How was this patch tested?
By running the test suite with a new unit test:
```
$ build/sbt "test:testOnly *AlterTablePartitionV2SQLSuite"
```

Closes #30624 from MaxGekk/add-partition-full-spec.
Authored-by: Max Gekk <max.g...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit 26c0493318c2a3e5b74ff3829de88605aff8e832) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../sql/catalyst/analysis/ResolvePartitionSpec.scala | 20 ++++++++++++++++---- .../spark/sql/catalyst/catalog/SessionCatalog.scala | 15 ++++++--------- .../apache/spark/sql/util/PartitioningUtils.scala | 18 ++++++++++++++++++ .../connector/AlterTablePartitionV2SQLSuite.scala | 20 ++++++++++++++++++++ 4 files changed, 60 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala index 38991a9..feb05d3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement import org.apache.spark.sql.types._ -import org.apache.spark.sql.util.PartitioningUtils.normalizePartitionSpec +import org.apache.spark.sql.util.PartitioningUtils.{normalizePartitionSpec, requireExactMatchedPartitionSpec} /** * Resolve [[UnresolvedPartitionSpec]] to [[ResolvedPartitionSpec]] in partition related commands. 
@@ -35,11 +35,21 @@ object ResolvePartitionSpec extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case r @ AlterTableAddPartition( ResolvedTable(_, _, table: SupportsPartitionManagement), partSpecs, _) => - r.copy(parts = resolvePartitionSpecs(table.name, partSpecs, table.partitionSchema())) + val partitionSchema = table.partitionSchema() + r.copy(parts = resolvePartitionSpecs( + table.name, + partSpecs, + partitionSchema, + requireExactMatchedPartitionSpec(table.name, _, partitionSchema.fieldNames))) case r @ AlterTableDropPartition( ResolvedTable(_, _, table: SupportsPartitionManagement), partSpecs, _, _, _) => - r.copy(parts = resolvePartitionSpecs(table.name, partSpecs, table.partitionSchema())) + val partitionSchema = table.partitionSchema() + r.copy(parts = resolvePartitionSpecs( + table.name, + partSpecs, + partitionSchema, + requireExactMatchedPartitionSpec(table.name, _, partitionSchema.fieldNames))) case r @ ShowPartitions(ResolvedTable(_, _, table: SupportsPartitionManagement), partSpecs) => r.copy(pattern = resolvePartitionSpecs( @@ -51,7 +61,8 @@ object ResolvePartitionSpec extends Rule[LogicalPlan] { private def resolvePartitionSpecs( tableName: String, partSpecs: Seq[PartitionSpec], - partSchema: StructType): Seq[ResolvedPartitionSpec] = + partSchema: StructType, + checkSpec: TablePartitionSpec => Unit = _ => ()): Seq[ResolvedPartitionSpec] = partSpecs.map { case unresolvedPartSpec: UnresolvedPartitionSpec => val normalizedSpec = normalizePartitionSpec( @@ -59,6 +70,7 @@ object ResolvePartitionSpec extends Rule[LogicalPlan] { partSchema.map(_.name), tableName, conf.resolver) + checkSpec(normalizedSpec) val partitionNames = normalizedSpec.keySet val requestedFields = partSchema.filter(field => partitionNames.contains(field.name)) ResolvedPartitionSpec( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 29481b8..9a21b85 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -43,7 +43,7 @@ import org.apache.spark.sql.connector.catalog.CatalogManager import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.apache.spark.sql.util.{CaseInsensitiveStringMap, PartitioningUtils} import org.apache.spark.util.Utils object SessionCatalog { @@ -1162,14 +1162,11 @@ class SessionCatalog( private def requireExactMatchedPartitionSpec( specs: Seq[TablePartitionSpec], table: CatalogTable): Unit = { - val defined = table.partitionColumnNames.sorted - specs.foreach { s => - if (s.keys.toSeq.sorted != defined) { - throw new AnalysisException( - s"Partition spec is invalid. 
The spec (${s.keys.mkString(", ")}) must match " + - s"the partition spec (${table.partitionColumnNames.mkString(", ")}) defined in " + - s"table '${table.identifier}'") - } + specs.foreach { spec => + PartitioningUtils.requireExactMatchedPartitionSpec( + table.identifier.toString, + spec, + table.partitionColumnNames) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala index 586aa6c..e473e1d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.util import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.Resolver +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec object PartitioningUtils { /** @@ -44,4 +45,21 @@ object PartitioningUtils { normalizedPartSpec.toMap } + + /** + * Verify if the input partition spec exactly matches the existing defined partition spec + * The columns must be the same but the orders could be different. + */ + def requireExactMatchedPartitionSpec( + tableName: String, + spec: TablePartitionSpec, + partitionColumnNames: Seq[String]): Unit = { + val defined = partitionColumnNames.sorted + if (spec.keys.toSeq.sorted != defined) { + throw new AnalysisException( + s"Partition spec is invalid. 
The spec (${spec.keys.mkString(", ")}) must match " + + s"the partition spec (${partitionColumnNames.mkString(", ")}) defined in " + + s"table '$tableName'") + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala index 47b5e5e..45d47c6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala @@ -261,4 +261,24 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase { } } } + + test("SPARK-33676: not fully specified partition spec") { + val t = "testpart.ns1.ns2.tbl" + withTable(t) { + sql(s""" + |CREATE TABLE $t (id bigint, part0 int, part1 string) + |USING foo + |PARTITIONED BY (part0, part1)""".stripMargin) + Seq( + s"ALTER TABLE $t ADD PARTITION (part0 = 1)", + s"ALTER TABLE $t DROP PARTITION (part0 = 1)" + ).foreach { alterTable => + val errMsg = intercept[AnalysisException] { + sql(alterTable) + }.getMessage + assert(errMsg.contains("Partition spec is invalid. " + + "The spec (part0) must match the partition spec (part0, part1)")) + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org