This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new baad1970 fix: Overflow when reading Timestamp from parquet file (#542)
baad1970 is described below
commit baad19707002a9cb279c56111b898b6f0c19ba9b
Author: Emil Ejbyfeldt <[email protected]>
AuthorDate: Wed Jun 12 17:53:30 2024 +0200
fix: Overflow when reading Timestamp from parquet file (#542)
* fix: Overflow when reading Timestamp from parquet file
* Add helper method
---
core/src/parquet/read/values.rs | 60 ++++++++++------------
.../scala/org/apache/comet/CometCastSuite.scala | 10 +++-
2 files changed, 37 insertions(+), 33 deletions(-)
diff --git a/core/src/parquet/read/values.rs b/core/src/parquet/read/values.rs
index c2b1b6e6..76c8a4a1 100644
--- a/core/src/parquet/read/values.rs
+++ b/core/src/parquet/read/values.rs
@@ -727,6 +727,17 @@ const INT96_SRC_BYTE_WIDTH: usize = 12;
// We convert INT96 to micros and store using i64
const INT96_DST_BYTE_WIDTH: usize = 8;
+fn int96_to_microsecond(v: &[u8]) -> i64 {
+    let nanos = &v[..INT96_DST_BYTE_WIDTH] as *const [u8] as *const u8 as *const i64;
+    let day = &v[INT96_DST_BYTE_WIDTH..] as *const [u8] as *const u8 as *const i32;
+
+    unsafe {
+        ((day.read_unaligned() - JULIAN_DAY_OF_EPOCH) as i64)
+            .wrapping_mul(MICROS_PER_DAY)
+            .wrapping_add(nanos.read_unaligned() / 1000)
+    }
+}
+
/// Decode timestamps represented as INT96 into i64 with micros precision
impl PlainDecoding for Int96TimestampMicrosType {
#[inline]
@@ -736,51 +747,36 @@ impl PlainDecoding for Int96TimestampMicrosType {
         if !src.read_options.use_legacy_date_timestamp_or_ntz {
             let mut offset = src.offset;
             for _ in 0..num {
-                let v = &src_data[offset..offset + INT96_SRC_BYTE_WIDTH];
-                let nanos = &v[..INT96_DST_BYTE_WIDTH] as *const [u8] as *const u8 as *const i64;
-                let day = &v[INT96_DST_BYTE_WIDTH..] as *const [u8] as *const u8 as *const i32;
-
                 // TODO: optimize this further as checking value one by one is not very efficient
-                unsafe {
-                    let micros = (day.read_unaligned() - JULIAN_DAY_OF_EPOCH) as i64
-                        * MICROS_PER_DAY
-                        + nanos.read_unaligned() / 1000;
+                let micros = int96_to_microsecond(&src_data[offset..offset + INT96_SRC_BYTE_WIDTH]);
 
-                    if unlikely(micros < JULIAN_GREGORIAN_SWITCH_OFF_TS) {
-                        panic!(
-                            "Encountered timestamp value {}, which is before 1582-10-15 (counting \
+                if unlikely(micros < JULIAN_GREGORIAN_SWITCH_OFF_TS) {
+                    panic!(
+                        "Encountered timestamp value {}, which is before 1582-10-15 (counting \
                         backwards from Unix epoch date 1970-01-01), and could be ambiguous \
                         depending on whether a legacy Julian/Gregorian hybrid calendar is used, \
                         or a Proleptic Gregorian calendar is used.",
-                            micros
-                        );
-                    }
-
-                    offset += INT96_SRC_BYTE_WIDTH;
+                        micros
+                    );
                 }
+
+                offset += INT96_SRC_BYTE_WIDTH;
             }
         }
         let mut offset = src.offset;
         let mut dst_offset = INT96_DST_BYTE_WIDTH * dst.num_values;
-        unsafe {
-            for _ in 0..num {
-                let v = &src_data[offset..offset + INT96_SRC_BYTE_WIDTH];
-                let nanos = &v[..INT96_DST_BYTE_WIDTH] as *const [u8] as *const u8 as *const i64;
-                let day = &v[INT96_DST_BYTE_WIDTH..] as *const [u8] as *const u8 as *const i32;
-
-                let micros = (day.read_unaligned() - JULIAN_DAY_OF_EPOCH) as i64 * MICROS_PER_DAY
-                    + nanos.read_unaligned() / 1000;
+        for _ in 0..num {
+            let micros = int96_to_microsecond(&src_data[offset..offset + INT96_SRC_BYTE_WIDTH]);
 
-                bit::memcpy_value(
-                    &micros,
-                    INT96_DST_BYTE_WIDTH,
-                    &mut dst.value_buffer[dst_offset..],
-                );
+            bit::memcpy_value(
+                &micros,
+                INT96_DST_BYTE_WIDTH,
+                &mut dst.value_buffer[dst_offset..],
+            );
 
-                offset += INT96_SRC_BYTE_WIDTH;
-                dst_offset += INT96_DST_BYTE_WIDTH;
-            }
+            offset += INT96_SRC_BYTE_WIDTH;
+            dst_offset += INT96_DST_BYTE_WIDTH;
         }
         src.offset = offset;
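For context on the fix: the new helper decodes Parquet's INT96 timestamp layout (bytes 0..8 are little-endian nanoseconds within the day, bytes 8..12 are the little-endian Julian day number) and replaces the previous plain `*` and `+` with `wrapping_mul`/`wrapping_add`, so an out-of-range timestamp wraps instead of triggering an arithmetic-overflow panic in debug builds. Below is a minimal safe-Rust sketch of the same arithmetic; the constant values are assumed from the usual Parquet/Spark convention rather than copied from this crate:

    use std::convert::TryInto;

    const JULIAN_DAY_OF_EPOCH: i32 = 2_440_588; // Julian day of 1970-01-01 (assumed)
    const MICROS_PER_DAY: i64 = 86_400_000_000; // 24 * 60 * 60 * 1_000_000

    fn int96_to_microsecond(v: &[u8]) -> i64 {
        // Decode the two little-endian fields of the 12-byte INT96 value
        // without the unsafe pointer casts used in the committed code.
        let nanos = i64::from_le_bytes(v[..8].try_into().unwrap());
        let day = i32::from_le_bytes(v[8..12].try_into().unwrap());
        // wrapping_* wraps on overflow instead of panicking, which is the
        // behavior the fix relies on for far-out-of-range timestamps.
        ((day - JULIAN_DAY_OF_EPOCH) as i64)
            .wrapping_mul(MICROS_PER_DAY)
            .wrapping_add(nanos / 1000)
    }

Note that the widening cast to i64 happens before the multiply, so only the multiply and add need wrapping semantics; the day subtraction stays in i32, as in the committed version.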
diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
index 71134e55..31d718d4 100644
--- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@@ -780,7 +780,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   test("cast TimestampType to LongType") {
     assume(CometSparkSessionExtensions.isSpark33Plus)
-    castTest(generateTimestamps(), DataTypes.LongType)
+    castTest(generateTimestampsExtended(), DataTypes.LongType)
   }
 
   ignore("cast TimestampType to FloatType") {
@@ -884,6 +884,14 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     withNulls(values).toDF("b").withColumn("a", col("b").cast(DataTypes.DateType)).drop("b")
   }
 
+  // Extended values are timestamps that fall outside the range of dates supported by
+  // chrono::DateTime and are therefore not supported by operations that rely on it.
+  private def generateTimestampsExtended(): DataFrame = {
+    val values = Seq("290000-12-31T01:00:00+02:00")
+    generateTimestamps().unionByName(
+      values.toDF("str").select(col("str").cast(DataTypes.TimestampType).as("a")))
+  }
+
   private def generateTimestamps(): DataFrame = {
     val values =
       Seq(
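The extended value probes timestamps beyond what chrono can represent. As a rough illustration of the comment in the hunk above, assuming chrono 0.4's documented range of roughly +/-262,000 years (an assumption about the crate, not something stated in this commit):

    // Sketch: check which years chrono's checked constructor accepts.
    use chrono::NaiveDate;

    fn main() {
        // Well inside chrono's supported range: Some(date).
        assert!(NaiveDate::from_ymd_opt(262_142, 12, 31).is_some());
        // Year 290000, as used by generateTimestampsExtended, is outside
        // that range, so the checked constructor returns None.
        assert!(NaiveDate::from_ymd_opt(290_000, 12, 31).is_none());
        println!("290000-12-31 is outside chrono::NaiveDate's range");
    }

This is presumably why the extended generator is only wired into the LongType cast test here: casts that route through chrono-based datetime operations cannot handle such values.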
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]