This is an automated email from the ASF dual-hosted git repository.

jayzhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new c9bd2910c3 ScalarUDF: Remove `supports_zero_argument` and avoid creating null array for empty args (#10193)
c9bd2910c3 is described below

commit c9bd2910c353ffe4efcf888737b6ee4011f172a6
Author: Jay Zhan <[email protected]>
AuthorDate: Fri Apr 26 20:37:58 2024 +0800

    ScalarUDF: Remove `supports_zero_argument` and avoid creating null array for empty args (#10193)
    
    * Avoid create null array for empty args
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * fix test
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * fix test
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * return scalar instead of array
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * remove supports 0 args in scalarudf
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * cleanup
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * rm test1
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * invoke no args and support randomness
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * rm randomness
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * add func with no args
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * array
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    ---------
    
    Signed-off-by: jayzhan211 <[email protected]>
---
 .../src/physical_optimizer/projection_pushdown.rs  |   4 -
 .../user_defined/user_defined_scalar_functions.rs  | 202 +++++++--------------
 datafusion/expr/src/type_coercion/functions.rs     |   1 +
 datafusion/expr/src/udf.rs                         |  29 ++-
 datafusion/functions-array/src/make_array.rs       |   4 +
 datafusion/functions/src/math/pi.rs                |  18 +-
 datafusion/functions/src/math/random.rs            |  48 +----
 datafusion/functions/src/string/uuid.rs            |  16 +-
 datafusion/physical-expr/src/functions.rs          |   1 -
 datafusion/physical-expr/src/scalar_function.rs    |  34 ++--
 datafusion/physical-expr/src/udf.rs                |   1 -
 datafusion/proto/src/physical_plan/from_proto.rs   |   2 -
 .../proto/tests/cases/roundtrip_physical_plan.rs   |   2 -
 13 files changed, 130 insertions(+), 232 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
index 359916de0f..f07cf1fc6f 100644
--- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
@@ -1379,7 +1379,6 @@ mod tests {
                 ],
                 DataType::Int32,
                 None,
-                false,
             )),
             Arc::new(CaseExpr::try_new(
                 Some(Arc::new(Column::new("d", 2))),
@@ -1448,7 +1447,6 @@ mod tests {
                 ],
                 DataType::Int32,
                 None,
-                false,
             )),
             Arc::new(CaseExpr::try_new(
                 Some(Arc::new(Column::new("d", 3))),
@@ -1520,7 +1518,6 @@ mod tests {
                 ],
                 DataType::Int32,
                 None,
-                false,
             )),
             Arc::new(CaseExpr::try_new(
                 Some(Arc::new(Column::new("d", 2))),
@@ -1589,7 +1586,6 @@ mod tests {
                 ],
                 DataType::Int32,
                 None,
-                false,
             )),
             Arc::new(CaseExpr::try_new(
                 Some(Arc::new(Column::new("d_new", 3))),
diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs
index c40573a8df..4f262b54fb 100644
--- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs
+++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs
@@ -16,10 +16,7 @@
 // under the License.
 
 use arrow::compute::kernels::numeric::add;
-use arrow_array::{
-    Array, ArrayRef, Float32Array, Float64Array, Int32Array, RecordBatch, UInt8Array,
-};
-use arrow_schema::DataType::Float64;
+use arrow_array::{ArrayRef, Float32Array, Float64Array, Int32Array, RecordBatch};
 use arrow_schema::{DataType, Field, Schema};
 use datafusion::execution::context::{FunctionFactory, RegisterFunction, SessionState};
 use datafusion::prelude::*;
@@ -36,9 +33,7 @@ use datafusion_expr::{
     Accumulator, ColumnarValue, CreateFunction, ExprSchemable, LogicalPlanBuilder,
     ScalarUDF, ScalarUDFImpl, Signature, Volatility,
 };
-use rand::{thread_rng, Rng};
 use std::any::Any;
-use std::iter;
 use std::sync::Arc;
 
 /// test that casting happens on udfs.
@@ -168,6 +163,48 @@ async fn scalar_udf() -> Result<()> {
     Ok(())
 }
 
+struct Simple0ArgsScalarUDF {
+    name: String,
+    signature: Signature,
+    return_type: DataType,
+}
+
+impl std::fmt::Debug for Simple0ArgsScalarUDF {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        f.debug_struct("ScalarUDF")
+            .field("name", &self.name)
+            .field("signature", &self.signature)
+            .field("fun", &"<FUNC>")
+            .finish()
+    }
+}
+
+impl ScalarUDFImpl for Simple0ArgsScalarUDF {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        Ok(self.return_type.clone())
+    }
+
+    fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue> {
+        not_impl_err!("{} function does not accept arguments", self.name())
+    }
+
+    fn invoke_no_args(&self, _number_rows: usize) -> Result<ColumnarValue> {
+        Ok(ColumnarValue::Scalar(ScalarValue::Int32(Some(100))))
+    }
+}
+
 #[tokio::test]
 async fn scalar_udf_zero_params() -> Result<()> {
     let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
@@ -179,20 +216,14 @@ async fn scalar_udf_zero_params() -> Result<()> {
     let ctx = SessionContext::new();
 
     ctx.register_batch("t", batch)?;
-    // create function just returns 100 regardless of inp
-    let myfunc = Arc::new(|_args: &[ColumnarValue]| {
-        Ok(ColumnarValue::Array(
-            Arc::new((0..1).map(|_| 100).collect::<Int32Array>()) as ArrayRef,
-        ))
-    });
 
-    ctx.register_udf(create_udf(
-        "get_100",
-        vec![],
-        Arc::new(DataType::Int32),
-        Volatility::Immutable,
-        myfunc,
-    ));
+    let get_100_udf = Simple0ArgsScalarUDF {
+        name: "get_100".to_string(),
+        signature: Signature::exact(vec![], Volatility::Immutable),
+        return_type: DataType::Int32,
+    };
+
+    ctx.register_udf(ScalarUDF::from(get_100_udf));
 
     let result = plan_and_collect(&ctx, "select get_100() a from t").await?;
     let expected = [
@@ -403,123 +434,6 @@ async fn test_user_defined_functions_with_alias() -> Result<()> {
     Ok(())
 }
 
-#[derive(Debug)]
-pub struct RandomUDF {
-    signature: Signature,
-}
-
-impl RandomUDF {
-    pub fn new() -> Self {
-        Self {
-            signature: Signature::any(0, Volatility::Volatile),
-        }
-    }
-}
-
-impl ScalarUDFImpl for RandomUDF {
-    fn as_any(&self) -> &dyn std::any::Any {
-        self
-    }
-
-    fn name(&self) -> &str {
-        "random_udf"
-    }
-
-    fn signature(&self) -> &Signature {
-        &self.signature
-    }
-
-    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
-        Ok(Float64)
-    }
-
-    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
-        let len: usize = match &args[0] {
-            // This udf is always invoked with zero argument so its argument
-            // is a null array indicating the batch size.
-            ColumnarValue::Array(array) if array.data_type().is_null() => array.len(),
-            _ => {
-                return Err(datafusion::error::DataFusionError::Internal(
-                    "Invalid argument type".to_string(),
-                ))
-            }
-        };
-        let mut rng = thread_rng();
-        let values = iter::repeat_with(|| rng.gen_range(0.1..1.0)).take(len);
-        let array = Float64Array::from_iter_values(values);
-        Ok(ColumnarValue::Array(Arc::new(array)))
-    }
-}
-
-/// Ensure that a user defined function with zero argument will be invoked
-/// with a null array indicating the batch size.
-#[tokio::test]
-async fn test_user_defined_functions_zero_argument() -> Result<()> {
-    let ctx = SessionContext::new();
-
-    let schema = Arc::new(Schema::new(vec![Field::new(
-        "index",
-        DataType::UInt8,
-        false,
-    )]));
-
-    let batch = RecordBatch::try_new(
-        schema,
-        vec![Arc::new(UInt8Array::from_iter_values([1, 2, 3]))],
-    )?;
-
-    ctx.register_batch("data_table", batch)?;
-
-    let random_normal_udf = ScalarUDF::from(RandomUDF::new());
-    ctx.register_udf(random_normal_udf);
-
-    let result = plan_and_collect(
-        &ctx,
-        "SELECT random_udf() AS random_udf, random() AS native_random FROM 
data_table",
-    )
-    .await?;
-
-    assert_eq!(result.len(), 1);
-    let batch = &result[0];
-    let random_udf = batch
-        .column(0)
-        .as_any()
-        .downcast_ref::<Float64Array>()
-        .unwrap();
-    let native_random = batch
-        .column(1)
-        .as_any()
-        .downcast_ref::<Float64Array>()
-        .unwrap();
-
-    assert_eq!(random_udf.len(), native_random.len());
-
-    let mut previous = -1.0;
-    for i in 0..random_udf.len() {
-        assert!(random_udf.value(i) >= 0.0 && random_udf.value(i) < 1.0);
-        assert!(random_udf.value(i) != previous);
-        previous = random_udf.value(i);
-    }
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn deregister_udf() -> Result<()> {
-    let random_normal_udf = ScalarUDF::from(RandomUDF::new());
-    let ctx = SessionContext::new();
-
-    ctx.register_udf(random_normal_udf.clone());
-
-    assert!(ctx.udfs().contains("random_udf"));
-
-    ctx.deregister_udf("random_udf");
-
-    assert!(!ctx.udfs().contains("random_udf"));
-
-    Ok(())
-}
-
 #[derive(Debug)]
 struct CastToI64UDF {
     signature: Signature,
@@ -615,6 +529,22 @@ async fn test_user_defined_functions_cast_to_i64() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn deregister_udf() -> Result<()> {
+    let cast2i64 = ScalarUDF::from(CastToI64UDF::new());
+    let ctx = SessionContext::new();
+
+    ctx.register_udf(cast2i64.clone());
+
+    assert!(ctx.udfs().contains("cast_to_i64"));
+
+    ctx.deregister_udf("cast_to_i64");
+
+    assert!(!ctx.udfs().contains("cast_to_i64"));
+
+    Ok(())
+}
+
 #[derive(Debug)]
 struct TakeUDF {
     signature: Signature,
diff --git a/datafusion/expr/src/type_coercion/functions.rs b/datafusion/expr/src/type_coercion/functions.rs
index 07516c1f6f..eb4f325ff8 100644
--- a/datafusion/expr/src/type_coercion/functions.rs
+++ b/datafusion/expr/src/type_coercion/functions.rs
@@ -52,6 +52,7 @@ pub fn data_types(
             );
         }
     }
+
     let valid_types = get_valid_types(&signature.type_signature, current_types)?;
 
     if valid_types
diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs
index 4557fe60a4..c9c11a6bbf 100644
--- a/datafusion/expr/src/udf.rs
+++ b/datafusion/expr/src/udf.rs
@@ -23,7 +23,7 @@ use crate::{
     ScalarFunctionImplementation, Signature,
 };
 use arrow::datatypes::DataType;
-use datafusion_common::{ExprSchema, Result};
+use datafusion_common::{not_impl_err, ExprSchema, Result};
 use std::any::Any;
 use std::fmt;
 use std::fmt::Debug;
@@ -180,6 +180,13 @@ impl ScalarUDF {
         self.inner.invoke(args)
     }
 
+    /// Invoke the function without `args` but number of rows, returning the appropriate result.
+    ///
+    /// See [`ScalarUDFImpl::invoke_no_args`] for more details.
+    pub fn invoke_no_args(&self, number_rows: usize) -> Result<ColumnarValue> {
+        self.inner.invoke_no_args(number_rows)
+    }
+
     /// Returns a `ScalarFunctionImplementation` that can invoke the function
     /// during execution
     pub fn fun(&self) -> ScalarFunctionImplementation {
@@ -322,10 +329,9 @@ pub trait ScalarUDFImpl: Debug + Send + Sync {
     /// The function will be invoked passed with the slice of [`ColumnarValue`]
     /// (either scalar or array).
     ///
-    /// # Zero Argument Functions
-    /// If the function has zero parameters (e.g. `now()`) it will be passed a
-    /// single element slice which is a a null array to indicate the batch's row
-    /// count (so the function can know the resulting array size).
+    /// If the function does not take any arguments, please use [invoke_no_args]
+    /// instead and return [not_impl_err] for this function.
+    ///
     ///
     /// # Performance
     ///
@@ -335,7 +341,18 @@ pub trait ScalarUDFImpl: Debug + Send + Sync {
     ///
     /// [`ColumnarValue::values_to_arrays`] can be used to convert the arguments
     /// to arrays, which will likely be simpler code, but be slower.
-    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue>;
+    ///
+    /// [invoke_no_args]: ScalarUDFImpl::invoke_no_args
+    fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue>;
+
+    /// Invoke the function without `args`, instead the number of rows are provided,
+    /// returning the appropriate result.
+    fn invoke_no_args(&self, _number_rows: usize) -> Result<ColumnarValue> {
+        not_impl_err!(
+            "Function {} does not implement invoke_no_args but called",
+            self.name()
+        )
+    }
 
     /// Returns any aliases (alternate names) for this function.
     ///
diff --git a/datafusion/functions-array/src/make_array.rs b/datafusion/functions-array/src/make_array.rs
index 0439a736ee..770276938f 100644
--- a/datafusion/functions-array/src/make_array.rs
+++ b/datafusion/functions-array/src/make_array.rs
@@ -104,6 +104,10 @@ impl ScalarUDFImpl for MakeArray {
         make_scalar_function(make_array_inner)(args)
     }
 
+    fn invoke_no_args(&self, _number_rows: usize) -> Result<ColumnarValue> {
+        make_scalar_function(make_array_inner)(&[])
+    }
+
     fn aliases(&self) -> &[String] {
         &self.aliases
     }
diff --git a/datafusion/functions/src/math/pi.rs b/datafusion/functions/src/math/pi.rs
index 0801e79751..f9403e411f 100644
--- a/datafusion/functions/src/math/pi.rs
+++ b/datafusion/functions/src/math/pi.rs
@@ -16,13 +16,11 @@
 // under the License.
 
 use std::any::Any;
-use std::sync::Arc;
 
-use arrow::array::Float64Array;
 use arrow::datatypes::DataType;
 use arrow::datatypes::DataType::Float64;
 
-use datafusion_common::{exec_err, Result};
+use datafusion_common::{not_impl_err, Result, ScalarValue};
 use datafusion_expr::{ColumnarValue, FuncMonotonicity, Volatility};
 use datafusion_expr::{ScalarUDFImpl, Signature};
 
@@ -62,12 +60,14 @@ impl ScalarUDFImpl for PiFunc {
         Ok(Float64)
     }
 
-    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
-        if !matches!(&args[0], ColumnarValue::Array(_)) {
-            return exec_err!("Expect pi function to take no param");
-        }
-        let array = Float64Array::from_value(std::f64::consts::PI, 1);
-        Ok(ColumnarValue::Array(Arc::new(array)))
+    fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue> {
+        not_impl_err!("{} function does not accept arguments", self.name())
+    }
+
+    fn invoke_no_args(&self, _number_rows: usize) -> Result<ColumnarValue> {
+        Ok(ColumnarValue::Scalar(ScalarValue::Float64(Some(
+            std::f64::consts::PI,
+        ))))
     }
 
     fn monotonicity(&self) -> Result<Option<FuncMonotonicity>> {
diff --git a/datafusion/functions/src/math/random.rs b/datafusion/functions/src/math/random.rs
index 2c1ad41367..b5eece212a 100644
--- a/datafusion/functions/src/math/random.rs
+++ b/datafusion/functions/src/math/random.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use std::any::Any;
-use std::iter;
 use std::sync::Arc;
 
 use arrow::array::Float64Array;
@@ -24,7 +23,7 @@ use arrow::datatypes::DataType;
 use arrow::datatypes::DataType::Float64;
 use rand::{thread_rng, Rng};
 
-use datafusion_common::{exec_err, Result};
+use datafusion_common::{not_impl_err, Result};
 use datafusion_expr::ColumnarValue;
 use datafusion_expr::{ScalarUDFImpl, Signature, Volatility};
 
@@ -64,45 +63,14 @@ impl ScalarUDFImpl for RandomFunc {
         Ok(Float64)
     }
 
-    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
-        random(args)
+    fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue> {
+        not_impl_err!("{} function does not accept arguments", self.name())
     }
-}
-
-/// Random SQL function
-fn random(args: &[ColumnarValue]) -> Result<ColumnarValue> {
-    let len: usize = match &args[0] {
-        ColumnarValue::Array(array) => array.len(),
-        _ => return exec_err!("Expect random function to take no param"),
-    };
-    let mut rng = thread_rng();
-    let values = iter::repeat_with(|| rng.gen_range(0.0..1.0)).take(len);
-    let array = Float64Array::from_iter_values(values);
-    Ok(ColumnarValue::Array(Arc::new(array)))
-}
-
-#[cfg(test)]
-mod test {
-    use std::sync::Arc;
-
-    use arrow::array::NullArray;
-
-    use datafusion_common::cast::as_float64_array;
-    use datafusion_expr::ColumnarValue;
-
-    use crate::math::random::random;
-
-    #[test]
-    fn test_random_expression() {
-        let args = vec![ColumnarValue::Array(Arc::new(NullArray::new(1)))];
-        let array = random(&args)
-            .expect("failed to initialize function random")
-            .into_array(1)
-            .expect("Failed to convert to array");
-        let floats =
-            as_float64_array(&array).expect("failed to initialize function random");
 
-        assert_eq!(floats.len(), 1);
-        assert!(0.0 <= floats.value(0) && floats.value(0) < 1.0);
+    fn invoke_no_args(&self, num_rows: usize) -> Result<ColumnarValue> {
+        let mut rng = thread_rng();
+        let values = std::iter::repeat_with(|| rng.gen_range(0.0..1.0)).take(num_rows);
+        let array = Float64Array::from_iter_values(values);
+        Ok(ColumnarValue::Array(Arc::new(array)))
     }
 }
diff --git a/datafusion/functions/src/string/uuid.rs b/datafusion/functions/src/string/uuid.rs
index c68871d42e..9c97b4dd74 100644
--- a/datafusion/functions/src/string/uuid.rs
+++ b/datafusion/functions/src/string/uuid.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use std::any::Any;
-use std::iter;
 use std::sync::Arc;
 
 use arrow::array::GenericStringArray;
@@ -24,7 +23,7 @@ use arrow::datatypes::DataType;
 use arrow::datatypes::DataType::Utf8;
 use uuid::Uuid;
 
-use datafusion_common::{exec_err, Result};
+use datafusion_common::{not_impl_err, Result};
 use datafusion_expr::{ColumnarValue, Volatility};
 use datafusion_expr::{ScalarUDFImpl, Signature};
 
@@ -58,15 +57,14 @@ impl ScalarUDFImpl for UuidFunc {
         Ok(Utf8)
     }
 
+    fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue> {
+        not_impl_err!("{} function does not accept arguments", self.name())
+    }
+
     /// Prints random (v4) uuid values per row
     /// uuid() = 'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'
-    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
-        let len: usize = match &args[0] {
-            ColumnarValue::Array(array) => array.len(),
-            _ => return exec_err!("Expect uuid function to take no param"),
-        };
-
-        let values = iter::repeat_with(|| Uuid::new_v4().to_string()).take(len);
+    fn invoke_no_args(&self, num_rows: usize) -> Result<ColumnarValue> {
+        let values = std::iter::repeat_with(|| Uuid::new_v4().to_string()).take(num_rows);
         let array = GenericStringArray::<i32>::from_iter_values(values);
         Ok(ColumnarValue::Array(Arc::new(array)))
     }
diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs
index ac5b87e701..06c4bd1c95 100644
--- a/datafusion/physical-expr/src/functions.rs
+++ b/datafusion/physical-expr/src/functions.rs
@@ -74,7 +74,6 @@ pub fn create_physical_expr(
         input_phy_exprs.to_vec(),
         return_type,
         fun.monotonicity()?,
-        fun.signature().type_signature.supports_zero_argument(),
     )))
 }
 
diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs
index 9ae9f3dee3..3b360fc20c 100644
--- a/datafusion/physical-expr/src/scalar_function.rs
+++ b/datafusion/physical-expr/src/scalar_function.rs
@@ -58,8 +58,6 @@ pub struct ScalarFunctionExpr {
     // and it specifies the effect of an increase or decrease in
     // the corresponding `arg` to the function value.
     monotonicity: Option<FuncMonotonicity>,
-    // Whether this function can be invoked with zero arguments
-    supports_zero_argument: bool,
 }
 
 impl Debug for ScalarFunctionExpr {
@@ -70,7 +68,6 @@ impl Debug for ScalarFunctionExpr {
             .field("args", &self.args)
             .field("return_type", &self.return_type)
             .field("monotonicity", &self.monotonicity)
-            .field("supports_zero_argument", &self.supports_zero_argument)
             .finish()
     }
 }
@@ -83,7 +80,6 @@ impl ScalarFunctionExpr {
         args: Vec<Arc<dyn PhysicalExpr>>,
         return_type: DataType,
         monotonicity: Option<FuncMonotonicity>,
-        supports_zero_argument: bool,
     ) -> Self {
         Self {
             fun,
@@ -91,7 +87,6 @@ impl ScalarFunctionExpr {
             args,
             return_type,
             monotonicity,
-            supports_zero_argument,
         }
     }
 
@@ -142,25 +137,21 @@ impl PhysicalExpr for ScalarFunctionExpr {
     }
 
     fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
-        // evaluate the arguments, if there are no arguments we'll instead pass in a null array
-        // indicating the batch size (as a convention)
-        let inputs = match self.args.is_empty() {
-            // If the function supports zero argument, we pass in a null array indicating the batch size.
-            // This is for user-defined functions.
-            // MakeArray support zero argument but has the different behavior from the array with one null.
-            true if self.supports_zero_argument && self.name != "make_array" => {
-                vec![ColumnarValue::create_null_array(batch.num_rows())]
-            }
-            _ => self
-                .args
-                .iter()
-                .map(|e| e.evaluate(batch))
-                .collect::<Result<Vec<_>>>()?,
-        };
+        let inputs = self
+            .args
+            .iter()
+            .map(|e| e.evaluate(batch))
+            .collect::<Result<Vec<_>>>()?;
 
         // evaluate the function
         match self.fun {
-            ScalarFunctionDefinition::UDF(ref fun) => fun.invoke(&inputs),
+            ScalarFunctionDefinition::UDF(ref fun) => {
+                if self.args.is_empty() {
+                    fun.invoke_no_args(batch.num_rows())
+                } else {
+                    fun.invoke(&inputs)
+                }
+            }
             ScalarFunctionDefinition::Name(_) => {
                 internal_err!(
                     "Name function must be resolved to one of the other 
variants prior to physical planning"
@@ -183,7 +174,6 @@ impl PhysicalExpr for ScalarFunctionExpr {
             children,
             self.return_type().clone(),
             self.monotonicity.clone(),
-            self.supports_zero_argument,
         )))
     }
 
diff --git a/datafusion/physical-expr/src/udf.rs b/datafusion/physical-expr/src/udf.rs
index 368dfdf92f..aad78b7c2f 100644
--- a/datafusion/physical-expr/src/udf.rs
+++ b/datafusion/physical-expr/src/udf.rs
@@ -57,7 +57,6 @@ pub fn create_physical_expr(
         input_phy_exprs.to_vec(),
         return_type,
         fun.monotonicity()?,
-        fun.signature().type_signature.supports_zero_argument(),
     )))
 }
 
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index 1d3edb7b60..e9728d8542 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -342,7 +342,6 @@ pub fn parse_physical_expr(
                 Some(buf) => codec.try_decode_udf(&e.name, buf)?,
                 None => registry.udf(e.name.as_str())?,
             };
-            let signature = udf.signature();
             let scalar_fun_def = ScalarFunctionDefinition::UDF(udf.clone());
 
             let args = parse_physical_exprs(&e.args, registry, input_schema, codec)?;
@@ -353,7 +352,6 @@ pub fn parse_physical_expr(
                 args,
                 convert_required!(e.return_type)?,
                 None,
-                signature.type_signature.supports_zero_argument(),
             ))
         }
         ExprType::LikeExpr(like_expr) => Arc::new(LikeExpr::new(
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 642860d639..5e446f93fe 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -626,7 +626,6 @@ fn roundtrip_scalar_udf() -> Result<()> {
         vec![col("a", &schema)?],
         DataType::Int64,
         None,
-        false,
     );
 
     let project =
@@ -755,7 +754,6 @@ fn roundtrip_scalar_udf_extension_codec() -> Result<()> {
         vec![col("text", &schema)?],
         DataType::Int64,
         None,
-        false,
     ));
 
     let filter = Arc::new(FilterExec::try_new(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to