This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch branch-51
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/branch-51 by this push:
     new 2677c27541 [branch-51] Revert rewrite for coalesce, `nvl` and `nvl2` 
simplification (#18567)
2677c27541 is described below

commit 2677c27541d9ec568434b8b99f136e45c3d383bf
Author: Andrew Lamb <[email protected]>
AuthorDate: Mon Nov 10 06:53:52 2025 -0500

    [branch-51] Revert rewrite for coalesce, `nvl` and `nvl2` simplification 
(#18567)
    
    Note this targets the branch-51 release branch
    
    ## Which issue does this PR close?
    
    - part of https://github.com/apache/datafusion/issues/17558
    - resolves https://github.com/apache/datafusion/issues/17801 in the 51
    release branch
    
    ## Rationale for this change
    
    - We merged some clever rewrites for `coalesce` and `nvl2` to use `CASE`
    which are faster and more correct (👏 @chenkovsky @kosiew )
    - However, these rewrites cause subtle schema mismatches during planning
    in some cases (because the CASE simplification nullability logic can't
    always determine the correct nullability - see
    https://github.com/apache/datafusion/issues/17801)
    - @pepijnve has made a heroic effort to fix the schema mismatch in
    https://github.com/apache/datafusion/pull/17813#issuecomment-3508025785,
    but it is non-trivial and I am worried about merging it so close to the
    51 release and introducing new edge cases
    
    ## What changes are included in this PR?
    
    1. Revert https://github.com/apache/datafusion/pull/17357 /
    e5dcc8c04f9559f8af6efea3c7ff8202f3c1c618
    2. Revert https://github.com/apache/datafusion/pull/17991 /
    ea83c2644eb559e55401ce2f7f975032e8d7845d
    3. Revert https://github.com/apache/datafusion/pull/18191 /
    22c4214fe1ca3953932f3f12ccd5b68dbfbefdf3
    4. Cherry-pick 62022545af09346ed578e2c7c61a7c897e2ccfc0, a test that
    reproduces the schema mismatch issue (from
    https://github.com/apache/datafusion/pull/18536)
    5. Cherry-pick 735cacf71d8632d786bfb920126ce6719c42156d, a fix for the
    benchmarks that regressed due to the revert (from
    https://github.com/apache/datafusion/pull/17833)
    6. Update datafusion-testing for the extended tests (see the separate PR
    https://github.com/apache/datafusion-testing/pull/15)
    
    ## Are these changes tested?
    
    Yes, I added a new test.
    
    ## Are there any user-facing changes?
    
    <!--
    If there are user-facing changes then we may require documentation to be
    updated before approving the PR.
    -->
    
    <!--
    If there are any breaking changes to public APIs, please add the `api
    change` label.
    -->
---
 datafusion-testing                                 |   2 +-
 datafusion/core/benches/sql_planner.rs             |   3 -
 .../core/tests/dataframe/dataframe_functions.rs    |  27 ---
 datafusion/core/tests/expr_api/mod.rs              |  20 --
 datafusion/core/tests/tpcds_planning.rs            |  11 +-
 datafusion/expr/src/udf.rs                         |  55 +----
 datafusion/functions/src/core/coalesce.rs          |  99 +++++----
 datafusion/functions/src/core/nvl.rs               | 240 +++++++++++++++++----
 datafusion/functions/src/core/nvl2.rs              |  85 ++++----
 .../optimizer/src/common_subexpr_eliminate.rs      |   6 +-
 datafusion/sqllogictest/test_files/nvl.slt         |  35 ---
 datafusion/sqllogictest/test_files/select.slt      |  18 +-
 .../sqllogictest/test_files/string/string_view.slt |   2 +-
 docs/source/user-guide/sql/scalar_functions.md     |   2 +-
 14 files changed, 329 insertions(+), 276 deletions(-)

diff --git a/datafusion-testing b/datafusion-testing
index eccb0e4a42..8ad3ac00c1 160000
--- a/datafusion-testing
+++ b/datafusion-testing
@@ -1 +1 @@
-Subproject commit eccb0e4a426344ef3faf534cd60e02e9c3afd3ac
+Subproject commit 8ad3ac00c1990d44a99fb6738d7e444f0ccf76a0
diff --git a/datafusion/core/benches/sql_planner.rs 
b/datafusion/core/benches/sql_planner.rs
index 6266a7184c..7f11899af6 100644
--- a/datafusion/core/benches/sql_planner.rs
+++ b/datafusion/core/benches/sql_planner.rs
@@ -477,9 +477,6 @@ fn criterion_benchmark(c: &mut Criterion) {
     };
 
     let raw_tpcds_sql_queries = (1..100)
-        // skip query 75 until it is fixed
-        // https://github.com/apache/datafusion/issues/17801
-        .filter(|q| *q != 75)
         .map(|q| 
std::fs::read_to_string(format!("{tests_path}tpc-ds/{q}.sql")).unwrap())
         .collect::<Vec<_>>();
 
diff --git a/datafusion/core/tests/dataframe/dataframe_functions.rs 
b/datafusion/core/tests/dataframe/dataframe_functions.rs
index 265862ff9a..e7f2603604 100644
--- a/datafusion/core/tests/dataframe/dataframe_functions.rs
+++ b/datafusion/core/tests/dataframe/dataframe_functions.rs
@@ -274,33 +274,6 @@ async fn test_nvl2() -> Result<()> {
 
     Ok(())
 }
-
-#[tokio::test]
-async fn test_nvl2_short_circuit() -> Result<()> {
-    let expr = nvl2(
-        col("a"),
-        arrow_cast(lit("1"), lit("Int32")),
-        arrow_cast(col("a"), lit("Int32")),
-    );
-
-    let batches = get_batches(expr).await?;
-
-    assert_snapshot!(
-        batches_to_string(&batches),
-        @r#"
-    
+-----------------------------------------------------------------------------------+
-    | 
nvl2(test.a,arrow_cast(Utf8("1"),Utf8("Int32")),arrow_cast(test.a,Utf8("Int32")))
 |
-    
+-----------------------------------------------------------------------------------+
-    | 1                                                                        
         |
-    | 1                                                                        
         |
-    | 1                                                                        
         |
-    | 1                                                                        
         |
-    
+-----------------------------------------------------------------------------------+
-    "#
-    );
-
-    Ok(())
-}
 #[tokio::test]
 async fn test_fn_arrow_typeof() -> Result<()> {
     let expr = arrow_typeof(col("l"));
diff --git a/datafusion/core/tests/expr_api/mod.rs 
b/datafusion/core/tests/expr_api/mod.rs
index 84e644480a..4aee274de9 100644
--- a/datafusion/core/tests/expr_api/mod.rs
+++ b/datafusion/core/tests/expr_api/mod.rs
@@ -320,26 +320,6 @@ async fn test_create_physical_expr() {
     create_simplified_expr_test(lit(1i32) + lit(2i32), "3");
 }
 
-#[test]
-fn test_create_physical_expr_nvl2() {
-    let batch = &TEST_BATCH;
-    let df_schema = DFSchema::try_from(batch.schema()).unwrap();
-    let ctx = SessionContext::new();
-
-    let expect_err = |expr| {
-        let physical_expr = ctx.create_physical_expr(expr, 
&df_schema).unwrap();
-        let err = physical_expr.evaluate(batch).unwrap_err();
-        assert!(
-            err.to_string()
-                .contains("nvl2 should have been simplified to case"),
-            "unexpected error: {err:?}"
-        );
-    };
-
-    expect_err(nvl2(col("i"), lit(1i64), lit(0i64)));
-    expect_err(nvl2(lit(1i64), col("i"), lit(0i64)));
-}
-
 #[tokio::test]
 async fn test_create_physical_expr_coercion() {
     // create_physical_expr does apply type coercion and unwrapping in cast
diff --git a/datafusion/core/tests/tpcds_planning.rs 
b/datafusion/core/tests/tpcds_planning.rs
index 252d76d0f9..00e1b8724d 100644
--- a/datafusion/core/tests/tpcds_planning.rs
+++ b/datafusion/core/tests/tpcds_planning.rs
@@ -1051,10 +1051,13 @@ async fn regression_test(query_no: u8, create_physical: 
bool) -> Result<()> {
 
     for sql in &sql {
         let df = ctx.sql(sql).await?;
-        let (state, plan) = df.into_parts();
-        let plan = state.optimize(&plan)?;
-        if create_physical {
-            let _ = state.create_physical_plan(&plan).await?;
+        // attempt to mimic planning steps
+        if !create_physical {
+            let (state, plan) = df.into_parts();
+            let _ = state.optimize(&plan)?;
+        } else {
+            // this is what df.execute() does internally
+            let _ = df.create_physical_plan().await?;
         }
     }
 
diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs
index fd54bb13a6..c1a55bcfd4 100644
--- a/datafusion/expr/src/udf.rs
+++ b/datafusion/expr/src/udf.rs
@@ -252,21 +252,7 @@ impl ScalarUDF {
         Ok(result)
     }
 
-    /// Determines which of the arguments passed to this function are 
evaluated eagerly
-    /// and which may be evaluated lazily.
-    ///
-    /// See [ScalarUDFImpl::conditional_arguments] for more information.
-    pub fn conditional_arguments<'a>(
-        &self,
-        args: &'a [Expr],
-    ) -> Option<(Vec<&'a Expr>, Vec<&'a Expr>)> {
-        self.inner.conditional_arguments(args)
-    }
-
-    /// Returns true if some of this `exprs` subexpressions may not be 
evaluated
-    /// and thus any side effects (like divide by zero) may not be encountered.
-    ///
-    /// See [ScalarUDFImpl::short_circuits] for more information.
+    /// Get the circuits of inner implementation
     pub fn short_circuits(&self) -> bool {
         self.inner.short_circuits()
     }
@@ -696,42 +682,10 @@ pub trait ScalarUDFImpl: Debug + DynEq + DynHash + Send + 
Sync {
     ///
     /// Setting this to true prevents certain optimizations such as common
     /// subexpression elimination
-    ///
-    /// When overriding this function to return `true`, 
[ScalarUDFImpl::conditional_arguments] can also be
-    /// overridden to report more accurately which arguments are eagerly 
evaluated and which ones
-    /// lazily.
     fn short_circuits(&self) -> bool {
         false
     }
 
-    /// Determines which of the arguments passed to this function are 
evaluated eagerly
-    /// and which may be evaluated lazily.
-    ///
-    /// If this function returns `None`, all arguments are eagerly evaluated.
-    /// Returning `None` is a micro optimization that saves a needless `Vec`
-    /// allocation.
-    ///
-    /// If the function returns `Some`, returns (`eager`, `lazy`) where `eager`
-    /// are the arguments that are always evaluated, and `lazy` are the
-    /// arguments that may be evaluated lazily (i.e. may not be evaluated at 
all
-    /// in some cases).
-    ///
-    /// Implementations must ensure that the two returned `Vec`s are disjunct,
-    /// and that each argument from `args` is present in one the two `Vec`s.
-    ///
-    /// When overriding this function, [ScalarUDFImpl::short_circuits] must
-    /// be overridden to return `true`.
-    fn conditional_arguments<'a>(
-        &self,
-        args: &'a [Expr],
-    ) -> Option<(Vec<&'a Expr>, Vec<&'a Expr>)> {
-        if self.short_circuits() {
-            Some((vec![], args.iter().collect()))
-        } else {
-            None
-        }
-    }
-
     /// Computes the output [`Interval`] for a [`ScalarUDFImpl`], given the 
input
     /// intervals.
     ///
@@ -921,13 +875,6 @@ impl ScalarUDFImpl for AliasedScalarUDFImpl {
         self.inner.simplify(args, info)
     }
 
-    fn conditional_arguments<'a>(
-        &self,
-        args: &'a [Expr],
-    ) -> Option<(Vec<&'a Expr>, Vec<&'a Expr>)> {
-        self.inner.conditional_arguments(args)
-    }
-
     fn short_circuits(&self) -> bool {
         self.inner.short_circuits()
     }
diff --git a/datafusion/functions/src/core/coalesce.rs 
b/datafusion/functions/src/core/coalesce.rs
index aab1f445d5..b0f3483513 100644
--- a/datafusion/functions/src/core/coalesce.rs
+++ b/datafusion/functions/src/core/coalesce.rs
@@ -15,13 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use arrow::array::{new_null_array, BooleanArray};
+use arrow::compute::kernels::zip::zip;
+use arrow::compute::{and, is_not_null, is_null};
 use arrow::datatypes::{DataType, Field, FieldRef};
-use datafusion_common::{exec_err, internal_err, plan_err, Result};
+use datafusion_common::{exec_err, internal_err, Result};
 use datafusion_expr::binary::try_type_union_resolution;
-use datafusion_expr::conditional_expressions::CaseBuilder;
-use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
 use datafusion_expr::{
-    ColumnarValue, Documentation, Expr, ReturnFieldArgs, ScalarFunctionArgs,
+    ColumnarValue, Documentation, ReturnFieldArgs, ScalarFunctionArgs,
 };
 use datafusion_expr::{ScalarUDFImpl, Signature, Volatility};
 use datafusion_macros::user_doc;
@@ -47,7 +48,7 @@ use std::any::Any;
 )]
 #[derive(Debug, PartialEq, Eq, Hash)]
 pub struct CoalesceFunc {
-    pub(super) signature: Signature,
+    signature: Signature,
 }
 
 impl Default for CoalesceFunc {
@@ -94,45 +95,61 @@ impl ScalarUDFImpl for CoalesceFunc {
         Ok(Field::new(self.name(), return_type, nullable).into())
     }
 
-    fn simplify(
-        &self,
-        args: Vec<Expr>,
-        _info: &dyn SimplifyInfo,
-    ) -> Result<ExprSimplifyResult> {
+    /// coalesce evaluates to the first value which is not NULL
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> 
Result<ColumnarValue> {
+        let args = args.args;
+        // do not accept 0 arguments.
         if args.is_empty() {
-            return plan_err!("coalesce must have at least one argument");
-        }
-        if args.len() == 1 {
-            return Ok(ExprSimplifyResult::Simplified(
-                args.into_iter().next().unwrap(),
-            ));
+            return exec_err!(
+                "coalesce was called with {} arguments. It requires at least 
1.",
+                args.len()
+            );
         }
 
-        let n = args.len();
-        let (init, last_elem) = args.split_at(n - 1);
-        let whens = init
-            .iter()
-            .map(|x| x.clone().is_not_null())
-            .collect::<Vec<_>>();
-        let cases = init.to_vec();
-        Ok(ExprSimplifyResult::Simplified(
-            CaseBuilder::new(None, whens, cases, 
Some(Box::new(last_elem[0].clone())))
-                .end()?,
-        ))
-    }
-
-    /// coalesce evaluates to the first value which is not NULL
-    fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> 
Result<ColumnarValue> {
-        internal_err!("coalesce should have been simplified to case")
-    }
-
-    fn conditional_arguments<'a>(
-        &self,
-        args: &'a [Expr],
-    ) -> Option<(Vec<&'a Expr>, Vec<&'a Expr>)> {
-        let eager = vec![&args[0]];
-        let lazy = args[1..].iter().collect();
-        Some((eager, lazy))
+        let return_type = args[0].data_type();
+        let mut return_array = args.iter().filter_map(|x| match x {
+            ColumnarValue::Array(array) => Some(array.len()),
+            _ => None,
+        });
+
+        if let Some(size) = return_array.next() {
+            // start with nulls as default output
+            let mut current_value = new_null_array(&return_type, size);
+            let mut remainder = BooleanArray::from(vec![true; size]);
+
+            for arg in args {
+                match arg {
+                    ColumnarValue::Array(ref array) => {
+                        let to_apply = and(&remainder, 
&is_not_null(array.as_ref())?)?;
+                        current_value = zip(&to_apply, array, &current_value)?;
+                        remainder = and(&remainder, &is_null(array)?)?;
+                    }
+                    ColumnarValue::Scalar(value) => {
+                        if value.is_null() {
+                            continue;
+                        } else {
+                            let last_value = value.to_scalar()?;
+                            current_value = zip(&remainder, &last_value, 
&current_value)?;
+                            break;
+                        }
+                    }
+                }
+                if remainder.iter().all(|x| x == Some(false)) {
+                    break;
+                }
+            }
+            Ok(ColumnarValue::Array(current_value))
+        } else {
+            let result = args
+                .iter()
+                .filter_map(|x| match x {
+                    ColumnarValue::Scalar(s) if !s.is_null() => 
Some(x.clone()),
+                    _ => None,
+                })
+                .next()
+                .unwrap_or_else(|| args[0].clone());
+            Ok(result)
+        }
     }
 
     fn short_circuits(&self) -> bool {
diff --git a/datafusion/functions/src/core/nvl.rs 
b/datafusion/functions/src/core/nvl.rs
index 0b9968a88f..c8b34c4b17 100644
--- a/datafusion/functions/src/core/nvl.rs
+++ b/datafusion/functions/src/core/nvl.rs
@@ -15,19 +15,21 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::core::coalesce::CoalesceFunc;
-use arrow::datatypes::{DataType, FieldRef};
-use datafusion_common::Result;
-use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
+use arrow::array::Array;
+use arrow::compute::is_not_null;
+use arrow::compute::kernels::zip::zip;
+use arrow::datatypes::DataType;
+use datafusion_common::{utils::take_function_args, Result};
 use datafusion_expr::{
-    ColumnarValue, Documentation, Expr, ReturnFieldArgs, ScalarFunctionArgs,
-    ScalarUDFImpl, Signature, Volatility,
+    ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature,
+    Volatility,
 };
 use datafusion_macros::user_doc;
+use std::sync::Arc;
 
 #[user_doc(
     doc_section(label = "Conditional Functions"),
-    description = "Returns _expression2_ if _expression1_ is NULL otherwise it 
returns _expression1_ and _expression2_ is not evaluated. This function can be 
used to substitute a default value for NULL values.",
+    description = "Returns _expression2_ if _expression1_ is NULL otherwise it 
returns _expression1_.",
     syntax_example = "nvl(expression1, expression2)",
     sql_example = r#"```sql
 > select nvl(null, 'a');
@@ -55,7 +57,7 @@ use datafusion_macros::user_doc;
 )]
 #[derive(Debug, PartialEq, Eq, Hash)]
 pub struct NVLFunc {
-    coalesce: CoalesceFunc,
+    signature: Signature,
     aliases: Vec<String>,
 }
 
@@ -88,13 +90,11 @@ impl Default for NVLFunc {
 impl NVLFunc {
     pub fn new() -> Self {
         Self {
-            coalesce: CoalesceFunc {
-                signature: Signature::uniform(
-                    2,
-                    SUPPORTED_NVL_TYPES.to_vec(),
-                    Volatility::Immutable,
-                ),
-            },
+            signature: Signature::uniform(
+                2,
+                SUPPORTED_NVL_TYPES.to_vec(),
+                Volatility::Immutable,
+            ),
             aliases: vec![String::from("ifnull")],
         }
     }
@@ -110,45 +110,209 @@ impl ScalarUDFImpl for NVLFunc {
     }
 
     fn signature(&self) -> &Signature {
-        &self.coalesce.signature
+        &self.signature
     }
 
     fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
-        self.coalesce.return_type(arg_types)
+        Ok(arg_types[0].clone())
     }
 
-    fn return_field_from_args(&self, args: ReturnFieldArgs) -> 
Result<FieldRef> {
-        self.coalesce.return_field_from_args(args)
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> 
Result<ColumnarValue> {
+        nvl_func(&args.args)
     }
 
-    fn simplify(
-        &self,
-        args: Vec<Expr>,
-        info: &dyn SimplifyInfo,
-    ) -> Result<ExprSimplifyResult> {
-        self.coalesce.simplify(args, info)
+    fn aliases(&self) -> &[String] {
+        &self.aliases
     }
 
-    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> 
Result<ColumnarValue> {
-        self.coalesce.invoke_with_args(args)
+    fn documentation(&self) -> Option<&Documentation> {
+        self.doc()
     }
+}
+
+fn nvl_func(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+    let [lhs, rhs] = take_function_args("nvl/ifnull", args)?;
+    let (lhs_array, rhs_array) = match (lhs, rhs) {
+        (ColumnarValue::Array(lhs), ColumnarValue::Scalar(rhs)) => {
+            (Arc::clone(lhs), rhs.to_array_of_size(lhs.len())?)
+        }
+        (ColumnarValue::Array(lhs), ColumnarValue::Array(rhs)) => {
+            (Arc::clone(lhs), Arc::clone(rhs))
+        }
+        (ColumnarValue::Scalar(lhs), ColumnarValue::Array(rhs)) => {
+            (lhs.to_array_of_size(rhs.len())?, Arc::clone(rhs))
+        }
+        (ColumnarValue::Scalar(lhs), ColumnarValue::Scalar(rhs)) => {
+            let mut current_value = lhs;
+            if lhs.is_null() {
+                current_value = rhs;
+            }
+            return Ok(ColumnarValue::Scalar(current_value.clone()));
+        }
+    };
+    let to_apply = is_not_null(&lhs_array)?;
+    let value = zip(&to_apply, &lhs_array, &rhs_array)?;
+    Ok(ColumnarValue::Array(value))
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use arrow::array::*;
+
+    use super::*;
+    use datafusion_common::ScalarValue;
+
+    #[test]
+    fn nvl_int32() -> Result<()> {
+        let a = Int32Array::from(vec![
+            Some(1),
+            Some(2),
+            None,
+            None,
+            Some(3),
+            None,
+            None,
+            Some(4),
+            Some(5),
+        ]);
+        let a = ColumnarValue::Array(Arc::new(a));
+
+        let lit_array = ColumnarValue::Scalar(ScalarValue::Int32(Some(6i32)));
 
-    fn conditional_arguments<'a>(
-        &self,
-        args: &'a [Expr],
-    ) -> Option<(Vec<&'a Expr>, Vec<&'a Expr>)> {
-        self.coalesce.conditional_arguments(args)
+        let result = nvl_func(&[a, lit_array])?;
+        let result = result.into_array(0).expect("Failed to convert to array");
+
+        let expected = Arc::new(Int32Array::from(vec![
+            Some(1),
+            Some(2),
+            Some(6),
+            Some(6),
+            Some(3),
+            Some(6),
+            Some(6),
+            Some(4),
+            Some(5),
+        ])) as ArrayRef;
+        assert_eq!(expected.as_ref(), result.as_ref());
+        Ok(())
     }
 
-    fn short_circuits(&self) -> bool {
-        self.coalesce.short_circuits()
+    #[test]
+    // Ensure that arrays with no nulls can also invoke nvl() correctly
+    fn nvl_int32_non_nulls() -> Result<()> {
+        let a = Int32Array::from(vec![1, 3, 10, 7, 8, 1, 2, 4, 5]);
+        let a = ColumnarValue::Array(Arc::new(a));
+
+        let lit_array = ColumnarValue::Scalar(ScalarValue::Int32(Some(20i32)));
+
+        let result = nvl_func(&[a, lit_array])?;
+        let result = result.into_array(0).expect("Failed to convert to array");
+
+        let expected = Arc::new(Int32Array::from(vec![
+            Some(1),
+            Some(3),
+            Some(10),
+            Some(7),
+            Some(8),
+            Some(1),
+            Some(2),
+            Some(4),
+            Some(5),
+        ])) as ArrayRef;
+        assert_eq!(expected.as_ref(), result.as_ref());
+        Ok(())
     }
 
-    fn aliases(&self) -> &[String] {
-        &self.aliases
+    #[test]
+    fn nvl_boolean() -> Result<()> {
+        let a = BooleanArray::from(vec![Some(true), Some(false), None]);
+        let a = ColumnarValue::Array(Arc::new(a));
+
+        let lit_array = 
ColumnarValue::Scalar(ScalarValue::Boolean(Some(false)));
+
+        let result = nvl_func(&[a, lit_array])?;
+        let result = result.into_array(0).expect("Failed to convert to array");
+
+        let expected = Arc::new(BooleanArray::from(vec![
+            Some(true),
+            Some(false),
+            Some(false),
+        ])) as ArrayRef;
+
+        assert_eq!(expected.as_ref(), result.as_ref());
+        Ok(())
     }
 
-    fn documentation(&self) -> Option<&Documentation> {
-        self.doc()
+    #[test]
+    fn nvl_string() -> Result<()> {
+        let a = StringArray::from(vec![Some("foo"), Some("bar"), None, 
Some("baz")]);
+        let a = ColumnarValue::Array(Arc::new(a));
+
+        let lit_array = ColumnarValue::Scalar(ScalarValue::from("bax"));
+
+        let result = nvl_func(&[a, lit_array])?;
+        let result = result.into_array(0).expect("Failed to convert to array");
+
+        let expected = Arc::new(StringArray::from(vec![
+            Some("foo"),
+            Some("bar"),
+            Some("bax"),
+            Some("baz"),
+        ])) as ArrayRef;
+
+        assert_eq!(expected.as_ref(), result.as_ref());
+        Ok(())
+    }
+
+    #[test]
+    fn nvl_literal_first() -> Result<()> {
+        let a = Int32Array::from(vec![Some(1), Some(2), None, None, Some(3), 
Some(4)]);
+        let a = ColumnarValue::Array(Arc::new(a));
+
+        let lit_array = ColumnarValue::Scalar(ScalarValue::Int32(Some(2i32)));
+
+        let result = nvl_func(&[lit_array, a])?;
+        let result = result.into_array(0).expect("Failed to convert to array");
+
+        let expected = Arc::new(Int32Array::from(vec![
+            Some(2),
+            Some(2),
+            Some(2),
+            Some(2),
+            Some(2),
+            Some(2),
+        ])) as ArrayRef;
+        assert_eq!(expected.as_ref(), result.as_ref());
+        Ok(())
+    }
+
+    #[test]
+    fn nvl_scalar() -> Result<()> {
+        let a_null = ColumnarValue::Scalar(ScalarValue::Int32(None));
+        let b_null = ColumnarValue::Scalar(ScalarValue::Int32(Some(2i32)));
+
+        let result_null = nvl_func(&[a_null, b_null])?;
+        let result_null = result_null
+            .into_array(1)
+            .expect("Failed to convert to array");
+
+        let expected_null = Arc::new(Int32Array::from(vec![Some(2i32)])) as 
ArrayRef;
+
+        assert_eq!(expected_null.as_ref(), result_null.as_ref());
+
+        let a_nnull = ColumnarValue::Scalar(ScalarValue::Int32(Some(2i32)));
+        let b_nnull = ColumnarValue::Scalar(ScalarValue::Int32(Some(1i32)));
+
+        let result_nnull = nvl_func(&[a_nnull, b_nnull])?;
+        let result_nnull = result_nnull
+            .into_array(1)
+            .expect("Failed to convert to array");
+
+        let expected_nnull = Arc::new(Int32Array::from(vec![Some(2i32)])) as 
ArrayRef;
+        assert_eq!(expected_nnull.as_ref(), result_nnull.as_ref());
+
+        Ok(())
     }
 }
diff --git a/datafusion/functions/src/core/nvl2.rs 
b/datafusion/functions/src/core/nvl2.rs
index 45cb6760d0..82aa8d2a4c 100644
--- a/datafusion/functions/src/core/nvl2.rs
+++ b/datafusion/functions/src/core/nvl2.rs
@@ -15,16 +15,17 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::datatypes::{DataType, Field, FieldRef};
+use arrow::array::Array;
+use arrow::compute::is_not_null;
+use arrow::compute::kernels::zip::zip;
+use arrow::datatypes::DataType;
 use datafusion_common::{internal_err, utils::take_function_args, Result};
 use datafusion_expr::{
-    conditional_expressions::CaseBuilder,
-    simplify::{ExprSimplifyResult, SimplifyInfo},
-    type_coercion::binary::comparison_coercion,
-    ColumnarValue, Documentation, Expr, ReturnFieldArgs, ScalarFunctionArgs,
-    ScalarUDFImpl, Signature, Volatility,
+    type_coercion::binary::comparison_coercion, ColumnarValue, Documentation,
+    ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
 };
 use datafusion_macros::user_doc;
+use std::sync::Arc;
 
 #[user_doc(
     doc_section(label = "Conditional Functions"),
@@ -94,37 +95,8 @@ impl ScalarUDFImpl for NVL2Func {
         Ok(arg_types[1].clone())
     }
 
-    fn return_field_from_args(&self, args: ReturnFieldArgs) -> 
Result<FieldRef> {
-        let nullable =
-            args.arg_fields[1].is_nullable() || 
args.arg_fields[2].is_nullable();
-        let return_type = args.arg_fields[1].data_type().clone();
-        Ok(Field::new(self.name(), return_type, nullable).into())
-    }
-
-    fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> 
Result<ColumnarValue> {
-        internal_err!("nvl2 should have been simplified to case")
-    }
-
-    fn simplify(
-        &self,
-        args: Vec<Expr>,
-        _info: &dyn SimplifyInfo,
-    ) -> Result<ExprSimplifyResult> {
-        let [test, if_non_null, if_null] = take_function_args(self.name(), 
args)?;
-
-        let expr = CaseBuilder::new(
-            None,
-            vec![test.is_not_null()],
-            vec![if_non_null],
-            Some(Box::new(if_null)),
-        )
-        .end()?;
-
-        Ok(ExprSimplifyResult::Simplified(expr))
-    }
-
-    fn short_circuits(&self) -> bool {
-        true
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> 
Result<ColumnarValue> {
+        nvl2_func(&args.args)
     }
 
     fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
@@ -151,3 +123,42 @@ impl ScalarUDFImpl for NVL2Func {
         self.doc()
     }
 }
+
+fn nvl2_func(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+    let mut len = 1;
+    let mut is_array = false;
+    for arg in args {
+        if let ColumnarValue::Array(array) = arg {
+            len = array.len();
+            is_array = true;
+            break;
+        }
+    }
+    if is_array {
+        let args = args
+            .iter()
+            .map(|arg| match arg {
+                ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(len),
+                ColumnarValue::Array(array) => Ok(Arc::clone(array)),
+            })
+            .collect::<Result<Vec<_>>>()?;
+        let [tested, if_non_null, if_null] = take_function_args("nvl2", args)?;
+        let to_apply = is_not_null(&tested)?;
+        let value = zip(&to_apply, &if_non_null, &if_null)?;
+        Ok(ColumnarValue::Array(value))
+    } else {
+        let [tested, if_non_null, if_null] = take_function_args("nvl2", args)?;
+        match &tested {
+            ColumnarValue::Array(_) => {
+                internal_err!("except Scalar value, but got Array")
+            }
+            ColumnarValue::Scalar(scalar) => {
+                if scalar.is_null() {
+                    Ok(if_null.clone())
+                } else {
+                    Ok(if_non_null.clone())
+                }
+            }
+        }
+    }
+}
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs 
b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index 2510068494..ec1f8f991a 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -652,8 +652,10 @@ impl CSEController for ExprCSEController<'_> {
             // In case of `ScalarFunction`s we don't know which children are 
surely
             // executed so start visiting all children conditionally and stop 
the
             // recursion with `TreeNodeRecursion::Jump`.
-            Expr::ScalarFunction(ScalarFunction { func, args }) => {
-                func.conditional_arguments(args)
+            Expr::ScalarFunction(ScalarFunction { func, args })
+                if func.short_circuits() =>
+            {
+                Some((vec![], args.iter().collect()))
             }
 
             // In case of `And` and `Or` the first child is surely executed, 
but we
diff --git a/datafusion/sqllogictest/test_files/nvl.slt 
b/datafusion/sqllogictest/test_files/nvl.slt
index f4225148ab..daab54307c 100644
--- a/datafusion/sqllogictest/test_files/nvl.slt
+++ b/datafusion/sqllogictest/test_files/nvl.slt
@@ -148,38 +148,3 @@ query T
 SELECT NVL(arrow_cast('a', 'Utf8View'), NULL);
 ----
 a
-
-# nvl is implemented as a case, and short-circuits evaluation
-# so the following query should not error
-query I
-SELECT NVL(1, 1/0);
-----
-1
-
-# but this one should
-query error DataFusion error: Arrow error: Divide by zero error
-SELECT NVL(NULL, 1/0);
-
-# Expect the query plan to show nvl as a case expression
-query I
-select NVL(int_field, 9999) FROM test;
-----
-1
-2
-3
-9999
-4
-9999
-
-# Expect the query plan to show nvl as a case expression
-query TT
-EXPLAIN select NVL(int_field, 9999) FROM test;
-----
-logical_plan
-01)Projection: CASE WHEN __common_expr_1 IS NOT NULL THEN __common_expr_1 ELSE 
Int64(9999) END AS nvl(test.int_field,Int64(9999))
-02)--Projection: CAST(test.int_field AS Int64) AS __common_expr_1
-03)----TableScan: test projection=[int_field]
-physical_plan
-01)ProjectionExec: expr=[CASE WHEN __common_expr_1@0 IS NOT NULL THEN 
__common_expr_1@0 ELSE 9999 END as nvl(test.int_field,Int64(9999))]
-02)--ProjectionExec: expr=[CAST(int_field@0 AS Int64) as __common_expr_1]
-03)----DataSourceExec: partitions=1, partition_sizes=[1]
diff --git a/datafusion/sqllogictest/test_files/select.slt 
b/datafusion/sqllogictest/test_files/select.slt
index 5c684eb83d..598a587bfe 100644
--- a/datafusion/sqllogictest/test_files/select.slt
+++ b/datafusion/sqllogictest/test_files/select.slt
@@ -1656,10 +1656,10 @@ query TT
 explain select coalesce(1, y/x), coalesce(2, y/x) from t;
 ----
 logical_plan
-01)Projection: Int64(1) AS coalesce(Int64(1),t.y / t.x), Int64(2) AS 
coalesce(Int64(2),t.y / t.x)
-02)--TableScan: t projection=[]
+01)Projection: coalesce(Int64(1), CAST(t.y / t.x AS Int64)), 
coalesce(Int64(2), CAST(t.y / t.x AS Int64))
+02)--TableScan: t projection=[x, y]
 physical_plan
-01)ProjectionExec: expr=[1 as coalesce(Int64(1),t.y / t.x), 2 as 
coalesce(Int64(2),t.y / t.x)]
+01)ProjectionExec: expr=[coalesce(1, CAST(y@1 / x@0 AS Int64)) as 
coalesce(Int64(1),t.y / t.x), coalesce(2, CAST(y@1 / x@0 AS Int64)) as 
coalesce(Int64(2),t.y / t.x)]
 02)--DataSourceExec: partitions=1, partition_sizes=[1]
 
 query TT
