This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new baad1970 fix: Overflow when reading Timestamp from parquet file (#542)
baad1970 is described below

commit baad19707002a9cb279c56111b898b6f0c19ba9b
Author: Emil Ejbyfeldt <[email protected]>
AuthorDate: Wed Jun 12 17:53:30 2024 +0200

    fix: Overflow when reading Timestamp from parquet file (#542)
    
    * fix: Overflow when reading Timestamp from parquet file
    
    * Add helper method
---
 core/src/parquet/read/values.rs                    | 60 ++++++++++------------
 .../scala/org/apache/comet/CometCastSuite.scala    | 10 +++-
 2 files changed, 37 insertions(+), 33 deletions(-)
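
The heart of the change is that the INT96 to microseconds conversion now uses wrapping_mul and wrapping_add instead of plain multiplication and addition, so an out-of-range day count no longer trips Rust's overflow checks in debug builds. A minimal standalone sketch of the difference (the day value below is made up for illustration; only the MICROS_PER_DAY constant mirrors the one used in the patch):

    const MICROS_PER_DAY: i64 = 24 * 60 * 60 * 1_000_000;

    fn main() {
        // A day delta large enough that the product exceeds i64::MAX.
        let day_delta: i64 = 200_000_000;

        // Plain arithmetic: panics with "attempt to multiply with overflow"
        // in debug builds and wraps silently in release builds.
        // let micros = day_delta * MICROS_PER_DAY;

        // Wrapping arithmetic: wraps in every build profile, which is the
        // behaviour the new helper opts into.
        let micros = day_delta.wrapping_mul(MICROS_PER_DAY);
        println!("{micros}");
    }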

diff --git a/core/src/parquet/read/values.rs b/core/src/parquet/read/values.rs
index c2b1b6e6..76c8a4a1 100644
--- a/core/src/parquet/read/values.rs
+++ b/core/src/parquet/read/values.rs
@@ -727,6 +727,17 @@ const INT96_SRC_BYTE_WIDTH: usize = 12;
 // We convert INT96 to micros and store using i64
 const INT96_DST_BYTE_WIDTH: usize = 8;
 
+fn int96_to_microsecond(v: &[u8]) -> i64 {
+    let nanos = &v[..INT96_DST_BYTE_WIDTH] as *const [u8] as *const u8 as *const i64;
+    let day = &v[INT96_DST_BYTE_WIDTH..] as *const [u8] as *const u8 as *const i32;
+
+    unsafe {
+        ((day.read_unaligned() - JULIAN_DAY_OF_EPOCH) as i64)
+            .wrapping_mul(MICROS_PER_DAY)
+            .wrapping_add(nanos.read_unaligned() / 1000)
+    }
+}
+
 /// Decode timestamps represented as INT96 into i64 with micros precision
 impl PlainDecoding for Int96TimestampMicrosType {
     #[inline]
@@ -736,51 +747,36 @@ impl PlainDecoding for Int96TimestampMicrosType {
         if !src.read_options.use_legacy_date_timestamp_or_ntz {
             let mut offset = src.offset;
             for _ in 0..num {
-                let v = &src_data[offset..offset + INT96_SRC_BYTE_WIDTH];
-                let nanos = &v[..INT96_DST_BYTE_WIDTH] as *const [u8] as *const u8 as *const i64;
-                let day = &v[INT96_DST_BYTE_WIDTH..] as *const [u8] as *const u8 as *const i32;
-
                 // TODO: optimize this further as checking value one by one is not very efficient
-                unsafe {
-                    let micros = (day.read_unaligned() - JULIAN_DAY_OF_EPOCH) as i64
-                        * MICROS_PER_DAY
-                        + nanos.read_unaligned() / 1000;
+                let micros = int96_to_microsecond(&src_data[offset..offset + INT96_SRC_BYTE_WIDTH]);
 
-                    if unlikely(micros < JULIAN_GREGORIAN_SWITCH_OFF_TS) {
-                        panic!(
-                            "Encountered timestamp value {}, which is before 1582-10-15 (counting \
+                if unlikely(micros < JULIAN_GREGORIAN_SWITCH_OFF_TS) {
+                    panic!(
+                        "Encountered timestamp value {}, which is before 1582-10-15 (counting \
                         backwards from Unix eopch date 1970-01-01), and could be ambigous \
                         depending on whether a legacy Julian/Gregorian hybrid calendar is used, \
                         or a Proleptic Gregorian calendar is used.",
-                            micros
-                        );
-                    }
-
-                    offset += INT96_SRC_BYTE_WIDTH;
+                        micros
+                    );
                 }
+
+                offset += INT96_SRC_BYTE_WIDTH;
             }
         }
 
         let mut offset = src.offset;
         let mut dst_offset = INT96_DST_BYTE_WIDTH * dst.num_values;
-        unsafe {
-            for _ in 0..num {
-                let v = &src_data[offset..offset + INT96_SRC_BYTE_WIDTH];
-                let nanos = &v[..INT96_DST_BYTE_WIDTH] as *const [u8] as *const u8 as *const i64;
-                let day = &v[INT96_DST_BYTE_WIDTH..] as *const [u8] as *const u8 as *const i32;
-
-                let micros = (day.read_unaligned() - JULIAN_DAY_OF_EPOCH) as i64 * MICROS_PER_DAY
-                    + nanos.read_unaligned() / 1000;
+        for _ in 0..num {
+            let micros = int96_to_microsecond(&src_data[offset..offset + INT96_SRC_BYTE_WIDTH]);
 
-                bit::memcpy_value(
-                    &micros,
-                    INT96_DST_BYTE_WIDTH,
-                    &mut dst.value_buffer[dst_offset..],
-                );
+            bit::memcpy_value(
+                &micros,
+                INT96_DST_BYTE_WIDTH,
+                &mut dst.value_buffer[dst_offset..],
+            );
 
-                offset += INT96_SRC_BYTE_WIDTH;
-                dst_offset += INT96_DST_BYTE_WIDTH;
-            }
+            offset += INT96_SRC_BYTE_WIDTH;
+            dst_offset += INT96_DST_BYTE_WIDTH;
         }
 
         src.offset = offset;
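
For reference, a self-contained sketch of the same conversion using safe byte handling rather than raw pointer casts. This is not the code in values.rs: the function name is invented, from_le_bytes stands in for the unaligned pointer reads (assuming the little-endian INT96 layout those reads rely on), and the day delta is widened to i64 before subtracting, but the wrapping arithmetic matches the new helper:

    const JULIAN_DAY_OF_EPOCH: i32 = 2_440_588;
    const MICROS_PER_DAY: i64 = 24 * 60 * 60 * 1_000_000;

    // Bytes 0..8 hold nanoseconds within the day (i64), bytes 8..12 the
    // Julian day number (i32), both little-endian as in Parquet INT96 values.
    fn int96_to_microsecond_sketch(v: &[u8; 12]) -> i64 {
        let nanos = i64::from_le_bytes(v[..8].try_into().unwrap());
        let day = i32::from_le_bytes(v[8..].try_into().unwrap());

        // Wrapping operations so that values outside the i64 microsecond
        // range wrap instead of panicking on overflow in debug builds.
        (day as i64 - JULIAN_DAY_OF_EPOCH as i64)
            .wrapping_mul(MICROS_PER_DAY)
            .wrapping_add(nanos / 1000)
    }

Given the twelve bytes of one INT96 value, it returns microseconds since the Unix epoch; a timestamp beyond what i64 microseconds can hold wraps instead of aborting the scan.
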
diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
index 71134e55..31d718d4 100644
--- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@@ -780,7 +780,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
 
   test("cast TimestampType to LongType") {
     assume(CometSparkSessionExtensions.isSpark33Plus)
-    castTest(generateTimestamps(), DataTypes.LongType)
+    castTest(generateTimestampsExtended(), DataTypes.LongType)
   }
 
   ignore("cast TimestampType to FloatType") {
@@ -884,6 +884,14 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     withNulls(values).toDF("b").withColumn("a", col("b").cast(DataTypes.DateType)).drop("b")
   }
 
+  // Extended values are Timestamps that are outside dates supported by chrono::DateTime and
+  // therefore not supported by operations using it.
+  private def generateTimestampsExtended(): DataFrame = {
+    val values = Seq("290000-12-31T01:00:00+02:00")
+    generateTimestamps().unionByName(
+      values.toDF("str").select(col("str").cast(DataTypes.TimestampType).as("a")))
+  }
+
   private def generateTimestamps(): DataFrame = {
     val values =
       Seq(
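
As context for the new test value: a year 290000 timestamp still fits in the i64 microsecond representation Comet decodes into, even though, as the comment in the diff notes, it lies outside the dates chrono::DateTime supports. A quick back-of-the-envelope check, separate from the test code:

    fn main() {
        // How many (365-day) years fit into i64 microseconds since 1970.
        let max_years_from_1970 = i64::MAX / 1_000_000 / 86_400 / 365;
        // Prints roughly 292471, so a year-290000 timestamp is a valid
        // TimestampType value that the Parquet reader has to cope with.
        println!("~{} years", max_years_from_1970);
    }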


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
