This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 2ebeaa1  [SPARK-33591][SQL][3.1] Recognize `null` in partition spec values
2ebeaa1 is described below

commit 2ebeaa15655841a4448fd8fca255dad79038222b
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Mon Jan 11 04:56:39 2021 +0000

    [SPARK-33591][SQL][3.1] Recognize `null` in partition spec values

    ### What changes were proposed in this pull request?
    1. Recognize `null` while parsing partition specs, and put `null` instead of `"null"` as partition values.
    2. For the V1 catalog: replace `null` with `__HIVE_DEFAULT_PARTITION__`.
    3. For V2 catalogs: pass `null` through as is, and let catalog implementations decide how to handle `null`s as partition values in a spec.

    ### Why are the changes needed?
    Currently, `null` in partition specs is recognized as the `"null"` string, which can lead to incorrect results, for example:
    ```sql
    spark-sql> CREATE TABLE tbl5 (col1 INT, p1 STRING) USING PARQUET PARTITIONED BY (p1);
    spark-sql> INSERT INTO TABLE tbl5 PARTITION (p1 = null) SELECT 0;
    spark-sql> SELECT isnull(p1) FROM tbl5;
    false
    ```
    Even though we inserted a row into the partition with the `null` value, **the resulting table doesn't contain `null`**.

    ### Does this PR introduce _any_ user-facing change?
    Yes. After the changes, the example above works as expected:
    ```sql
    spark-sql> SELECT isnull(p1) FROM tbl5;
    true
    ```

    ### How was this patch tested?
    By running the affected test suites `SQLQuerySuite`, `AlterTablePartitionV2SQLSuite` and `v1/ShowPartitionsSuite`.

    Authored-by: Max Gekk <max.gekk@gmail.com>
    Signed-off-by: Wenchen Fan <wenchen@databricks.com>
    (cherry picked from commit 157b72ac9fa0057d5fd6d7ed52a6c4b22ebd1dfc)
    Signed-off-by: Max Gekk <max.gekk@gmail.com>

Closes #31094 from MaxGekk/partition-spec-value-null-3.1.

Authored-by: Max Gekk <max.g...@gmail.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../catalyst/catalog/ExternalCatalogUtils.scala    | 10 +++++++
 .../sql/catalyst/catalog/InMemoryCatalog.scala     |  7 ++++-
 .../sql/catalyst/catalog/SessionCatalog.scala      |  2 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  1 +
 .../spark/sql/execution/datasources/rules.scala    |  3 +-
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala |  9 ++++++
 .../connector/AlterTablePartitionV2SQLSuite.scala  | 15 ++++++++++
 .../spark/sql/execution/command/DDLSuite.scala     |  8 ++++++
 .../execution/command/v1/ShowPartitionsSuite.scala | 12 ++++++++
 .../spark/sql/hive/HiveExternalCatalog.scala       | 33 ++++++++++++----------
 .../sql/hive/execution/InsertIntoHiveTable.scala   |  2 ++
 11 files changed, 84 insertions(+), 18 deletions(-)
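[Editor's note] The heart of the change is a single equivalence rule: a partition value counts as "null" if it is a real `null` or Hive's `__HIVE_DEFAULT_PARTITION__` placeholder. A minimal standalone sketch of that rule, not part of the patch (the object name is made up; the helper mirrors the `ExternalCatalogUtils` addition in the first diff below):

```scala
// Standalone sketch, not part of the patch: the null-equivalence rule that
// the commit threads through the catalogs. DEFAULT_PARTITION_NAME matches
// Hive's placeholder for null partition values.
object NullPartitionValueSketch {
  val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__"

  // A value is a "null" partition value if it is a real null or the placeholder.
  def isNullPartitionValue(value: String): Boolean =
    value == null || value == DEFAULT_PARTITION_NAME

  def main(args: Array[String]): Unit = {
    assert(isNullPartitionValue(null))
    assert(isNullPartitionValue("__HIVE_DEFAULT_PARTITION__"))
    assert(!isNullPartitionValue("null")) // the old behavior kept this string as the value
  }
}
```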
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
index 00445a1..9d6e0a6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
@@ -161,6 +161,10 @@ object ExternalCatalogUtils {
     }
   }
 
+  private def isNullPartitionValue(value: String): Boolean = {
+    value == null || value == DEFAULT_PARTITION_NAME
+  }
+
   /**
    * Returns true if `spec1` is a partial partition spec w.r.t. `spec2`, e.g. PARTITION (a=1) is a
    * partial partition spec w.r.t. PARTITION (a=1,b=2).
@@ -169,9 +173,15 @@ object ExternalCatalogUtils {
       spec1: TablePartitionSpec,
       spec2: TablePartitionSpec): Boolean = {
     spec1.forall {
+      case (partitionColumn, value) if isNullPartitionValue(value) =>
+        isNullPartitionValue(spec2(partitionColumn))
       case (partitionColumn, value) => spec2(partitionColumn) == value
     }
   }
+
+  def convertNullPartitionValues(spec: TablePartitionSpec): TablePartitionSpec = {
+    spec.mapValues(v => if (v == null) DEFAULT_PARTITION_NAME else v).toMap
+  }
 }
 
 object CatalogUtils {
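[Editor's note] With the two helpers in place, partial-spec matching treats a real `null` and the placeholder as the same partition, and `convertNullPartitionValues` rewrites nulls before a spec reaches a store that cannot hold them. A hedged, self-contained sketch of how they compose (`TablePartitionSpec` is `Map[String, String]` in Spark; the object here is illustrative only):

```scala
// Illustrative composition of the two ExternalCatalogUtils additions above.
object PartialSpecSketch {
  type TablePartitionSpec = Map[String, String]
  val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__"

  private def isNullPartitionValue(v: String): Boolean =
    v == null || v == DEFAULT_PARTITION_NAME

  // spec1 is a partial spec of spec2 if every entry matches, with null and
  // the placeholder treated as equal.
  def isPartialPartitionSpec(spec1: TablePartitionSpec, spec2: TablePartitionSpec): Boolean =
    spec1.forall {
      case (col, v) if isNullPartitionValue(v) => isNullPartitionValue(spec2(col))
      case (col, v) => spec2(col) == v
    }

  def main(args: Array[String]): Unit = {
    val stored = Map("p1" -> DEFAULT_PARTITION_NAME, "p2" -> "2")
    assert(isPartialPartitionSpec(Map("p1" -> null), stored))  // null matches placeholder
    assert(!isPartialPartitionSpec(Map("p2" -> null), stored)) // "2" is not a null value
  }
}
```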
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 31644a5..c059503 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -541,7 +541,12 @@ class InMemoryCatalog(
 
     listPartitions(db, table, partialSpec).map { partition =>
       partitionColumnNames.map { name =>
-        escapePathName(name) + "=" + escapePathName(partition.spec(name))
+        val partValue = if (partition.spec(name) == null) {
+          DEFAULT_PARTITION_NAME
+        } else {
+          escapePathName(partition.spec(name))
+        }
+        escapePathName(name) + "=" + partValue
       }.mkString("/")
     }.sorted
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 466a856..ab886d5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -1185,7 +1185,7 @@ class SessionCatalog(
    */
   private def requireNonEmptyValueInPartitionSpec(specs: Seq[TablePartitionSpec]): Unit = {
     specs.foreach { s =>
-      if (s.values.exists(_.isEmpty)) {
+      if (s.values.exists(v => v != null && v.isEmpty)) {
         val spec = s.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
         throw new AnalysisException(
           s"Partition spec is invalid. The spec ($spec) contains an empty partition column value")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 9d74ac9..34f56e9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -511,6 +511,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
    */
   protected def visitStringConstant(ctx: ConstantContext): String = withOrigin(ctx) {
     ctx match {
+      case _: NullLiteralContext => null
       case s: StringLiteralContext => createString(s)
       case o => o.getText
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index b9866e4..4fd6684 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -406,7 +406,8 @@ object PreprocessTableInsertion extends Rule[LogicalPlan] {
         catalogTable.get.tracksPartitionsInCatalog
       if (partitionsTrackedByCatalog && normalizedPartSpec.nonEmpty) {
         // empty partition column value
-        if (normalizedPartSpec.filter(_._2.isDefined).exists(_._2.get.isEmpty)) {
+        if (normalizedPartSpec.map(_._2)
+          .filter(_.isDefined).map(_.get).exists(v => v != null && v.isEmpty)) {
           val spec = normalizedPartSpec.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
           throw new AnalysisException(
             s"Partition spec is invalid. The spec ($spec) contains an empty partition column value")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index a003275..f6706f3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -3773,6 +3773,15 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
       }
     }
   }
+
+  test("SPARK-33591: null as a partition value") {
+    val t = "part_table"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (col1 INT, p1 STRING) USING PARQUET PARTITIONED BY (p1)")
+      sql(s"INSERT INTO TABLE $t PARTITION (p1 = null) SELECT 0")
+      checkAnswer(sql(s"SELECT * FROM $t"), Row(0, null))
+    }
+  }
 }
 
 case class Foo(bar: Option[String])
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
index 45d47c6..9987043 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
@@ -281,4 +281,19 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
       }
     }
   }
+
+  test("SPARK-33591: null as a partition value") {
+    val t = "testpart.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (col1 INT, p1 STRING) USING foo PARTITIONED BY (p1)")
+      sql(s"ALTER TABLE $t ADD PARTITION (p1 = null)")
+
+      val partTable = catalog("testpart").asTableCatalog
+        .loadTable(Identifier.of(Array("ns1", "ns2"), "tbl"))
+        .asPartitionable
+      assert(partTable.partitionExists(InternalRow(null)))
+      sql(s"ALTER TABLE $t DROP PARTITION (p1 = null)")
+      assert(!partTable.partitionExists(InternalRow(null)))
+    }
+  }
 }
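[Editor's note] The validation tweaks in `SessionCatalog` and `PreprocessTableInsertion` share one idea: an empty string is still an invalid partition value, but a real `null` (which the parser now produces for the `null` literal) must be allowed through. A small sketch of that predicate, under the assumption that static values arrive as `Some(value)` and dynamic partitions as `None`:

```scala
// Sketch of the empty-vs-null distinction; names are illustrative.
object EmptyValueCheckSketch {
  // True only for a defined, non-null, empty-string value.
  def hasEmptyValue(spec: Map[String, Option[String]]): Boolean =
    spec.values.flatten.exists(v => v != null && v.isEmpty)

  def main(args: Array[String]): Unit = {
    assert(hasEmptyValue(Map("p1" -> Some(""))))    // still rejected
    assert(!hasEmptyValue(Map("p1" -> Some(null)))) // null literal: allowed
    assert(!hasEmptyValue(Map("p1" -> None)))       // dynamic partition: allowed
  }
}
```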
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index cfda1b8..cb25777 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1730,6 +1730,14 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     // use int literal as partition value for int type partition column
     sql("ALTER TABLE tab1 DROP PARTITION (a=9, b=9)")
     assert(catalog.listPartitions(tableIdent).isEmpty)
+
+    // null partition values
+    createTablePartition(catalog, Map("a" -> null, "b" -> null), tableIdent)
+    val nullPartValue = if (isUsingHiveMetastore) "__HIVE_DEFAULT_PARTITION__" else null
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+      Set(Map("a" -> nullPartValue, "b" -> nullPartValue)))
+    sql("ALTER TABLE tab1 DROP PARTITION (a = null, b = null)")
+    assert(catalog.listPartitions(tableIdent).isEmpty)
   }
 
   protected def testRenamePartitions(isDatasourceTable: Boolean): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowPartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowPartitionsSuite.scala
index c752a5f..bbca3e4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowPartitionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowPartitionsSuite.scala
@@ -67,6 +67,18 @@ trait ShowPartitionsSuiteBase extends command.ShowPartitionsSuiteBase {
       assert(errMsg.contains("'SHOW PARTITIONS' expects a table"))
     }
   }
+
+  test("SPARK-33591: null as a partition value") {
+    val t = "part_table"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (col1 INT, p1 STRING) $defaultUsing PARTITIONED BY (p1)")
+      sql(s"INSERT INTO TABLE $t PARTITION (p1 = null) SELECT 0")
+      checkAnswer(sql(s"SHOW PARTITIONS $t"), Row("p1=__HIVE_DEFAULT_PARTITION__"))
+      checkAnswer(
+        sql(s"SHOW PARTITIONS $t PARTITION (p1 = null)"),
+        Row("p1=__HIVE_DEFAULT_PARTITION__"))
+    }
+  }
 }
 
 class ShowPartitionsSuite extends ShowPartitionsSuiteBase with SharedSparkSession {
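[Editor's note] `SHOW PARTITIONS` renders each partition as a `name=value` path fragment, so a null value has to be printed as the placeholder rather than escaped like a normal string, which is what the `InMemoryCatalog` hunk earlier does and what the `ShowPartitionsSuite` test above asserts. A simplified sketch of that rendering (the real code uses `ExternalCatalogUtils.escapePathName`; the identity stand-in below is an assumption for brevity):

```scala
// Simplified rendering of a partition spec as a path fragment.
object PartitionNameSketch {
  val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__"

  // Stand-in for ExternalCatalogUtils.escapePathName, which percent-escapes
  // characters that are unsafe in paths; identity is enough for this sketch.
  private def escapePathName(s: String): String = s

  def partitionName(spec: Seq[(String, String)]): String =
    spec.map { case (name, value) =>
      val v = if (value == null) DEFAULT_PARTITION_NAME else escapePathName(value)
      escapePathName(name) + "=" + v
    }.mkString("/")

  def main(args: Array[String]): Unit = {
    assert(partitionName(Seq("p1" -> null)) == "p1=__HIVE_DEFAULT_PARTITION__")
    assert(partitionName(Seq("p1" -> "a", "p2" -> null)) == "p1=a/p2=__HIVE_DEFAULT_PARTITION__")
  }
}
```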
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 54c237f..cc7aa14 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -950,9 +950,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   // Hive metastore is not case preserving and the partition columns are always lower cased. We need
   // to lower case the column names in partition specification before calling partition related Hive
   // APIs, to match this behaviour.
-  private def lowerCasePartitionSpec(spec: TablePartitionSpec): TablePartitionSpec = {
+  private def toMetaStorePartitionSpec(spec: TablePartitionSpec): TablePartitionSpec = {
     // scalastyle:off caselocale
-    spec.map { case (k, v) => k.toLowerCase -> v }
+    val lowNames = spec.map { case (k, v) => k.toLowerCase -> v }
+    ExternalCatalogUtils.convertNullPartitionValues(lowNames)
     // scalastyle:on caselocale
   }
 
@@ -1001,8 +1002,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       }
       p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toUri)))
     }
-    val lowerCasedParts = partsWithLocation.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
-    client.createPartitions(db, table, lowerCasedParts, ignoreIfExists)
+    val metaStoreParts = partsWithLocation
+      .map(p => p.copy(spec = toMetaStorePartitionSpec(p.spec)))
+    client.createPartitions(db, table, metaStoreParts, ignoreIfExists)
   }
 
   override def dropPartitions(
@@ -1014,7 +1016,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       retainData: Boolean): Unit = withClient {
     requireTableExists(db, table)
     client.dropPartitions(
-      db, table, parts.map(lowerCasePartitionSpec), ignoreIfNotExists, purge, retainData)
+      db, table, parts.map(toMetaStorePartitionSpec), ignoreIfNotExists, purge, retainData)
   }
 
   override def renamePartitions(
@@ -1023,7 +1025,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       specs: Seq[TablePartitionSpec],
       newSpecs: Seq[TablePartitionSpec]): Unit = withClient {
     client.renamePartitions(
-      db, table, specs.map(lowerCasePartitionSpec), newSpecs.map(lowerCasePartitionSpec))
+      db, table, specs.map(toMetaStorePartitionSpec), newSpecs.map(toMetaStorePartitionSpec))
 
     val tableMeta = getTable(db, table)
     val partitionColumnNames = tableMeta.partitionColumnNames
@@ -1039,7 +1041,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       val fs = tablePath.getFileSystem(hadoopConf)
       val newParts = newSpecs.map { spec =>
         val rightPath = renamePartitionDirectory(fs, tablePath, partitionColumnNames, spec)
-        val partition = client.getPartition(db, table, lowerCasePartitionSpec(spec))
+        val partition = client.getPartition(db, table, toMetaStorePartitionSpec(spec))
         partition.copy(storage = partition.storage.copy(locationUri = Some(rightPath.toUri)))
       }
       alterPartitions(db, table, newParts)
@@ -1149,12 +1151,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       db: String,
       table: String,
       newParts: Seq[CatalogTablePartition]): Unit = withClient {
-    val lowerCasedParts = newParts.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
+    val metaStoreParts = newParts.map(p => p.copy(spec = toMetaStorePartitionSpec(p.spec)))
 
     val rawTable = getRawTable(db, table)
 
     // convert partition statistics to properties so that we can persist them through hive api
-    val withStatsProps = lowerCasedParts.map { p =>
+    val withStatsProps = metaStoreParts.map { p =>
       if (p.stats.isDefined) {
         val statsProperties = statsToProperties(p.stats.get)
         p.copy(parameters = p.parameters ++ statsProperties)
@@ -1170,7 +1172,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       db: String,
       table: String,
       spec: TablePartitionSpec): CatalogTablePartition = withClient {
-    val part = client.getPartition(db, table, lowerCasePartitionSpec(spec))
+    val part = client.getPartition(db, table, toMetaStorePartitionSpec(spec))
     restorePartitionMetadata(part, getTable(db, table))
   }
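[Editor's note] The rename from `lowerCasePartitionSpec` to `toMetaStorePartitionSpec` reflects that the helper now does two things before a spec is handed to Hive: lower-case the column names for the case-insensitive metastore, then replace real nulls with the placeholder, since the Hive metastore cannot store a null partition value. A minimal standalone sketch of that contract (the real method delegates to `ExternalCatalogUtils.convertNullPartitionValues`):

```scala
// Standalone sketch of the metastore-spec conversion.
object MetaStoreSpecSketch {
  type TablePartitionSpec = Map[String, String]
  val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__"

  def toMetaStorePartitionSpec(spec: TablePartitionSpec): TablePartitionSpec =
    spec.map { case (k, v) =>
      // Lower-case the column name and convert a real null to the placeholder.
      k.toLowerCase -> (if (v == null) DEFAULT_PARTITION_NAME else v)
    }

  def main(args: Array[String]): Unit = {
    assert(toMetaStorePartitionSpec(Map("P1" -> null)) == Map("p1" -> DEFAULT_PARTITION_NAME))
    assert(toMetaStorePartitionSpec(Map("P1" -> "a")) == Map("p1" -> "a"))
  }
}
```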
@@ -1208,7 +1210,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       db: String,
       table: String,
       spec: TablePartitionSpec): Option[CatalogTablePartition] = withClient {
-    client.getPartitionOption(db, table, lowerCasePartitionSpec(spec)).map { part =>
+    client.getPartitionOption(db, table, toMetaStorePartitionSpec(spec)).map { part =>
       restorePartitionMetadata(part, getTable(db, table))
     }
   }
@@ -1223,7 +1225,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val catalogTable = getTable(db, table)
     val partColNameMap = buildLowerCasePartColNameMap(catalogTable).mapValues(escapePathName)
     val clientPartitionNames =
-      client.getPartitionNames(catalogTable, partialSpec.map(lowerCasePartitionSpec))
+      client.getPartitionNames(catalogTable, partialSpec.map(toMetaStorePartitionSpec))
     clientPartitionNames.map { partitionPath =>
       val partSpec = PartitioningUtils.parsePathFragmentAsSeq(partitionPath)
       partSpec.map { case (partName, partValue) =>
@@ -1242,11 +1244,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       table: String,
       partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = withClient {
     val partColNameMap = buildLowerCasePartColNameMap(getTable(db, table))
-    val res = client.getPartitions(db, table, partialSpec.map(lowerCasePartitionSpec)).map { part =>
-      part.copy(spec = restorePartitionSpec(part.spec, partColNameMap))
+    val metaStoreSpec = partialSpec.map(toMetaStorePartitionSpec)
+    val res = client.getPartitions(db, table, metaStoreSpec)
+      .map { part => part.copy(spec = restorePartitionSpec(part.spec, partColNameMap))
     }
 
-    partialSpec match {
+    metaStoreSpec match {
       // This might be a bug of Hive: When the partition value inside the partial partition spec
       // contains dot, and we ask Hive to list partitions w.r.t. the partial partition spec, Hive
       // treats dot as matching any single character and may return more partitions than we
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 3c3f31a..61aef24 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -133,6 +133,7 @@ case class InsertIntoHiveTable(
     val numDynamicPartitions = partition.values.count(_.isEmpty)
     val numStaticPartitions = partition.values.count(_.nonEmpty)
     val partitionSpec = partition.map {
+      case (key, Some(null)) => key -> ExternalCatalogUtils.DEFAULT_PARTITION_NAME
      case (key, Some(value)) => key -> value
      case (key, None) => key -> ""
    }
@@ -229,6 +230,7 @@ case class InsertIntoHiveTable(
     val caseInsensitiveDpMap = CaseInsensitiveMap(dpMap)
 
     val updatedPartitionSpec = partition.map {
+      case (key, Some(null)) => key -> ExternalCatalogUtils.DEFAULT_PARTITION_NAME
      case (key, Some(value)) => key -> value
      case (key, None) if caseInsensitiveDpMap.contains(key) => key -> caseInsensitiveDpMap(key)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org