This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 6ec70aeaa perf: Improve performance of normalize_nan (#2999)
6ec70aeaa is described below

commit 6ec70aeaad85ebfc114ffc211199a8b07533f965
Author: Andy Grove <[email protected]>
AuthorDate: Fri Jan 2 10:56:04 2026 -0700

    perf: Improve performance of normalize_nan (#2999)
---
 native/spark-expr/Cargo.toml                       |  4 +
 native/spark-expr/benches/normalize_nan.rs         | 88 ++++++++++++++++++++++
 .../src/math_funcs/internal/normalize_nan.rs       | 80 ++++++--------------
 3 files changed, 114 insertions(+), 58 deletions(-)

diff --git a/native/spark-expr/Cargo.toml b/native/spark-expr/Cargo.toml
index ea89c4320..b056e1b29 100644
--- a/native/spark-expr/Cargo.toml
+++ b/native/spark-expr/Cargo.toml
@@ -80,6 +80,10 @@ harness = false
 name = "padding"
 harness = false
 
+[[bench]]
+name = "normalize_nan"
+harness = false
+
 [[test]]
 name = "test_udf_registration"
 path = "tests/spark_expr_reg.rs"
diff --git a/native/spark-expr/benches/normalize_nan.rs b/native/spark-expr/benches/normalize_nan.rs
new file mode 100644
index 000000000..17413e7f0
--- /dev/null
+++ b/native/spark-expr/benches/normalize_nan.rs
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Benchmarks for NormalizeNaNAndZero expression
+
+use arrow::array::Float64Array;
+use arrow::datatypes::{DataType, Field, Schema};
+use arrow::record_batch::RecordBatch;
+use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
+use datafusion::physical_expr::expressions::Column;
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion_comet_spark_expr::NormalizeNaNAndZero;
+use std::hint::black_box;
+use std::sync::Arc;
+
+const BATCH_SIZE: usize = 8192;
+
+fn make_col(name: &str, index: usize) -> Arc<dyn PhysicalExpr> {
+    Arc::new(Column::new(name, index))
+}
+
+/// Create a batch with float64 column containing various values including NaN and -0.0
+fn create_float_batch(nan_pct: usize, neg_zero_pct: usize, null_pct: usize) -> RecordBatch {
+    let mut values: Vec<Option<f64>> = Vec::with_capacity(BATCH_SIZE);
+
+    for i in 0..BATCH_SIZE {
+        if null_pct > 0 && i % (100 / null_pct.max(1)) == 0 {
+            values.push(None);
+        } else if nan_pct > 0 && i % (100 / nan_pct.max(1)) == 1 {
+            values.push(Some(f64::NAN));
+        } else if neg_zero_pct > 0 && i % (100 / neg_zero_pct.max(1)) == 2 {
+            values.push(Some(-0.0));
+        } else {
+            values.push(Some(i as f64 * 1.5));
+        }
+    }
+
+    let array = Float64Array::from(values);
+    let schema = Schema::new(vec![Field::new("c1", DataType::Float64, true)]);
+
+    RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
+}
+
+fn bench_normalize_nan_and_zero(c: &mut Criterion) {
+    let mut group = c.benchmark_group("normalize_nan_and_zero");
+
+    // Test with different percentages of special values
+    let test_cases = [
+        ("no_special", 0, 0, 0),
+        ("10pct_nan", 10, 0, 0),
+        ("10pct_neg_zero", 0, 10, 0),
+        ("10pct_null", 0, 0, 10),
+        ("mixed_10pct", 5, 5, 5),
+        ("all_normal", 0, 0, 0),
+    ];
+
+    for (name, nan_pct, neg_zero_pct, null_pct) in test_cases {
+        let batch = create_float_batch(nan_pct, neg_zero_pct, null_pct);
+
+        let normalize_expr = Arc::new(NormalizeNaNAndZero::new(
+            DataType::Float64,
+            make_col("c1", 0),
+        ));
+
+        group.bench_with_input(BenchmarkId::new("float64", name), &batch, |b, batch| {
+            b.iter(|| black_box(normalize_expr.evaluate(black_box(batch)).unwrap()));
+        });
+    }
+
+    group.finish();
+}
+
+criterion_group!(benches, bench_normalize_nan_and_zero);
+criterion_main!(benches);
diff --git a/native/spark-expr/src/math_funcs/internal/normalize_nan.rs b/native/spark-expr/src/math_funcs/internal/normalize_nan.rs
index 0bd556ed7..4094bd762 100644
--- a/native/spark-expr/src/math_funcs/internal/normalize_nan.rs
+++ b/native/spark-expr/src/math_funcs/internal/normalize_nan.rs
@@ -15,10 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use arrow::compute::unary;
 use arrow::datatypes::{DataType, Schema};
 use arrow::{
-    array::{as_primitive_array, ArrayAccessor, ArrayIter, Float32Array, Float64Array},
-    datatypes::{ArrowNativeType, Float32Type, Float64Type},
+    array::{as_primitive_array, Float32Array, Float64Array},
+    datatypes::{Float32Type, Float64Type},
     record_batch::RecordBatch,
 };
 use datafusion::logical_expr::ColumnarValue;
@@ -78,14 +79,16 @@ impl PhysicalExpr for NormalizeNaNAndZero {
 
         match &self.data_type {
             DataType::Float32 => {
-                let v = eval_typed(as_primitive_array::<Float32Type>(&array));
-                let new_array = Float32Array::from(v);
-                Ok(ColumnarValue::Array(Arc::new(new_array)))
+                let input = as_primitive_array::<Float32Type>(&array);
+                // Use unary which operates directly on values buffer without intermediate allocation
+                let result: Float32Array = unary(input, normalize_float);
+                Ok(ColumnarValue::Array(Arc::new(result)))
             }
             DataType::Float64 => {
-                let v = eval_typed(as_primitive_array::<Float64Type>(&array));
-                let new_array = Float64Array::from(v);
-                Ok(ColumnarValue::Array(Arc::new(new_array)))
+                let input = as_primitive_array::<Float64Type>(&array);
+                // Use unary which operates directly on values buffer without intermediate allocation
+                let result: Float64Array = unary(input, normalize_float);
+                Ok(ColumnarValue::Array(Arc::new(result)))
             }
             dt => panic!("Unexpected data type {dt:?}"),
         }
@@ -106,20 +109,17 @@ impl PhysicalExpr for NormalizeNaNAndZero {
     }
 }
 
-fn eval_typed<V: FloatDouble, T: ArrayAccessor<Item = V>>(input: T) -> Vec<Option<V>> {
-    let iter = ArrayIter::new(input);
-    iter.map(|o| {
-        o.map(|v| {
-            if v.is_nan() {
-                v.nan()
-            } else if v.is_neg_zero() {
-                v.zero()
-            } else {
-                v
-            }
-        })
-    })
-    .collect()
+/// Normalize a floating point value by converting all NaN representations to a canonical NaN
+/// and negative zero to positive zero. This is used for Spark's comparison semantics.
+#[inline]
+fn normalize_float<T: num::Float>(v: T) -> T {
+    if v.is_nan() {
+        T::nan()
+    } else if v == T::neg_zero() {
+        T::zero()
+    } else {
+        v
+    }
 }
 
 impl Display for NormalizeNaNAndZero {
@@ -127,39 +127,3 @@ impl Display for NormalizeNaNAndZero {
         write!(f, "FloatNormalize [child: {}]", self.child)
     }
 }
-
-trait FloatDouble: ArrowNativeType {
-    fn is_nan(&self) -> bool;
-    fn nan(&self) -> Self;
-    fn is_neg_zero(&self) -> bool;
-    fn zero(&self) -> Self;
-}
-
-impl FloatDouble for f32 {
-    fn is_nan(&self) -> bool {
-        f32::is_nan(*self)
-    }
-    fn nan(&self) -> Self {
-        f32::NAN
-    }
-    fn is_neg_zero(&self) -> bool {
-        *self == -0.0
-    }
-    fn zero(&self) -> Self {
-        0.0
-    }
-}
-impl FloatDouble for f64 {
-    fn is_nan(&self) -> bool {
-        f64::is_nan(*self)
-    }
-    fn nan(&self) -> Self {
-        f64::NAN
-    }
-    fn is_neg_zero(&self) -> bool {
-        *self == -0.0
-    }
-    fn zero(&self) -> Self {
-        0.0
-    }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to