This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push: new f8eb533774a8 [SPARK-46466][SQL][3.5] Vectorized parquet reader should never do rebase for timestamp ntz f8eb533774a8 is described below commit f8eb533774a847ea3aeaa5063022513a6e89fb2b Author: Wenchen Fan <cloud0...@gmail.com> AuthorDate: Fri Dec 22 23:25:12 2023 +0800 [SPARK-46466][SQL][3.5] Vectorized parquet reader should never do rebase for timestamp ntz backport https://github.com/apache/spark/pull/44428 ### What changes were proposed in this pull request? This fixes a correctness bug. The TIMESTAMP_NTZ is a new data type in Spark and has no legacy files that need to do calendar rebase. However, the vectorized parquet reader treats it the same as LTZ and may do rebase if the parquet file was written with the legacy rebase mode. This PR fixes it to never do rebase for NTZ. ### Why are the changes needed? bug fix ### Does this PR introduce _any_ user-facing change? Yes, now we can correctly write and read back NTZ values even if the date is before 1582. ### How was this patch tested? new test ### Was this patch authored or co-authored using generative AI tooling? No Closes #44446 from cloud-fan/ntz2. 
Authored-by: Wenchen Fan <cloud0...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit 0948e24c30f6f7a05110f6e45b6723897e095aeb) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../parquet/ParquetVectorUpdaterFactory.java | 31 ++++++++++++---------- .../datasources/parquet/ParquetQuerySuite.scala | 12 +++++++++ 2 files changed, 29 insertions(+), 14 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java index 42442cf8ea8a..8c4fe2085387 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java @@ -109,24 +109,32 @@ public class ParquetVectorUpdaterFactory { // For unsigned int64, it stores as plain signed int64 in Parquet when dictionary // fallbacks. We read them as decimal values. 
return new UnsignedLongUpdater(); - } else if (isTimestamp(sparkType) && - isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) { - validateTimestampType(sparkType); + } else if (sparkType == DataTypes.TimestampType && + isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) { if ("CORRECTED".equals(datetimeRebaseMode)) { return new LongUpdater(); } else { boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode); return new LongWithRebaseUpdater(failIfRebase, datetimeRebaseTz); } - } else if (isTimestamp(sparkType) && - isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) { - validateTimestampType(sparkType); + } else if (sparkType == DataTypes.TimestampType && + isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) { if ("CORRECTED".equals(datetimeRebaseMode)) { return new LongAsMicrosUpdater(); } else { final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode); return new LongAsMicrosRebaseUpdater(failIfRebase, datetimeRebaseTz); } + } else if (sparkType == DataTypes.TimestampNTZType && + isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) { + validateTimestampNTZType(); + // TIMESTAMP_NTZ is a new data type and has no legacy files that need to do rebase. + return new LongUpdater(); + } else if (sparkType == DataTypes.TimestampNTZType && + isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) { + validateTimestampNTZType(); + // TIMESTAMP_NTZ is a new data type and has no legacy files that need to do rebase. 
+ return new LongAsMicrosUpdater(); } else if (sparkType instanceof DayTimeIntervalType) { return new LongUpdater(); } @@ -196,12 +204,11 @@ public class ParquetVectorUpdaterFactory { ((TimestampLogicalTypeAnnotation) logicalTypeAnnotation).getUnit() == unit; } - void validateTimestampType(DataType sparkType) { + private void validateTimestampNTZType() { assert(logicalTypeAnnotation instanceof TimestampLogicalTypeAnnotation); - // Throw an exception if the Parquet type is TimestampLTZ and the Catalyst type is TimestampNTZ. + // Throw an exception if the Parquet type is TimestampLTZ as the Catalyst type is TimestampNTZ. // This is to avoid mistakes in reading the timestamp values. - if (((TimestampLogicalTypeAnnotation) logicalTypeAnnotation).isAdjustedToUTC() && - sparkType == DataTypes.TimestampNTZType) { + if (((TimestampLogicalTypeAnnotation) logicalTypeAnnotation).isAdjustedToUTC()) { convertErrorForTimestampNTZ("int64 time(" + logicalTypeAnnotation + ")"); } } @@ -1152,10 +1159,6 @@ public class ParquetVectorUpdaterFactory { return false; } - private static boolean isTimestamp(DataType dt) { - return dt == DataTypes.TimestampType || dt == DataTypes.TimestampNTZType; - } - private static boolean isDecimalTypeMatched(ColumnDescriptor descriptor, DataType dt) { DecimalType d = (DecimalType) dt; LogicalTypeAnnotation typeAnnotation = descriptor.getPrimitiveType().getLogicalTypeAnnotation(); diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index ea5444a1791f..828ec39c7d72 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -255,6 +255,18 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS } } + 
test("SPARK-46466: write and read TimestampNTZ with legacy rebase mode") { + withSQLConf(SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> "LEGACY") { + withTable("ts") { + sql("create table ts (c1 timestamp_ntz) using parquet") + sql("insert into ts values (timestamp_ntz'0900-01-01 01:10:10')") + withAllParquetReaders { + checkAnswer(spark.table("ts"), sql("select timestamp_ntz'0900-01-01 01:10:10'")) + } + } + } + } + test("Enabling/disabling merging partfiles when merging parquet schema") { def testSchemaMerging(expectedColumnNumber: Int): Unit = { withTempDir { dir => --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org