This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new e5dcc8c04f fix: lazy evaluation for coalesce (#17357)
e5dcc8c04f is described below

commit e5dcc8c04f9559f8af6efea3c7ff8202f3c1c618
Author: Chen Chongchen <[email protected]>
AuthorDate: Mon Sep 8 18:47:03 2025 +0800

    fix: lazy evaluation for coalesce (#17357)
    
    * fix: lazy evaluation for coalesce
    
    * update test
---
 datafusion/functions/src/core/coalesce.rs     | 88 ++++++++++-----------------
 datafusion/sqllogictest/test_files/select.slt | 16 +++--
 2 files changed, 42 insertions(+), 62 deletions(-)

diff --git a/datafusion/functions/src/core/coalesce.rs b/datafusion/functions/src/core/coalesce.rs
index b0f3483513..3fba539dd0 100644
--- a/datafusion/functions/src/core/coalesce.rs
+++ b/datafusion/functions/src/core/coalesce.rs
@@ -15,14 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::array::{new_null_array, BooleanArray};
-use arrow::compute::kernels::zip::zip;
-use arrow::compute::{and, is_not_null, is_null};
 use arrow::datatypes::{DataType, Field, FieldRef};
-use datafusion_common::{exec_err, internal_err, Result};
+use datafusion_common::{exec_err, internal_err, plan_err, Result};
 use datafusion_expr::binary::try_type_union_resolution;
+use datafusion_expr::conditional_expressions::CaseBuilder;
+use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
 use datafusion_expr::{
-    ColumnarValue, Documentation, ReturnFieldArgs, ScalarFunctionArgs,
+    ColumnarValue, Documentation, Expr, ReturnFieldArgs, ScalarFunctionArgs,
 };
 use datafusion_expr::{ScalarUDFImpl, Signature, Volatility};
 use datafusion_macros::user_doc;
@@ -95,61 +94,36 @@ impl ScalarUDFImpl for CoalesceFunc {
         Ok(Field::new(self.name(), return_type, nullable).into())
     }
 
-    /// coalesce evaluates to the first value which is not NULL
-    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
-        let args = args.args;
-        // do not accept 0 arguments.
+    fn simplify(
+        &self,
+        args: Vec<Expr>,
+        _info: &dyn SimplifyInfo,
+    ) -> Result<ExprSimplifyResult> {
         if args.is_empty() {
-            return exec_err!(
-                "coalesce was called with {} arguments. It requires at least 1.",
-                args.len()
-            );
+            return plan_err!("coalesce must have at least one argument");
         }
-
-        let return_type = args[0].data_type();
-        let mut return_array = args.iter().filter_map(|x| match x {
-            ColumnarValue::Array(array) => Some(array.len()),
-            _ => None,
-        });
-
-        if let Some(size) = return_array.next() {
-            // start with nulls as default output
-            let mut current_value = new_null_array(&return_type, size);
-            let mut remainder = BooleanArray::from(vec![true; size]);
-
-            for arg in args {
-                match arg {
-                    ColumnarValue::Array(ref array) => {
-                        let to_apply = and(&remainder, &is_not_null(array.as_ref())?)?;
-                        current_value = zip(&to_apply, array, &current_value)?;
-                        remainder = and(&remainder, &is_null(array)?)?;
-                    }
-                    ColumnarValue::Scalar(value) => {
-                        if value.is_null() {
-                            continue;
-                        } else {
-                            let last_value = value.to_scalar()?;
-                            current_value = zip(&remainder, &last_value, &current_value)?;
-                            break;
-                        }
-                    }
-                }
-                if remainder.iter().all(|x| x == Some(false)) {
-                    break;
-                }
-            }
-            Ok(ColumnarValue::Array(current_value))
-        } else {
-            let result = args
-                .iter()
-                .filter_map(|x| match x {
-                    ColumnarValue::Scalar(s) if !s.is_null() => Some(x.clone()),
-                    _ => None,
-                })
-                .next()
-                .unwrap_or_else(|| args[0].clone());
-            Ok(result)
+        if args.len() == 1 {
+            return Ok(ExprSimplifyResult::Simplified(
+                args.into_iter().next().unwrap(),
+            ));
         }
+
+        let n = args.len();
+        let (init, last_elem) = args.split_at(n - 1);
+        let whens = init
+            .iter()
+            .map(|x| x.clone().is_not_null())
+            .collect::<Vec<_>>();
+        let cases = init.to_vec();
+        Ok(ExprSimplifyResult::Simplified(
+            CaseBuilder::new(None, whens, cases, Some(Box::new(last_elem[0].clone())))
+                .end()?,
+        ))
+    }
+
+    /// coalesce evaluates to the first value which is not NULL
+    fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
+        internal_err!("coalesce should have been simplified to case")
     }
 
     fn short_circuits(&self) -> bool {
diff --git a/datafusion/sqllogictest/test_files/select.slt b/datafusion/sqllogictest/test_files/select.slt
index 109c2f209a..989f7df7b4 100644
--- a/datafusion/sqllogictest/test_files/select.slt
+++ b/datafusion/sqllogictest/test_files/select.slt
@@ -1656,10 +1656,10 @@ query TT
 explain select coalesce(1, y/x), coalesce(2, y/x) from t;
 ----
 logical_plan
-01)Projection: coalesce(Int64(1), CAST(t.y / t.x AS Int64)), coalesce(Int64(2), CAST(t.y / t.x AS Int64))
+01)Projection: CASE WHEN Boolean(true) THEN Int64(1) ELSE CAST(t.y / t.x AS Int64) END AS coalesce(Int64(1),t.y / t.x), CASE WHEN Boolean(true) THEN Int64(2) ELSE CAST(t.y / t.x AS Int64) END AS coalesce(Int64(2),t.y / t.x)
 02)--TableScan: t projection=[x, y]
 physical_plan
-01)ProjectionExec: expr=[coalesce(1, CAST(y@1 / x@0 AS Int64)) as coalesce(Int64(1),t.y / t.x), coalesce(2, CAST(y@1 / x@0 AS Int64)) as coalesce(Int64(2),t.y / t.x)]
+01)ProjectionExec: expr=[CASE WHEN true THEN 1 ELSE CAST(y@1 / x@0 AS Int64) END as coalesce(Int64(1),t.y / t.x), CASE WHEN true THEN 2 ELSE CAST(y@1 / x@0 AS Int64) END as coalesce(Int64(2),t.y / t.x)]
 02)--DataSourceExec: partitions=1, partition_sizes=[1]
 
 query TT
@@ -1686,11 +1686,17 @@ physical_plan
 02)--ProjectionExec: expr=[y@1 = 0 as __common_expr_1, x@0 as x, y@1 as y]
 03)----DataSourceExec: partitions=1, partition_sizes=[1]
 
-# due to the reason describe in https://github.com/apache/datafusion/issues/8927,
-# the following queries will fail
-query error
+query II
 select coalesce(1, y/x), coalesce(2, y/x) from t;
+----
+1 2
+1 2
+1 2
+1 2
+1 2
 
+# due to the reason describe in https://github.com/apache/datafusion/issues/8927,
+# the following queries will fail
 query error
 SELECT y > 0 and 1 / y < 1, x > 0 and y > 0 and 1 / y < 1 / x from t;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to