This is an automated email from the ASF dual-hosted git repository.

jayzhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new f1f6e5e463 Optimize `gcd` for array and scalar case by avoiding 
`make_scalar_function` where has unnecessary conversion between scalar and 
array (#14834)
f1f6e5e463 is described below

commit f1f6e5e46320094a8ef8c3e9ac710d7ff98c4d57
Author: Jay Zhan <jayzhan...@gmail.com>
AuthorDate: Tue Feb 25 22:39:46 2025 +0800

    Optimize `gcd` for array and scalar case by avoiding `make_scalar_function` 
where has unnecessary conversion between scalar and array (#14834)
    
    * optimize gcd
    
    * fmt
    
    * add feature
    
    * Use try_binary to make gcd even faster
    
    * rm length check
    
    ---------
    
    Co-authored-by: Andrew Lamb <and...@nerdnetworks.org>
---
 datafusion/functions/Cargo.toml             |   5 ++
 datafusion/functions/benches/gcd.rs         |  92 +++++++++++++++++++
 datafusion/functions/src/math/gcd.rs        | 135 ++++++++++++----------------
 datafusion/sqllogictest/test_files/math.slt |   4 +-
 4 files changed, 158 insertions(+), 78 deletions(-)

diff --git a/datafusion/functions/Cargo.toml b/datafusion/functions/Cargo.toml
index b44127d6a1..3208f2dd16 100644
--- a/datafusion/functions/Cargo.toml
+++ b/datafusion/functions/Cargo.toml
@@ -113,6 +113,11 @@ harness = false
 name = "chr"
 required-features = ["string_expressions"]
 
+[[bench]]
+harness = false
+name = "gcd"
+required-features = ["math_expressions"]
+
 [[bench]]
 harness = false
 name = "uuid"
diff --git a/datafusion/functions/benches/gcd.rs 
b/datafusion/functions/benches/gcd.rs
new file mode 100644
index 0000000000..f8c855c82a
--- /dev/null
+++ b/datafusion/functions/benches/gcd.rs
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+extern crate criterion;
+
+use arrow::{
+    array::{ArrayRef, Int64Array},
+    datatypes::DataType,
+};
+use criterion::{black_box, criterion_group, criterion_main, Criterion};
+use datafusion_common::ScalarValue;
+use datafusion_expr::{ColumnarValue, ScalarFunctionArgs};
+use datafusion_functions::math::gcd;
+use rand::Rng;
+use std::sync::Arc;
+
+fn generate_i64_array(n_rows: usize) -> ArrayRef {
+    let mut rng = rand::thread_rng();
+    let values = (0..n_rows)
+        .map(|_| rng.gen_range(0..1000))
+        .collect::<Vec<_>>();
+    Arc::new(Int64Array::from(values)) as ArrayRef
+}
+
+fn criterion_benchmark(c: &mut Criterion) {
+    let n_rows = 100000;
+    let array_a = ColumnarValue::Array(generate_i64_array(n_rows));
+    let array_b = ColumnarValue::Array(generate_i64_array(n_rows));
+    let udf = gcd();
+
+    c.bench_function("gcd both array", |b| {
+        b.iter(|| {
+            black_box(
+                udf.invoke_with_args(ScalarFunctionArgs {
+                    args: vec![array_a.clone(), array_b.clone()],
+                    number_rows: 0,
+                    return_type: &DataType::Int64,
+                })
+                .expect("date_bin should work on valid values"),
+            )
+        })
+    });
+
+    // 10! = 3628800
+    let scalar_b = ColumnarValue::Scalar(ScalarValue::Int64(Some(3628800)));
+
+    c.bench_function("gcd array and scalar", |b| {
+        b.iter(|| {
+            black_box(
+                udf.invoke_with_args(ScalarFunctionArgs {
+                    args: vec![array_a.clone(), scalar_b.clone()],
+                    number_rows: 0,
+                    return_type: &DataType::Int64,
+                })
+                .expect("date_bin should work on valid values"),
+            )
+        })
+    });
+
+    // scalar and scalar
+    let scalar_a = ColumnarValue::Scalar(ScalarValue::Int64(Some(3628800)));
+
+    c.bench_function("gcd both scalar", |b| {
+        b.iter(|| {
+            black_box(
+                udf.invoke_with_args(ScalarFunctionArgs {
+                    args: vec![scalar_a.clone(), scalar_b.clone()],
+                    number_rows: 0,
+                    return_type: &DataType::Int64,
+                })
+                .expect("date_bin should work on valid values"),
+            )
+        })
+    });
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
diff --git a/datafusion/functions/src/math/gcd.rs 
b/datafusion/functions/src/math/gcd.rs
index 911e00308a..7fe253b4af 100644
--- a/datafusion/functions/src/math/gcd.rs
+++ b/datafusion/functions/src/math/gcd.rs
@@ -15,19 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::array::{ArrayRef, Int64Array};
+use arrow::array::{new_null_array, ArrayRef, AsArray, Int64Array, 
PrimitiveArray};
+use arrow::compute::try_binary;
+use arrow::datatypes::{DataType, Int64Type};
 use arrow::error::ArrowError;
 use std::any::Any;
 use std::mem::swap;
 use std::sync::Arc;
 
-use arrow::datatypes::DataType;
-use arrow::datatypes::DataType::Int64;
-
-use crate::utils::make_scalar_function;
-use datafusion_common::{
-    arrow_datafusion_err, exec_err, internal_datafusion_err, DataFusionError, 
Result,
-};
+use datafusion_common::{exec_err, internal_datafusion_err, Result, 
ScalarValue};
 use datafusion_expr::{
     ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature,
     Volatility,
@@ -54,9 +50,12 @@ impl Default for GcdFunc {
 
 impl GcdFunc {
     pub fn new() -> Self {
-        use DataType::*;
         Self {
-            signature: Signature::uniform(2, vec![Int64], 
Volatility::Immutable),
+            signature: Signature::uniform(
+                2,
+                vec![DataType::Int64],
+                Volatility::Immutable,
+            ),
         }
     }
 }
@@ -75,11 +74,34 @@ impl ScalarUDFImpl for GcdFunc {
     }
 
     fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
-        Ok(Int64)
+        Ok(DataType::Int64)
     }
 
     fn invoke_with_args(&self, args: ScalarFunctionArgs) -> 
Result<ColumnarValue> {
-        make_scalar_function(gcd, vec![])(&args.args)
+        let args: [ColumnarValue; 2] = args.args.try_into().map_err(|_| {
+            internal_datafusion_err!("Expected 2 arguments for function gcd")
+        })?;
+
+        match args {
+            [ColumnarValue::Array(a), ColumnarValue::Array(b)] => {
+                compute_gcd_for_arrays(&a, &b)
+            }
+            [ColumnarValue::Scalar(ScalarValue::Int64(a)), 
ColumnarValue::Scalar(ScalarValue::Int64(b))] => {
+                match (a, b) {
+                    (Some(a), Some(b)) => 
Ok(ColumnarValue::Scalar(ScalarValue::Int64(
+                        Some(compute_gcd(a, b)?),
+                    ))),
+                    _ => Ok(ColumnarValue::Scalar(ScalarValue::Int64(None))),
+                }
+            }
+            [ColumnarValue::Array(a), 
ColumnarValue::Scalar(ScalarValue::Int64(b))] => {
+                compute_gcd_with_scalar(&a, b)
+            }
+            [ColumnarValue::Scalar(ScalarValue::Int64(a)), 
ColumnarValue::Array(b)] => {
+                compute_gcd_with_scalar(&b, a)
+            }
+            _ => exec_err!("Unsupported argument types for function gcd"),
+        }
     }
 
     fn documentation(&self) -> Option<&Documentation> {
@@ -87,24 +109,34 @@ impl ScalarUDFImpl for GcdFunc {
     }
 }
 
-/// Gcd SQL function
-fn gcd(args: &[ArrayRef]) -> Result<ArrayRef> {
-    match args[0].data_type() {
-        Int64 => {
-            let arg1 = downcast_named_arg!(&args[0], "x", Int64Array);
-            let arg2 = downcast_named_arg!(&args[1], "y", Int64Array);
+fn compute_gcd_for_arrays(a: &ArrayRef, b: &ArrayRef) -> Result<ColumnarValue> 
{
+    let a = a.as_primitive::<Int64Type>();
+    let b = b.as_primitive::<Int64Type>();
+    try_binary(a, b, compute_gcd)
+        .map(|arr: PrimitiveArray<Int64Type>| {
+            ColumnarValue::Array(Arc::new(arr) as ArrayRef)
+        })
+        .map_err(Into::into) // convert ArrowError to DataFusionError
+}
 
-            Ok(arg1
+fn compute_gcd_with_scalar(arr: &ArrayRef, scalar: Option<i64>) -> 
Result<ColumnarValue> {
+    match scalar {
+        Some(scalar_value) => {
+            let result: Result<Int64Array> = arr
+                .as_primitive::<Int64Type>()
                 .iter()
-                .zip(arg2.iter())
-                .map(|(a1, a2)| match (a1, a2) {
-                    (Some(a1), Some(a2)) => Ok(Some(compute_gcd(a1, a2)?)),
+                .map(|val| match val {
+                    Some(val) => Ok(Some(compute_gcd(val, scalar_value)?)),
                     _ => Ok(None),
                 })
-                .collect::<Result<Int64Array>>()
-                .map(Arc::new)? as ArrayRef)
+                .collect();
+
+            result.map(|arr| ColumnarValue::Array(Arc::new(arr) as ArrayRef))
         }
-        other => exec_err!("Unsupported data type {other:?} for function gcd"),
+        None => Ok(ColumnarValue::Array(new_null_array(
+            &DataType::Int64,
+            arr.len(),
+        ))),
     }
 }
 
@@ -132,61 +164,12 @@ pub(super) fn unsigned_gcd(mut a: u64, mut b: u64) -> u64 
{
 }
 
 /// Computes greatest common divisor using Binary GCD algorithm.
-pub fn compute_gcd(x: i64, y: i64) -> Result<i64> {
+pub fn compute_gcd(x: i64, y: i64) -> Result<i64, ArrowError> {
     let a = x.unsigned_abs();
     let b = y.unsigned_abs();
     let r = unsigned_gcd(a, b);
     // gcd(i64::MIN, i64::MIN) = i64::MIN.unsigned_abs() cannot fit into i64
     r.try_into().map_err(|_| {
-        arrow_datafusion_err!(ArrowError::ComputeError(format!(
-            "Signed integer overflow in GCD({x}, {y})"
-        )))
+        ArrowError::ComputeError(format!("Signed integer overflow in GCD({x}, 
{y})"))
     })
 }
-
-#[cfg(test)]
-mod test {
-    use std::sync::Arc;
-
-    use arrow::{
-        array::{ArrayRef, Int64Array},
-        error::ArrowError,
-    };
-
-    use crate::math::gcd::gcd;
-    use datafusion_common::{cast::as_int64_array, DataFusionError};
-
-    #[test]
-    fn test_gcd_i64() {
-        let args: Vec<ArrayRef> = vec![
-            Arc::new(Int64Array::from(vec![0, 3, 25, -16])), // x
-            Arc::new(Int64Array::from(vec![0, -2, 15, 8])),  // y
-        ];
-
-        let result = gcd(&args).expect("failed to initialize function gcd");
-        let ints = as_int64_array(&result).expect("failed to initialize 
function gcd");
-
-        assert_eq!(ints.len(), 4);
-        assert_eq!(ints.value(0), 0);
-        assert_eq!(ints.value(1), 1);
-        assert_eq!(ints.value(2), 5);
-        assert_eq!(ints.value(3), 8);
-    }
-
-    #[test]
-    fn overflow_on_both_param_i64_min() {
-        let args: Vec<ArrayRef> = vec![
-            Arc::new(Int64Array::from(vec![i64::MIN])), // x
-            Arc::new(Int64Array::from(vec![i64::MIN])), // y
-        ];
-
-        match gcd(&args) {
-            // we expect a overflow
-            Err(DataFusionError::ArrowError(ArrowError::ComputeError(_), _)) 
=> {}
-            Err(_) => {
-                panic!("failed to initialize function gcd")
-            }
-            Ok(_) => panic!("GCD({0}, {0}) should have overflown", i64::MIN),
-        };
-    }
-}
diff --git a/datafusion/sqllogictest/test_files/math.slt 
b/datafusion/sqllogictest/test_files/math.slt
index a3cf1a4e57..a49e0a6421 100644
--- a/datafusion/sqllogictest/test_files/math.slt
+++ b/datafusion/sqllogictest/test_files/math.slt
@@ -623,12 +623,12 @@ select
 1 1 1
 
 # gcd with columns and expresions
-query II rowsort
+query II
 select gcd(a, b), gcd(c*d + 1, abs(e)) + f from signed_integers;
 ----
 1 11
-1 13
 2 -10
+1 13
 NULL NULL
 
 # gcd(i64::MIN, i64::MIN)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to