This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 4dac7450c feat: Improve performance of date truncate (#2997)
4dac7450c is described below
commit 4dac7450cd8fafba63afc00e322be07e2c116b7c
Author: Andy Grove <[email protected]>
AuthorDate: Tue Jan 6 11:49:51 2026 -0700
feat: Improve performance of date truncate (#2997)
---
native/spark-expr/Cargo.toml | 4 +
native/spark-expr/benches/date_trunc.rs | 55 +++++++++
native/spark-expr/src/kernels/mod.rs | 2 +-
native/spark-expr/src/kernels/temporal.rs | 197 ++++++++++++++++--------------
native/spark-expr/src/lib.rs | 3 +-
5 files changed, 164 insertions(+), 97 deletions(-)
diff --git a/native/spark-expr/Cargo.toml b/native/spark-expr/Cargo.toml
index b056e1b29..94653d886 100644
--- a/native/spark-expr/Cargo.toml
+++ b/native/spark-expr/Cargo.toml
@@ -80,6 +80,10 @@ harness = false
name = "padding"
harness = false
+[[bench]]
+name = "date_trunc"
+harness = false
+
[[bench]]
name = "normalize_nan"
harness = false
diff --git a/native/spark-expr/benches/date_trunc.rs b/native/spark-expr/benches/date_trunc.rs
new file mode 100644
index 000000000..a3dd0900c
--- /dev/null
+++ b/native/spark-expr/benches/date_trunc.rs
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{ArrayRef, Date32Array};
+use criterion::{criterion_group, criterion_main, Criterion};
+use datafusion_comet_spark_expr::date_trunc_dyn;
+use std::sync::Arc;
+
+fn criterion_benchmark(c: &mut Criterion) {
+    // Build the input once; wrapping it in an Arc lets every format share it
+    // without copying (the array is only read inside the timed closures).
+    let array_ref: ArrayRef = Arc::new(create_date_array());
+
+    let mut group = c.benchmark_group("date_trunc");
+
+    // Benchmark each truncation format against the same input array
+    for format in ["YEAR", "QUARTER", "MONTH", "WEEK"] {
+        group.bench_function(format!("date_trunc_{}", format.to_lowercase()), |b| {
+            b.iter(|| date_trunc_dyn(&array_ref, format.to_string()).unwrap());
+        });
+    }
+
+    group.finish();
+}
+
+fn create_date_array() -> Date32Array {
+    // 10,000 non-null Date32 values (days since 1970-01-01): even day numbers
+    // that wrap around at 19000, so every value lies in 0..19000, i.e. between
+    // 1970-01-01 and early 2022.
+    Date32Array::from_iter_values((0..10000).map(|i| (i * 2) % 19000))
+}
+
+criterion_group! {
+    name = benches;
+    // Stock Criterion settings; no custom sample size or measurement time
+    config = Criterion::default();
+    targets = criterion_benchmark
+}
+criterion_main!(benches);
diff --git a/native/spark-expr/src/kernels/mod.rs b/native/spark-expr/src/kernels/mod.rs
index 3669ff13a..0092f016c 100644
--- a/native/spark-expr/src/kernels/mod.rs
+++ b/native/spark-expr/src/kernels/mod.rs
@@ -18,4 +18,4 @@
//! Kernels
pub mod strings;
-pub(crate) mod temporal;
+pub mod temporal;
diff --git a/native/spark-expr/src/kernels/temporal.rs b/native/spark-expr/src/kernels/temporal.rs
index 09e2c905c..2668e5095 100644
--- a/native/spark-expr/src/kernels/temporal.rs
+++ b/native/spark-expr/src/kernels/temporal.rs
@@ -17,7 +17,7 @@
//! temporal kernels
-use chrono::{DateTime, Datelike, Duration, NaiveDateTime, Timelike, Utc};
+use chrono::{DateTime, Datelike, Duration, NaiveDate, Timelike, Utc};
use std::sync::Arc;
@@ -25,7 +25,7 @@ use arrow::array::{
downcast_dictionary_array, downcast_temporal_array,
temporal_conversions::*,
timezone::Tz,
- types::{ArrowDictionaryKeyType, ArrowTemporalType, Date32Type,
TimestampMicrosecondType},
+ types::{ArrowDictionaryKeyType, ArrowTemporalType,
TimestampMicrosecondType},
ArrowNumericType,
};
use arrow::{
@@ -46,47 +46,57 @@ macro_rules! return_compute_error_with {
// and the beginning of the Unix Epoch (1970-01-01)
const DAYS_TO_UNIX_EPOCH: i32 = 719_163;
-// Copied from arrow_arith/temporal.rs with modification to the output datatype
-// Transforms a array of NaiveDate to an array of Date32 after applying an
operation
-fn as_datetime_with_op<A: ArrayAccessor<Item = T::Native>, T:
ArrowTemporalType, F>(
- iter: ArrayIter<A>,
- mut builder: PrimitiveBuilder<Date32Type>,
- op: F,
-) -> Date32Array
-where
- F: Fn(NaiveDateTime) -> i32,
- i64: From<T::Native>,
-{
- iter.into_iter().for_each(|value| {
- if let Some(value) = value {
- match as_datetime::<T>(i64::from(value)) {
- Some(dt) => builder.append_value(op(dt)),
- None => builder.append_null(),
- }
- } else {
- builder.append_null();
- }
- });
+// Optimized date truncation functions that work directly with days since epoch
+// These avoid the overhead of converting to/from NaiveDateTime
- builder.finish()
+/// Convert days since the Unix epoch to a `NaiveDate`.
+/// Returns `None` when the date is outside chrono's supported range, including
+/// values so large that adding the CE-to-epoch offset would overflow `i32`.
+#[inline]
+fn days_to_date(days: i32) -> Option<NaiveDate> {
+    // checked_add: a plain `+` panics in debug builds for days near i32::MAX
+    NaiveDate::from_num_days_from_ce_opt(days.checked_add(DAYS_TO_UNIX_EPOCH)?)
}
+/// Truncate date to first day of year - optimized version
+/// Steps back by the zero-based day of year instead of building a January 1st date
#[inline]
-fn as_datetime_with_op_single<F>(
- value: Option<i32>,
- builder: &mut PrimitiveBuilder<Date32Type>,
- op: F,
-) where
- F: Fn(NaiveDateTime) -> i32,
-{
- if let Some(value) = value {
- match as_datetime::<Date32Type>(i64::from(value)) {
- Some(dt) => builder.append_value(op(dt)),
- None => builder.append_null(),
- }
- } else {
- builder.append_null();
- }
+fn trunc_days_to_year(days: i32) -> Option<i32> {
+    // ordinal0() is 0 on January 1st, so subtracting it lands on the year start
+    days_to_date(days).map(|date| days - date.ordinal0() as i32)
+}
+
+/// Truncate date to first day of quarter - optimized version
+/// Builds the quarter's first day (Jan 1, Apr 1, Jul 1 or Oct 1 of the same year)
+/// only to read its day-of-year, then steps back by the day-of-year difference.
+/// Returns `None` if `days` cannot be converted to a date.
+#[inline]
+fn trunc_days_to_quarter(days: i32) -> Option<i32> {
+    let date = days_to_date(days)?;
+    let month = date.month(); // 1-12
+    let quarter = (month - 1) / 3; // 0-3
+    let first_month_of_quarter = quarter * 3 + 1; // 1, 4, 7, or 10
+
+    // Day of year of the quarter's first day (a temporary date is created here)
+    let first_day_of_quarter = NaiveDate::from_ymd_opt(date.year(), first_month_of_quarter, 1)?;
+    let quarter_start_ordinal = first_day_of_quarter.ordinal() as i32;
+    let current_ordinal = date.ordinal() as i32;
+
+    // Both ordinals are in the same year, so their difference is the day offset
+    Some(days - (current_ordinal - quarter_start_ordinal))
+}
+
+/// Truncate date to first day of month - optimized version
+/// Steps back by the zero-based day of month rather than building a new date
+#[inline]
+fn trunc_days_to_month(days: i32) -> Option<i32> {
+    // day0() is 0 on the 1st of the month
+    days_to_date(days).map(|date| days - date.day0() as i32)
+}
+
+/// Truncate date to first day of week (Monday) - optimized version
+/// Steps back by the number of days elapsed since the most recent Monday
+/// (zero when the date already is a Monday)
+#[inline]
+fn trunc_days_to_week(days: i32) -> Option<i32> {
+    days_to_date(days).map(|date| days - date.weekday().num_days_from_monday() as i32)
}
// Based on arrow_arith/temporal.rs:extract_component_from_datetime_array
@@ -143,11 +153,6 @@ where
Ok(())
}
-#[inline]
-fn as_days_from_unix_epoch(dt: Option<NaiveDateTime>) -> i32 {
- dt.unwrap().num_days_from_ce() - DAYS_TO_UNIX_EPOCH
-}
-
// Apply the Tz to the Naive Date Time,,convert to UTC, and return as
microseconds in Unix epoch
#[inline]
fn as_micros_from_unix_epoch_utc(dt: Option<DateTime<Tz>>) -> i64 {
@@ -251,7 +256,7 @@ fn trunc_date_to_microsec<T: Timelike>(dt: T) -> Option<T> {
/// array is an array of Date32 values. The array may be a dictionary array.
///
/// format is a scalar string specifying the format to apply to the
timestamp value.
-pub(crate) fn date_trunc_dyn(array: &dyn Array, format: String) ->
Result<ArrayRef, SparkError> {
+pub fn date_trunc_dyn(array: &dyn Array, format: String) -> Result<ArrayRef,
SparkError> {
match array.data_type().clone() {
DataType::Dictionary(_, _) => {
downcast_dictionary_array!(
@@ -282,34 +287,17 @@ where
T: ArrowTemporalType + ArrowNumericType,
i64: From<T::Native>,
{
- let builder = Date32Builder::with_capacity(array.len());
- let iter = ArrayIter::new(array);
match array.data_type() {
- DataType::Date32 => match format.to_uppercase().as_str() {
- "YEAR" | "YYYY" | "YY" =>
Ok(as_datetime_with_op::<&PrimitiveArray<T>, T, _>(
- iter,
- builder,
- |dt| as_days_from_unix_epoch(trunc_date_to_year(dt)),
- )),
- "QUARTER" => Ok(as_datetime_with_op::<&PrimitiveArray<T>, T, _>(
- iter,
- builder,
- |dt| as_days_from_unix_epoch(trunc_date_to_quarter(dt)),
- )),
- "MONTH" | "MON" | "MM" =>
Ok(as_datetime_with_op::<&PrimitiveArray<T>, T, _>(
- iter,
- builder,
- |dt| as_days_from_unix_epoch(trunc_date_to_month(dt)),
- )),
- "WEEK" => Ok(as_datetime_with_op::<&PrimitiveArray<T>, T, _>(
- iter,
- builder,
- |dt| as_days_from_unix_epoch(trunc_date_to_week(dt)),
- )),
- _ => Err(SparkError::Internal(format!(
- "Unsupported format: {format:?} for function 'date_trunc'"
- ))),
- },
+ DataType::Date32 => {
+ // Use optimized path for Date32 that works directly with days
+ date_trunc_date32(
+ array
+ .as_any()
+ .downcast_ref::<Date32Array>()
+ .expect("Date32 type mismatch"),
+ format,
+ )
+ }
dt => return_compute_error_with!(
"Unsupported input type '{:?}' for function 'date_trunc'",
dt
@@ -317,6 +305,31 @@ where
}
}
+/// Optimized date truncation for Date32 arrays
+/// Works directly with days since epoch instead of converting to/from NaiveDateTime
+///
+/// Null inputs stay null, and a value whose truncation helper returns `None`
+/// becomes null. Returns `SparkError::Internal` for an unsupported `format`.
+fn date_trunc_date32(array: &Date32Array, format: String) -> Result<Date32Array, SparkError> {
+    // Resolve the format once, outside the per-element work
+    let trunc_fn: fn(i32) -> Option<i32> = match format.to_uppercase().as_str() {
+        "YEAR" | "YYYY" | "YY" => trunc_days_to_year,
+        "QUARTER" => trunc_days_to_quarter,
+        "MONTH" | "MON" | "MM" => trunc_days_to_month,
+        "WEEK" => trunc_days_to_week,
+        _ => {
+            return Err(SparkError::Internal(format!(
+                "Unsupported format: {format:?} for function 'date_trunc'"
+            )))
+        }
+    };
+
+    // unary_opt applies trunc_fn only to valid slots and nulls any slot where it
+    // returns None, instead of collecting an iterator of Option values
+    Ok(array.unary_opt(trunc_fn))
+}
+
///
/// Implements the spark
[TRUNC](https://spark.apache.org/docs/latest/api/sql/index.html#trunc)
/// function where the specified format may be an array
@@ -410,29 +423,23 @@ macro_rules! date_trunc_array_fmt_helper {
match $datatype {
DataType::Date32 => {
for (index, val) in iter.enumerate() {
- let op_result = match
$formats.value(index).to_uppercase().as_str() {
- "YEAR" | "YYYY" | "YY" => {
- Ok(as_datetime_with_op_single(val, &mut builder,
|dt| {
- as_days_from_unix_epoch(trunc_date_to_year(dt))
- }))
- }
- "QUARTER" => Ok(as_datetime_with_op_single(val, &mut
builder, |dt| {
- as_days_from_unix_epoch(trunc_date_to_quarter(dt))
- })),
- "MONTH" | "MON" | "MM" => {
- Ok(as_datetime_with_op_single(val, &mut builder,
|dt| {
-
as_days_from_unix_epoch(trunc_date_to_month(dt))
- }))
- }
- "WEEK" => Ok(as_datetime_with_op_single(val, &mut
builder, |dt| {
- as_days_from_unix_epoch(trunc_date_to_week(dt))
- })),
- _ => Err(SparkError::Internal(format!(
- "Unsupported format: {:?} for function
'date_trunc'",
- $formats.value(index)
- ))),
- };
- op_result?
+ let trunc_fn: fn(i32) -> Option<i32> =
+ match $formats.value(index).to_uppercase().as_str() {
+ "YEAR" | "YYYY" | "YY" => trunc_days_to_year,
+ "QUARTER" => trunc_days_to_quarter,
+ "MONTH" | "MON" | "MM" => trunc_days_to_month,
+ "WEEK" => trunc_days_to_week,
+ _ => {
+ return Err(SparkError::Internal(format!(
+ "Unsupported format: {:?} for function
'date_trunc'",
+ $formats.value(index)
+ )))
+ }
+ };
+ match val.and_then(trunc_fn) {
+ Some(days) => builder.append_value(days),
+ None => builder.append_null(),
+ }
}
Ok(builder.finish())
}
diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs
index 96e727ae5..f26fd911d 100644
--- a/native/spark-expr/src/lib.rs
+++ b/native/spark-expr/src/lib.rs
@@ -21,7 +21,8 @@
mod error;
-mod kernels;
+pub mod kernels;
+pub use kernels::temporal::date_trunc_dyn;
mod static_invoke;
pub use static_invoke::*;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]