@@ -1686,17 +1686,11 @@ physical_plan
 02)--ProjectionExec: expr=[y@1 = 0 as __common_expr_1, x@0 as x, y@1 as y]
 03)----DataSourceExec: partitions=1, partition_sizes=[1]
 
-query II
-select coalesce(1, y/x), coalesce(2, y/x) from t;
-----
-1 2
-1 2
-1 2
-1 2
-1 2
-
 # due to the reason describe in 
https://github.com/apache/datafusion/issues/8927,
 # the following queries will fail
+query error
+select coalesce(1, y/x), coalesce(2, y/x) from t;
+
 query error
 SELECT y > 0 and 1 / y < 1, x > 0 and y > 0 and 1 / y < 1 / x from t;
 
diff --git a/datafusion/sqllogictest/test_files/string/string_view.slt 
b/datafusion/sqllogictest/test_files/string/string_view.slt
index 4d30f572ad..fb67daa0b8 100644
--- a/datafusion/sqllogictest/test_files/string/string_view.slt
+++ b/datafusion/sqllogictest/test_files/string/string_view.slt
@@ -988,7 +988,7 @@ query TT
 EXPLAIN SELECT NVL(column1_utf8view, 'a') as c2 FROM test;
 ----
 logical_plan
-01)Projection: CASE WHEN test.column1_utf8view IS NOT NULL THEN 
test.column1_utf8view ELSE Utf8View("a") END AS c2
+01)Projection: nvl(test.column1_utf8view, Utf8View("a")) AS c2
 02)--TableScan: test projection=[column1_utf8view]
 
 ## Ensure no casts for nullif
diff --git a/docs/source/user-guide/sql/scalar_functions.md 
b/docs/source/user-guide/sql/scalar_functions.md
index 7c88d1fd9c..6df14d13ee 100644
--- a/docs/source/user-guide/sql/scalar_functions.md
+++ b/docs/source/user-guide/sql/scalar_functions.md
@@ -1056,7 +1056,7 @@ nullif(expression1, expression2)
 
 ### `nvl`
 
-Returns _expression2_ if _expression1_ is NULL otherwise it returns 
_expression1_ and _expression2_ is not evaluated. This function can be used to 
substitute a default value for NULL values.
+Returns _expression2_ if _expression1_ is NULL otherwise it returns 
_expression1_.
 
 ```sql
 nvl(expression1, expression2)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to