This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new c1d0aedf3537 [SPARK-51418][SQL] Fix DataSource PARTITION TABLE w/ Hive
type incompatible partition columns
c1d0aedf3537 is described below
commit c1d0aedf35373330ad56af1b7eb4c1d2e2659c6f
Author: Kent Yao <[email protected]>
AuthorDate: Mon Mar 10 13:30:43 2025 +0800
[SPARK-51418][SQL] Fix DataSource PARTITION TABLE w/ Hive type incompatible
partition columns
### What changes were proposed in this pull request?
```
25/03/06 08:25:17 WARN HiveExternalCatalog: Hive incompatible types found:
timestamp_ntz. Persisting data source table `spark_catalog`.`default`.`c` into
Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.
org.apache.spark.sql.AnalysisException:
org.apache.hadoop.hive.ql.metadata.HiveException:
InvalidObjectException(message:Invalid partition column type: timestamp_ntz)
```
The partition columns are duplicated and stored both in the HMS column meta
and the table properties. If they contain incompatible data types, the HMS Meta
API will fail the process.
We can rely on the table properties to read/write the partition schema instead.
### Why are the changes needed?
bugfix, otherwise, newly added spark data types are not able to be used as
partition columns
### Does this PR introduce _any_ user-facing change?
Yes, more type cases are supported for partitioned datasource tables stored
in HMS
### How was this patch tested?
new tests
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #50182 from yaooqinn/SPARK-51418.
Authored-by: Kent Yao <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
(cherry picked from commit f11bc758bafc23c1f9eaf89320c5765ac3a24eac)
Signed-off-by: Kent Yao <[email protected]>
---
.../analyzer-results/timestamp-ntz.sql.out | 36 ++++++++++++++
.../resources/sql-tests/inputs/timestamp-ntz.sql | 6 +++
.../sql-tests/results/timestamp-ntz.sql.out | 56 ++++++++++++++++++++++
.../spark/sql/execution/command/DDLSuite.scala | 11 +++++
.../spark/sql/hive/client/HiveClientImpl.scala | 24 ++++++++--
5 files changed, 129 insertions(+), 4 deletions(-)
diff --git
a/sql/core/src/test/resources/sql-tests/analyzer-results/timestamp-ntz.sql.out
b/sql/core/src/test/resources/sql-tests/analyzer-results/timestamp-ntz.sql.out
index e92a392e22b6..9ab5b2445fc3 100644
---
a/sql/core/src/test/resources/sql-tests/analyzer-results/timestamp-ntz.sql.out
+++
b/sql/core/src/test/resources/sql-tests/analyzer-results/timestamp-ntz.sql.out
@@ -137,3 +137,39 @@ select timestamp_ntz'2022-01-01 00:00:00' >
timestamp_ltz'2022-01-01 00:00:00'
select timestamp_ntz'2022-01-01 00:00:00' < timestamp_ltz'2022-01-01 00:00:00'
-- !query analysis
[Analyzer test output redacted due to nondeterminism]
+
+
+-- !query
+CREATE TABLE a (a timestamp_ntz, b int) using parquet PARTITIONED BY(a)
+-- !query analysis
+CreateDataSourceTableCommand `spark_catalog`.`default`.`a`, false
+
+
+-- !query
+INSERT INTO a PARTITION(a=timestamp_ntz'2018-11-17 13:33:33') VALUES (1)
+-- !query analysis
+InsertIntoHadoopFsRelationCommand file:[not included in
comparison]/{warehouse_dir}/a, [a=2018-11-17 13:33:33], false, [a#x], Parquet,
[path=file:[not included in comparison]/{warehouse_dir}/a], Append,
`spark_catalog`.`default`.`a`,
org.apache.spark.sql.execution.datasources.CatalogFileIndex(file:[not included
in comparison]/{warehouse_dir}/a), [b, a]
++- Project [b#x, cast(2018-11-17 13:33:33 as timestamp_ntz) AS a#x]
+ +- Project [cast(col1#x as int) AS b#x]
+ +- LocalRelation [col1#x]
+
+
+-- !query
+DESC FORMATTED a
+-- !query analysis
+DescribeTableCommand `spark_catalog`.`default`.`a`, true, [col_name#x,
data_type#x, comment#x]
+
+
+-- !query
+SELECT * FROM a
+-- !query analysis
+Project [b#x, a#x]
++- SubqueryAlias spark_catalog.default.a
+ +- Relation spark_catalog.default.a[b#x,a#x] parquet
+
+
+-- !query
+DROP TABLE a
+-- !query analysis
+DropTable false, false
++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.a
diff --git a/sql/core/src/test/resources/sql-tests/inputs/timestamp-ntz.sql
b/sql/core/src/test/resources/sql-tests/inputs/timestamp-ntz.sql
index 07901093cfba..7996f5879bf7 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/timestamp-ntz.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/timestamp-ntz.sql
@@ -31,3 +31,9 @@ select timestamp_ntz'2022-01-01 00:00:00' < date'2022-01-01';
select timestamp_ntz'2022-01-01 00:00:00' = timestamp_ltz'2022-01-01 00:00:00';
select timestamp_ntz'2022-01-01 00:00:00' > timestamp_ltz'2022-01-01 00:00:00';
select timestamp_ntz'2022-01-01 00:00:00' < timestamp_ltz'2022-01-01 00:00:00';
+
+CREATE TABLE a (a timestamp_ntz, b int) using parquet PARTITIONED BY(a);
+INSERT INTO a PARTITION(a=timestamp_ntz'2018-11-17 13:33:33') VALUES (1);
+DESC FORMATTED a;
+SELECT * FROM a;
+DROP TABLE a;
diff --git
a/sql/core/src/test/resources/sql-tests/results/timestamp-ntz.sql.out
b/sql/core/src/test/resources/sql-tests/results/timestamp-ntz.sql.out
index 3a473dad828a..9e37bf4e9caa 100644
--- a/sql/core/src/test/resources/sql-tests/results/timestamp-ntz.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/timestamp-ntz.sql.out
@@ -173,3 +173,59 @@ select timestamp_ntz'2022-01-01 00:00:00' <
timestamp_ltz'2022-01-01 00:00:00'
struct<(TIMESTAMP_NTZ '2022-01-01 00:00:00' < TIMESTAMP '2022-01-01
00:00:00'):boolean>
-- !query output
false
+
+
+-- !query
+CREATE TABLE a (a timestamp_ntz, b int) using parquet PARTITIONED BY(a)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+INSERT INTO a PARTITION(a=timestamp_ntz'2018-11-17 13:33:33') VALUES (1)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+DESC FORMATTED a
+-- !query schema
+struct<col_name:string,data_type:string,comment:string>
+-- !query output
+b int
+a timestamp_ntz
+# Partition Information
+# col_name data_type comment
+a timestamp_ntz
+
+# Detailed Table Information
+Catalog spark_catalog
+Database default
+Table a
+Created Time [not included in comparison]
+Last Access [not included in comparison]
+Created By [not included in comparison]
+Type MANAGED
+Provider parquet
+Location [not included in comparison]/{warehouse_dir}/a
+Partition Provider Catalog
+
+
+-- !query
+SELECT * FROM a
+-- !query schema
+struct<b:int,a:timestamp_ntz>
+-- !query output
+1 2018-11-17 13:33:33
+
+
+-- !query
+DROP TABLE a
+-- !query schema
+struct<>
+-- !query output
+
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index d91d762048d2..21bdbd40caa8 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -2418,6 +2418,17 @@ abstract class DDLSuite extends QueryTest with
DDLSuiteBase {
)
}
}
+
+ test("SPARK-51418: Partitioned by Hive type incompatible columns") {
+ withTable("t1") {
+ sql("CREATE TABLE t1(a timestamp_ntz, b INTEGER) USING parquet
PARTITIONED BY (a)")
+ sql("INSERT INTO t1 PARTITION(a=timestamp_ntz'2018-11-17 13:33:33')
VALUES (1)")
+ checkAnswer(sql("SELECT * FROM t1"), sql("select 1,
timestamp_ntz'2018-11-17 13:33:33'"))
+ sql("ALTER TABLE t1 ADD COLUMN (c string)")
+ checkAnswer(sql("SELECT * FROM t1"),
+ sql("select 1, null, timestamp_ntz'2018-11-17 13:33:33'"))
+ }
+ }
}
object FakeLocalFsFileSystem {
diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 90f8a3a85d70..57f6f999b6ad 100644
---
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -468,7 +468,9 @@ private[hive] class HiveClientImpl(
// Note: Hive separates partition columns and the schema, but for us the
// partition columns are part of the schema
val (cols, partCols) = try {
- (h.getCols.asScala.map(fromHiveColumn),
h.getPartCols.asScala.map(fromHiveColumn))
+ (h.getCols.asScala.map(fromHiveColumn),
+ h.getPartCols.asScala.filter(_.getType !=
INCOMPATIBLE_PARTITION_TYPE_PLACEHOLDER)
+ .map(fromHiveColumn))
} catch {
case ex: SparkException =>
throw QueryExecutionErrors.convertHiveTableToCatalogTableError(
@@ -1093,6 +1095,13 @@ private[hive] class HiveClientImpl(
}
private[hive] object HiveClientImpl extends Logging {
+ // We can not pass raw catalogString of Hive incompatible types to Hive
metastore.
+ // For regular columns, we have already empty the schema and read/write
using table properties.
+ // For partition columns, we need to set them to the hive table and also
avoid verification
+ // failures from HMS. We use the TYPE_PLACEHOLDER below to bypass the
verification.
+ // See org.apache.hadoop.hive.metastore.MetaStoreUtils#validateColumnType
for more details.
+
+ lazy val INCOMPATIBLE_PARTITION_TYPE_PLACEHOLDER = "<derived from
deserializer>"
/** Converts the native StructField to Hive's FieldSchema. */
def toHiveColumn(c: StructField): FieldSchema = {
// For Hive Serde, we still need to to restore the raw type for char and
varchar type.
@@ -1167,10 +1176,17 @@ private[hive] object HiveClientImpl extends Logging {
hiveTable.setProperty("EXTERNAL", "TRUE")
}
// Note: In Hive the schema and partition columns must be disjoint sets
- val (partCols, schema) = table.schema.map(toHiveColumn).partition { c =>
- table.partitionColumnNames.contains(c.getName)
+ val (partSchema, schema) = table.schema.partition { c =>
+ table.partitionColumnNames.contains(c.name)
+ }
+
+ val partCols = partSchema.map {
+ case c if !HiveExternalCatalog.isHiveCompatibleDataType(c.dataType) =>
+ new FieldSchema(c.name, INCOMPATIBLE_PARTITION_TYPE_PLACEHOLDER,
c.getComment().orNull)
+ case c => toHiveColumn(c)
}
- hiveTable.setFields(schema.asJava)
+
+ hiveTable.setFields(schema.map(toHiveColumn).asJava)
hiveTable.setPartCols(partCols.asJava)
Option(table.owner).filter(_.nonEmpty).orElse(userName).foreach(hiveTable.setOwner)
hiveTable.setCreateTime(MILLISECONDS.toSeconds(table.createTime).toInt)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]