This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new e120a53 [SPARK-38120][SQL] Fix HiveExternalCatalog.listPartitions
when partition column name is upper case and dot in partition value
e120a53 is described below
commit e120a532f202a6e7a8069101fe1fbf9c62f00fe5
Author: khalidmammadov <[email protected]>
AuthorDate: Wed Feb 9 16:54:31 2022 +0800
[SPARK-38120][SQL] Fix HiveExternalCatalog.listPartitions when partition
column name is upper case and dot in partition value
### What changes were proposed in this pull request?
HiveExternalCatalog.listPartitions method call is failing when a partition
column name is upper case and partition value contains dot. It's related to
this change
https://github.com/apache/spark/commit/f18b905f6cace7686ef169fda7de474079d0af23
The test case in that PR does not produce the issue as partition column
name is lower case.
This change will lowercase the partition column name during comparison to
produce the expected result; this is in line with the actual spec transformation,
i.e. making it lower case for Hive and using the same function.
Below is how to reproduce the issue:
```
Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 1.8.0_312)
Type in expressions to have them evaluated.
Type :help for more information.
scala> import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.TableIdentifier
scala> spark.sql("CREATE TABLE customer(id INT, name STRING) PARTITIONED BY
(partCol1 STRING, partCol2 STRING)")
22/02/06 21:10:45 WARN ResolveSessionCatalog: A Hive serde table will be
created as there is no table provider specified. You can set
spark.sql.legacy.createHiveTableByDefault to false so that native data source
table will be created instead.
res0: org.apache.spark.sql.DataFrame = []
scala> spark.sql("INSERT INTO customer PARTITION (partCol1 = 'CA', partCol2
= 'i.j') VALUES (100, 'John')")
res1: org.apache.spark.sql.DataFrame = []
scala>
spark.sessionState.catalog.listPartitions(TableIdentifier("customer"),
Some(Map("partCol2" -> "i.j"))).foreach(println)
java.util.NoSuchElementException: key not found: partcol2
at scala.collection.immutable.Map$Map2.apply(Map.scala:227)
at
org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils$.$anonfun$isPartialPartitionSpec$1(ExternalCatalogUtils.scala:205)
at
org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils$.$anonfun$isPartialPartitionSpec$1$adapted(ExternalCatalogUtils.scala:202)
at scala.collection.immutable.Map$Map1.forall(Map.scala:196)
at
org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils$.isPartialPartitionSpec(ExternalCatalogUtils.scala:202)
at
org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$listPartitions$6(HiveExternalCatalog.scala:1312)
at
org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$listPartitions$6$adapted(HiveExternalCatalog.scala:1312)
at
scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:304)
at
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:303)
at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:297)
at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
at scala.collection.TraversableLike.filter(TraversableLike.scala:395)
at scala.collection.TraversableLike.filter$(TraversableLike.scala:395)
at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
at
org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$listPartitions$1(HiveExternalCatalog.scala:1312)
at
org.apache.spark.sql.hive.HiveExternalCatalog.withClientWrappingException(HiveExternalCatalog.scala:114)
at
org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:103)
at
org.apache.spark.sql.hive.HiveExternalCatalog.listPartitions(HiveExternalCatalog.scala:1296)
at
org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.listPartitions(ExternalCatalogWithListener.scala:254)
at
org.apache.spark.sql.catalyst.catalog.SessionCatalog.listPartitions(SessionCatalog.scala:1251)
... 47 elided
*******AFTER FIX*********
scala> import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.TableIdentifier
scala> spark.sql("CREATE TABLE customer(id INT, name STRING) PARTITIONED BY
(partCol1 STRING, partCol2 STRING)")
22/02/06 22:08:11 WARN ResolveSessionCatalog: A Hive serde table will be
created as there is no table provider specified. You can set
spark.sql.legacy.createHiveTableByDefault to false so that native data source
table will be created instead.
res1: org.apache.spark.sql.DataFrame = []
scala> spark.sql("INSERT INTO customer PARTITION (partCol1 = 'CA', partCol2
= 'i.j') VALUES (100, 'John')")
res2: org.apache.spark.sql.DataFrame = []
scala>
spark.sessionState.catalog.listPartitions(TableIdentifier("customer"),
Some(Map("partCol2" -> "i.j"))).foreach(println)
CatalogPartition(
Partition Values: [partCol1=CA, partCol2=i.j]
Location:
file:/home/khalid/dev/oss/test/spark-warehouse/customer/partcol1=CA/partcol2=i.j
Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Storage Properties: [serialization.format=1]
Partition Parameters: {rawDataSize=0, numFiles=1,
transient_lastDdlTime=1644185314, totalSize=9,
COLUMN_STATS_ACCURATE={"BASIC_STATS":"true"}, numRows=0}
Created Time: Sun Feb 06 22:08:34 GMT 2022
Last Access: UNKNOWN
Partition Statistics: 9 bytes)
```
### Why are the changes needed?
It fixes the bug
### Does this PR introduce _any_ user-facing change?
Yes
### How was this patch tested?
`build/sbt -v -d "test:testOnly *CatalogSuite"`
Closes #35409 from khalidmammadov/fix_list_partitions_bug2.
Authored-by: khalidmammadov <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 2ba8a4e263933e7500cbc7c38badb6cb059803c9)
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 4b1ff1180c740294d834e829451f8e4fc78668d6)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../catalyst/catalog/ExternalCatalogSuite.scala | 23 ++++++++++++++++++++++
.../spark/sql/hive/HiveExternalCatalog.scala | 2 +-
2 files changed, 24 insertions(+), 1 deletion(-)
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index d310538..f791f77 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -481,6 +481,29 @@ abstract class ExternalCatalogSuite extends SparkFunSuite
with BeforeAndAfterEac
assert(catalog.listPartitions("db2", "tbl1", Some(part2.spec)).map(_.spec)
== Seq(part2.spec))
}
+ test("SPARK-38120: list partitions with special chars and mixed case column
name") {
+ val catalog = newBasicCatalog()
+ val table = CatalogTable(
+ identifier = TableIdentifier("tbl", Some("db1")),
+ tableType = CatalogTableType.EXTERNAL,
+ storage = storageFormat.copy(locationUri =
Some(Utils.createTempDir().toURI)),
+ schema = new StructType()
+ .add("col1", "int")
+ .add("col2", "string")
+ .add("partCol1", "int")
+ .add("partCol2", "string"),
+ provider = Some(defaultProvider),
+ partitionColumnNames = Seq("partCol1", "partCol2"))
+ catalog.createTable(table, ignoreIfExists = false)
+
+ val part1 = CatalogTablePartition(Map("partCol1" -> "1", "partCol2" ->
"i+j"), storageFormat)
+ val part2 = CatalogTablePartition(Map("partCol1" -> "1", "partCol2" ->
"i.j"), storageFormat)
+ catalog.createPartitions("db1", "tbl", Seq(part1, part2), ignoreIfExists =
false)
+
+ assert(catalog.listPartitions("db1", "tbl", Some(part1.spec)).map(_.spec)
== Seq(part1.spec))
+ assert(catalog.listPartitions("db1", "tbl", Some(part2.spec)).map(_.spec)
== Seq(part2.spec))
+ }
+
test("list partitions by filter") {
val tz = TimeZone.getDefault.getID
val catalog = newBasicCatalog()
diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 1e43f2c..fa8bf87 100644
---
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -1263,7 +1263,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf,
hadoopConf: Configurat
// treats dot as matching any single character and may return more
partitions than we
// expected. Here we do an extra filter to drop unexpected partitions.
case Some(spec) if spec.exists(_._2.contains(".")) =>
- res.filter(p => isPartialPartitionSpec(spec, p.spec))
+ res.filter(p => isPartialPartitionSpec(spec,
toMetaStorePartitionSpec(p.spec)))
case _ => res
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]