This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 13b67ee8cc3 [SPARK-45604][SQL] Add LogicalType checking on INT64 -> 
DateTime conversion on Parquet Vectorized Reader
13b67ee8cc3 is described below

commit 13b67ee8cc377a5cc47d02b9addbc00eabfc8b6c
Author: Zamil Majdy <zamil.ma...@databricks.com>
AuthorDate: Sun Oct 22 10:53:22 2023 +0500

    [SPARK-45604][SQL] Add LogicalType checking on INT64 -> DateTime conversion 
on Parquet Vectorized Reader
    
    ### What changes were proposed in this pull request?
    
    Currently, the logical type being read is not checked when converting the 
physical type INT64 into DateTime. One valid scenario where this can break is 
when the physical type is `timestamp_ntz` and the logical type is 
`array<timestamp_ntz>`: because the logical type check does not happen, this 
conversion is allowed. However, the vectorized reader does not support it and 
will produce an NPE in on-heap memory mode and a SEGFAULT in off-heap memory mode. 
Segmentation fault on off-heap memory mode c [...]
    
    ### Why are the changes needed?
    Prevent NPE or Segfault from happening.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    A new test is added in `ParquetSchemaSuite`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #43451 from majdyz/SPARK-45604.
    
    Lead-authored-by: Zamil Majdy <zamil.ma...@databricks.com>
    Co-authored-by: Zamil Majdy <zamil.ma...@gmail.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../parquet/ParquetVectorUpdaterFactory.java        | 10 ++++++++--
 .../datasources/parquet/ParquetSchemaSuite.scala    | 21 +++++++++++++++++++++
 2 files changed, 29 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index d5675db4c3a..26bef0fe3a6 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -109,7 +109,8 @@ public class ParquetVectorUpdaterFactory {
           // For unsigned int64, it stores as plain signed int64 in Parquet 
when dictionary
           // fallbacks. We read them as decimal values.
           return new UnsignedLongUpdater();
-        } else if 
(isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) {
+        } else if (isTimestamp(sparkType) &&
+            isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) {
           validateTimestampType(sparkType);
           if ("CORRECTED".equals(datetimeRebaseMode)) {
             return new LongUpdater();
@@ -117,7 +118,8 @@ public class ParquetVectorUpdaterFactory {
             boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
             return new LongWithRebaseUpdater(failIfRebase, datetimeRebaseTz);
           }
-        } else if 
(isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) {
+        } else if (isTimestamp(sparkType) &&
+            isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) {
           validateTimestampType(sparkType);
           if ("CORRECTED".equals(datetimeRebaseMode)) {
             return new LongAsMicrosUpdater();
@@ -1149,6 +1151,10 @@ public class ParquetVectorUpdaterFactory {
     return false;
   }
 
+  private static boolean isTimestamp(DataType dt) {
+    return dt == DataTypes.TimestampType || dt == DataTypes.TimestampNTZType;
+  }
+
   private static boolean isDecimalTypeMatched(ColumnDescriptor descriptor, 
DataType dt) {
     DecimalType d = (DecimalType) dt;
     LogicalTypeAnnotation typeAnnotation = 
descriptor.getPrimitiveType().getLogicalTypeAnnotation();
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index ef06e64d2eb..19feb9b8bb5 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -1087,6 +1087,27 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     }
   }
 
+  test("SPARK-45604: schema mismatch failure error on timestamp_ntz to 
array<timestamp_ntz>") {
+    import testImplicits._
+
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val timestamp = java.time.LocalDateTime.of(1, 2, 3, 4, 5)
+      val df1 = Seq((1, timestamp)).toDF()
+      val df2 = Seq((2, Array(timestamp))).toDF()
+      df1.write.mode("overwrite").parquet(s"$path/parquet")
+      df2.write.mode("append").parquet(s"$path/parquet")
+
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
+        val e = intercept[SparkException] {
+          spark.read.schema(df2.schema).parquet(s"$path/parquet").collect()
+        }
+        assert(e.getCause.isInstanceOf[SparkException])
+        
assert(e.getCause.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException])
+      }
+    }
+  }
+
   test("SPARK-40819: parquet file with TIMESTAMP(NANOS, true) (with 
nanosAsLong=true)") {
     val tsAttribute = "birthday"
     withSQLConf(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key -> "true") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to