This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c3a04fa59ce1 [SPARK-47447][SQL] Allow reading Parquet TimestampLTZ as 
TimestampNTZ
c3a04fa59ce1 is described below

commit c3a04fa59ce1aabe4818430ae294fb8d210c0e4b
Author: Gengliang Wang <gengli...@apache.org>
AuthorDate: Tue Mar 19 23:04:59 2024 -0700

    [SPARK-47447][SQL] Allow reading Parquet TimestampLTZ as TimestampNTZ
    
    ### What changes were proposed in this pull request?
    
    Currently, Parquet TimestampNTZ type columns can be read as TimestampLTZ, 
while reading TimestampLTZ as TimestampNTZ will cause errors. This makes it 
impossible to read parquet files containing both TimestampLTZ and TimestampNTZ 
as TimestampNTZ.
    
    To make the data type system on Parquet simpler, this PR allows reading 
TimestampLTZ as TimestampNTZ in the Parquet data source.
    
    ### Why are the changes needed?
    
    * Make it possible  to read parquet files containing both TimestampLTZ and 
TimestampNTZ as TimestampNTZ
    * Make the data type system on Parquet simpler
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, Parquet TimestampLTZ type column are now allowed to be read as 
TimestampNTZ
    
    ### How was this patch tested?
    
    UT
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #45571 from gengliangwang/allowReadLTZAsNTZ.
    
    Authored-by: Gengliang Wang <gengli...@apache.org>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../parquet/ParquetVectorUpdaterFactory.java       | 19 ++-----------------
 .../datasources/parquet/ParquetRowConverter.scala  | 16 ++++++++++++----
 .../datasources/parquet/ParquetQuerySuite.scala    | 22 +++++++---------------
 3 files changed, 21 insertions(+), 36 deletions(-)

diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index abb44915cbcd..b6065c24f2ec 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -148,12 +148,10 @@ public class ParquetVectorUpdaterFactory {
           }
         } else if (sparkType == DataTypes.TimestampNTZType &&
           isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) {
-          validateTimestampNTZType();
           // TIMESTAMP_NTZ is a new data type and has no legacy files that 
need to do rebase.
           return new LongUpdater();
         } else if (sparkType == DataTypes.TimestampNTZType &&
           isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) {
-          validateTimestampNTZType();
           // TIMESTAMP_NTZ is a new data type and has no legacy files that 
need to do rebase.
           return new LongAsMicrosUpdater();
         } else if (sparkType instanceof DayTimeIntervalType) {
@@ -176,7 +174,8 @@ public class ParquetVectorUpdaterFactory {
       }
       case INT96 -> {
         if (sparkType == DataTypes.TimestampNTZType) {
-          convertErrorForTimestampNTZ(typeName.name());
+          // TimestampNTZ type does not require rebasing due to its lack of 
time zone context.
+          return new BinaryToSQLTimestampUpdater();
         } else if (sparkType == DataTypes.TimestampType) {
           final boolean failIfRebase = "EXCEPTION".equals(int96RebaseMode);
           if (!shouldConvertTimestamps()) {
@@ -232,20 +231,6 @@ public class ParquetVectorUpdaterFactory {
       annotation.getUnit() == unit;
   }
 
-  private void validateTimestampNTZType() {
-    assert(logicalTypeAnnotation instanceof TimestampLogicalTypeAnnotation);
-    // Throw an exception if the Parquet type is TimestampLTZ as the Catalyst 
type is TimestampNTZ.
-    // This is to avoid mistakes in reading the timestamp values.
-    if (((TimestampLogicalTypeAnnotation) 
logicalTypeAnnotation).isAdjustedToUTC()) {
-      convertErrorForTimestampNTZ("int64 time(" + logicalTypeAnnotation + ")");
-    }
-  }
-
-  void convertErrorForTimestampNTZ(String parquetType) {
-    throw new RuntimeException("Unable to create Parquet converter for data 
type " +
-      DataTypes.TimestampNTZType.json() + " whose Parquet type is " + 
parquetType);
-  }
-
   boolean isUnsignedIntTypeMatched(int bitWidth) {
     return logicalTypeAnnotation instanceof IntLogicalTypeAnnotation 
annotation &&
       !annotation.isSigned() && annotation.getBitWidth() == bitWidth;
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 1f4522aef2bb..3f5754f27ae3 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -436,6 +436,17 @@ private[parquet] class ParquetRowConverter(
           }
         }
 
+      // INT96 timestamp doesn't have a logical type, here we check the 
physical type instead.
+      case TimestampNTZType if 
parquetType.asPrimitiveType().getPrimitiveTypeName == INT96 =>
+        new ParquetPrimitiveConverter(updater) {
+          // Converts nanosecond timestamps stored as INT96.
+          // TimestampNTZ type does not require rebasing due to its lack of 
time zone context.
+          override def addBinary(value: Binary): Unit = {
+            val julianMicros = ParquetRowConverter.binaryToSQLTimestamp(value)
+            this.updater.setLong(julianMicros)
+          }
+        }
+
       case TimestampNTZType
         if canReadAsTimestampNTZ(parquetType) &&
           parquetType.getLogicalTypeAnnotation
@@ -536,10 +547,7 @@ private[parquet] class ParquetRowConverter(
   // can be read as Spark's TimestampNTZ type. This is to avoid mistakes in 
reading the timestamp
   // values.
   private def canReadAsTimestampNTZ(parquetType: Type): Boolean =
-    parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 &&
-    
parquetType.getLogicalTypeAnnotation.isInstanceOf[TimestampLogicalTypeAnnotation]
 &&
-    !parquetType.getLogicalTypeAnnotation
-      .asInstanceOf[TimestampLogicalTypeAnnotation].isAdjustedToUTC
+    
parquetType.getLogicalTypeAnnotation.isInstanceOf[TimestampLogicalTypeAnnotation]
 
   /**
    * Parquet converter for strings. A dictionary is used to minimize string 
decoding cost.
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 02937ae0fea8..26641cd18d9c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -185,29 +185,21 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
     }
   }
 
-  test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") {
-    val data = (1 to 1000).map { i =>
-      val ts = new java.sql.Timestamp(i)
-      Row(ts)
-    }
-    val actualSchema = StructType(Seq(StructField("time", TimestampType, 
false)))
+  test("SPARK-47447: read TimestampLTZ as TimestampNTZ") {
     val providedSchema = StructType(Seq(StructField("time", TimestampNTZType, 
false)))
 
     Seq("INT96", "TIMESTAMP_MICROS", "TIMESTAMP_MILLIS").foreach { tsType =>
       Seq(true, false).foreach { dictionaryEnabled =>
         withSQLConf(
-            SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> tsType,
-            ParquetOutputFormat.ENABLE_DICTIONARY -> 
dictionaryEnabled.toString) {
+          SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> tsType,
+          ParquetOutputFormat.ENABLE_DICTIONARY -> dictionaryEnabled.toString,
+          SQLConf.SESSION_LOCAL_TIMEZONE.key -> "America/Los_Angeles") {
           withTempPath { file =>
-            val df = spark.createDataFrame(sparkContext.parallelize(data), 
actualSchema)
+            val df = sql("select timestamp'2021-02-02 16:00:00' as time")
             df.write.parquet(file.getCanonicalPath)
             withAllParquetReaders {
-              val e = intercept[SparkException] {
-                
spark.read.schema(providedSchema).parquet(file.getCanonicalPath).collect()
-              }
-              assert(e.getErrorClass == "FAILED_READ_FILE")
-              assert(e.getCause.getMessage.contains(
-                "Unable to create Parquet converter for data type 
\"timestamp_ntz\""))
+              val df2 = 
spark.read.schema(providedSchema).parquet(file.getCanonicalPath)
+              checkAnswer(df2, Row(LocalDateTime.parse("2021-02-03T00:00:00")))
             }
           }
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to