This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new b24bc5c1a27 [SPARK-45604][SQL] Add LogicalType checking on INT64 -> DateTime conversion on Parquet Vectorized Reader
b24bc5c1a27 is described below

commit b24bc5c1a27e847143a8159d4291ddac87b7f387
Author: Zamil Majdy <zamil.ma...@databricks.com>
AuthorDate: Sun Oct 22 10:53:22 2023 +0500

    [SPARK-45604][SQL] Add LogicalType checking on INT64 -> DateTime conversion on Parquet Vectorized Reader
    
    ### What changes were proposed in this pull request?
    
    Currently, the requested logical type is not checked when converting the physical type INT64 into DateTime values. One valid scenario where this can break is when the Parquet column is written as `timestamp_ntz` but the requested read type is `array<timestamp_ntz>`: since the logical type check does not happen, the conversion is allowed. However, the vectorized reader does not support it and will produce an NPE in on-heap memory mode and a SEGFAULT in off-heap memory mode. Segmentation fault on off-heap memory mode c [...]
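    
    For illustration, a minimal standalone sketch of the failing scenario, adapted from the regression test added below (it assumes a running `SparkSession` available as `spark`, e.g. in `spark-shell`; the output path is arbitrary):
    
    ```scala
    import spark.implicits._
    
    // Write one file whose column _2 is timestamp_ntz and another whose column _2 is array<timestamp_ntz>.
    val ts = java.time.LocalDateTime.of(1, 2, 3, 4, 5)
    Seq((1, ts)).toDF().write.mode("overwrite").parquet("/tmp/spark-45604")
    val arrayDf = Seq((2, Array(ts))).toDF()
    arrayDf.write.mode("append").parquet("/tmp/spark-45604")
    
    // Reading both files back with the array<timestamp_ntz> schema forces the vectorized reader
    // to decode the plain timestamp_ntz column with a mismatched Spark type. Without the check
    // below it can crash (NPE on-heap, SEGFAULT off-heap); with it, the read fails with a
    // SchemaColumnConvertNotSupportedException wrapped in a SparkException.
    spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")
    spark.read.schema(arrayDf.schema).parquet("/tmp/spark-45604").collect()
    ```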
    
    ### Why are the changes needed?
    Prevent NPE or Segfault from happening.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    A new test is added in `ParquetSchemaSuite`.
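    For reference, the suite can typically be run with something like `build/sbt "sql/testOnly *ParquetSchemaSuite"` (exact invocation may vary by build setup).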
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #43451 from majdyz/SPARK-45604.
    
    Lead-authored-by: Zamil Majdy <zamil.ma...@databricks.com>
    Co-authored-by: Zamil Majdy <zamil.ma...@gmail.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
    (cherry picked from commit 13b67ee8cc377a5cc47d02b9addbc00eabfc8b6c)
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../parquet/ParquetVectorUpdaterFactory.java        | 10 ++++++++--
 .../datasources/parquet/ParquetSchemaSuite.scala    | 21 +++++++++++++++++++++
 2 files changed, 29 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index 15d58f0c757..42442cf8ea8 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -109,7 +109,8 @@ public class ParquetVectorUpdaterFactory {
           // For unsigned int64, it stores as plain signed int64 in Parquet when dictionary
           // fallbacks. We read them as decimal values.
           return new UnsignedLongUpdater();
-        } else if (isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) {
+        } else if (isTimestamp(sparkType) &&
+            isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) {
           validateTimestampType(sparkType);
           if ("CORRECTED".equals(datetimeRebaseMode)) {
             return new LongUpdater();
@@ -117,7 +118,8 @@ public class ParquetVectorUpdaterFactory {
             boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
             return new LongWithRebaseUpdater(failIfRebase, datetimeRebaseTz);
           }
-        } else if (isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) {
+        } else if (isTimestamp(sparkType) &&
+            isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) {
           validateTimestampType(sparkType);
           if ("CORRECTED".equals(datetimeRebaseMode)) {
             return new LongAsMicrosUpdater();
@@ -1150,6 +1152,10 @@ public class ParquetVectorUpdaterFactory {
     return false;
   }
 
+  private static boolean isTimestamp(DataType dt) {
+    return dt == DataTypes.TimestampType || dt == DataTypes.TimestampNTZType;
+  }
+
   private static boolean isDecimalTypeMatched(ColumnDescriptor descriptor, DataType dt) {
     DecimalType d = (DecimalType) dt;
     LogicalTypeAnnotation typeAnnotation = descriptor.getPrimitiveType().getLogicalTypeAnnotation();
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 5589c61be7a..bf5c51b89bb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -1067,6 +1067,27 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     }
   }
 
+  test("SPARK-45604: schema mismatch failure error on timestamp_ntz to 
array<timestamp_ntz>") {
+    import testImplicits._
+
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val timestamp = java.time.LocalDateTime.of(1, 2, 3, 4, 5)
+      val df1 = Seq((1, timestamp)).toDF()
+      val df2 = Seq((2, Array(timestamp))).toDF()
+      df1.write.mode("overwrite").parquet(s"$path/parquet")
+      df2.write.mode("append").parquet(s"$path/parquet")
+
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
+        val e = intercept[SparkException] {
+          spark.read.schema(df2.schema).parquet(s"$path/parquet").collect()
+        }
+        assert(e.getCause.isInstanceOf[SparkException])
+        assert(e.getCause.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException])
+      }
+    }
+  }
+
   test("SPARK-40819: parquet file with TIMESTAMP(NANOS, true) (with 
nanosAsLong=true)") {
     val tsAttribute = "birthday"
     withSQLConf(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key -> "true") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
