This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 6ec70aeaa perf: Improve performance of normalize_nan (#2999)
6ec70aeaa is described below
commit 6ec70aeaad85ebfc114ffc211199a8b07533f965
Author: Andy Grove <[email protected]>
AuthorDate: Fri Jan 2 10:56:04 2026 -0700
perf: Improve performance of normalize_nan (#2999)
---
native/spark-expr/Cargo.toml | 4 +
native/spark-expr/benches/normalize_nan.rs | 88 ++++++++++++++++++++++
.../src/math_funcs/internal/normalize_nan.rs | 80 ++++++--------------
3 files changed, 114 insertions(+), 58 deletions(-)
diff --git a/native/spark-expr/Cargo.toml b/native/spark-expr/Cargo.toml
index ea89c4320..b056e1b29 100644
--- a/native/spark-expr/Cargo.toml
+++ b/native/spark-expr/Cargo.toml
@@ -80,6 +80,10 @@ harness = false
name = "padding"
harness = false
+[[bench]]
+name = "normalize_nan"
+harness = false
+
[[test]]
name = "test_udf_registration"
path = "tests/spark_expr_reg.rs"
diff --git a/native/spark-expr/benches/normalize_nan.rs b/native/spark-expr/benches/normalize_nan.rs
new file mode 100644
index 000000000..17413e7f0
--- /dev/null
+++ b/native/spark-expr/benches/normalize_nan.rs
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Benchmarks for NormalizeNaNAndZero expression
+
+use arrow::array::Float64Array;
+use arrow::datatypes::{DataType, Field, Schema};
+use arrow::record_batch::RecordBatch;
+use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
+use datafusion::physical_expr::expressions::Column;
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion_comet_spark_expr::NormalizeNaNAndZero;
+use std::hint::black_box;
+use std::sync::Arc;
+
+const BATCH_SIZE: usize = 8192;
+
+fn make_col(name: &str, index: usize) -> Arc<dyn PhysicalExpr> {
+ Arc::new(Column::new(name, index))
+}
+
+/// Create a batch with float64 column containing various values including NaN and -0.0
+fn create_float_batch(nan_pct: usize, neg_zero_pct: usize, null_pct: usize) -> RecordBatch {
+ let mut values: Vec<Option<f64>> = Vec::with_capacity(BATCH_SIZE);
+
+ for i in 0..BATCH_SIZE {
+ if null_pct > 0 && i % (100 / null_pct.max(1)) == 0 {
+ values.push(None);
+ } else if nan_pct > 0 && i % (100 / nan_pct.max(1)) == 1 {
+ values.push(Some(f64::NAN));
+ } else if neg_zero_pct > 0 && i % (100 / neg_zero_pct.max(1)) == 2 {
+ values.push(Some(-0.0));
+ } else {
+ values.push(Some(i as f64 * 1.5));
+ }
+ }
+
+ let array = Float64Array::from(values);
+ let schema = Schema::new(vec![Field::new("c1", DataType::Float64, true)]);
+
+ RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
+}
+
+fn bench_normalize_nan_and_zero(c: &mut Criterion) {
+ let mut group = c.benchmark_group("normalize_nan_and_zero");
+
+ // Test with different percentages of special values
+ let test_cases = [
+ ("no_special", 0, 0, 0),
+ ("10pct_nan", 10, 0, 0),
+ ("10pct_neg_zero", 0, 10, 0),
+ ("10pct_null", 0, 0, 10),
+ ("mixed_10pct", 5, 5, 5),
+ ("all_normal", 0, 0, 0),
+ ];
+
+ for (name, nan_pct, neg_zero_pct, null_pct) in test_cases {
+ let batch = create_float_batch(nan_pct, neg_zero_pct, null_pct);
+
+ let normalize_expr = Arc::new(NormalizeNaNAndZero::new(
+ DataType::Float64,
+ make_col("c1", 0),
+ ));
+
+ group.bench_with_input(BenchmarkId::new("float64", name), &batch, |b, batch| {
+ b.iter(|| black_box(normalize_expr.evaluate(black_box(batch)).unwrap()));
+ });
+ }
+
+ group.finish();
+}
+
+criterion_group!(benches, bench_normalize_nan_and_zero);
+criterion_main!(benches);
diff --git a/native/spark-expr/src/math_funcs/internal/normalize_nan.rs b/native/spark-expr/src/math_funcs/internal/normalize_nan.rs
index 0bd556ed7..4094bd762 100644
--- a/native/spark-expr/src/math_funcs/internal/normalize_nan.rs
+++ b/native/spark-expr/src/math_funcs/internal/normalize_nan.rs
@@ -15,10 +15,11 @@
// specific language governing permissions and limitations
// under the License.
+use arrow::compute::unary;
use arrow::datatypes::{DataType, Schema};
use arrow::{
- array::{as_primitive_array, ArrayAccessor, ArrayIter, Float32Array, Float64Array},
- datatypes::{ArrowNativeType, Float32Type, Float64Type},
+ array::{as_primitive_array, Float32Array, Float64Array},
+ datatypes::{Float32Type, Float64Type},
record_batch::RecordBatch,
};
use datafusion::logical_expr::ColumnarValue;
@@ -78,14 +79,16 @@ impl PhysicalExpr for NormalizeNaNAndZero {
match &self.data_type {
DataType::Float32 => {
- let v = eval_typed(as_primitive_array::<Float32Type>(&array));
- let new_array = Float32Array::from(v);
- Ok(ColumnarValue::Array(Arc::new(new_array)))
+ let input = as_primitive_array::<Float32Type>(&array);
+ // Use unary which operates directly on values buffer without intermediate allocation
+ let result: Float32Array = unary(input, normalize_float);
+ Ok(ColumnarValue::Array(Arc::new(result)))
}
DataType::Float64 => {
- let v = eval_typed(as_primitive_array::<Float64Type>(&array));
- let new_array = Float64Array::from(v);
- Ok(ColumnarValue::Array(Arc::new(new_array)))
+ let input = as_primitive_array::<Float64Type>(&array);
+ // Use unary which operates directly on values buffer without intermediate allocation
+ let result: Float64Array = unary(input, normalize_float);
+ Ok(ColumnarValue::Array(Arc::new(result)))
}
dt => panic!("Unexpected data type {dt:?}"),
}
@@ -106,20 +109,17 @@ impl PhysicalExpr for NormalizeNaNAndZero {
}
}
-fn eval_typed<V: FloatDouble, T: ArrayAccessor<Item = V>>(input: T) -> Vec<Option<V>> {
- let iter = ArrayIter::new(input);
- iter.map(|o| {
- o.map(|v| {
- if v.is_nan() {
- v.nan()
- } else if v.is_neg_zero() {
- v.zero()
- } else {
- v
- }
- })
- })
- .collect()
+/// Normalize a floating point value by converting all NaN representations to a canonical NaN
+/// and negative zero to positive zero. This is used for Spark's comparison semantics.
+#[inline]
+fn normalize_float<T: num::Float>(v: T) -> T {
+ if v.is_nan() {
+ T::nan()
+ } else if v == T::neg_zero() {
+ T::zero()
+ } else {
+ v
+ }
}
impl Display for NormalizeNaNAndZero {
@@ -127,39 +127,3 @@ impl Display for NormalizeNaNAndZero {
write!(f, "FloatNormalize [child: {}]", self.child)
}
}
-
-trait FloatDouble: ArrowNativeType {
- fn is_nan(&self) -> bool;
- fn nan(&self) -> Self;
- fn is_neg_zero(&self) -> bool;
- fn zero(&self) -> Self;
-}
-
-impl FloatDouble for f32 {
- fn is_nan(&self) -> bool {
- f32::is_nan(*self)
- }
- fn nan(&self) -> Self {
- f32::NAN
- }
- fn is_neg_zero(&self) -> bool {
- *self == -0.0
- }
- fn zero(&self) -> Self {
- 0.0
- }
-}
-impl FloatDouble for f64 {
- fn is_nan(&self) -> bool {
- f64::is_nan(*self)
- }
- fn nan(&self) -> Self {
- f64::NAN
- }
- fn is_neg_zero(&self) -> bool {
- *self == -0.0
- }
- fn zero(&self) -> Self {
- 0.0
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]