This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch branch-51
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/branch-51 by this push:
new 2677c27541 [branch-51] Revert rewrite for coalesce, `nvl` and `nvl2` simplification (#18567)
2677c27541 is described below
commit 2677c27541d9ec568434b8b99f136e45c3d383bf
Author: Andrew Lamb <[email protected]>
AuthorDate: Mon Nov 10 06:53:52 2025 -0500
[branch-51] Revert rewrite for coalesce, `nvl` and `nvl2` simplification (#18567)
Note this targets the branch-51 release branch
## Which issue does this PR close?
- part of https://github.com/apache/datafusion/issues/17558
- resolves https://github.com/apache/datafusion/issues/17801 in the 51
release branch
## Rationale for this change
- We merged some clever rewrites for `coalesce` and `nvl2` to use `CASE`
which are faster and more correct (👏 @chenkovsky @kosiew )
- However, these rewrites cause subtle schema mismatches during planning
in some cases (because the CASE simplification's nullability logic can't
determine the correct nullability in some cases - see
https://github.com/apache/datafusion/issues/17801); a rough SQL sketch of
the rewrite follows this list
- @pepijnve has made heroic efforts to fix the schema mismatch in
https://github.com/apache/datafusion/pull/17813#issuecomment-3508025785,
but it is non-trivial and I am worried about merging it so close to the
51 release and introducing new edge cases
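
To make the rationale concrete, here is a rough sketch (assuming a table `t(a, b)`, not the exact rewrite rule or plan output) of the simplification being reverted:

```sql
-- Illustrative sketch only: the reverted optimization simplified
--     coalesce(a, b)
-- into the equivalent CASE expression below.
SELECT CASE WHEN a IS NOT NULL THEN a ELSE b END AS "coalesce(a, b)"
FROM t;
-- Both forms return the same values, but the nullability derived for the
-- CASE expression can disagree with the nullability of coalesce(a, b)
-- itself, which is the kind of logical/physical schema mismatch tracked
-- in https://github.com/apache/datafusion/issues/17801.
```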
## What changes are included in this PR?
1. Revert https://github.com/apache/datafusion/pull/17357 /
e5dcc8c04f9559f8af6efea3c7ff8202f3c1c618
2. Revert https://github.com/apache/datafusion/pull/17991 /
ea83c2644eb559e55401ce2f7f975032e8d7845d
3. Revert https://github.com/apache/datafusion/pull/18191 /
22c4214fe1ca3953932f3f12ccd5b68dbfbefdf3
4. Cherry-pick 62022545af09346ed578e2c7c61a7c897e2ccfc0, a test that
reproduces the schema mismatch issue (from
https://github.com/apache/datafusion/pull/18536)
5. Cherry-pick 735cacf71d8632d786bfb920126ce6719c42156d, a fix for the
benchmarks that regressed due to the revert (from
https://github.com/apache/datafusion/pull/17833)
6. Update the datafusion-testing pin for extended tests (see
https://github.com/apache/datafusion-testing/pull/15)
## Are these changes tested?
Yes, I added a new test
## Are there any user-facing changes?
---
datafusion-testing | 2 +-
datafusion/core/benches/sql_planner.rs | 3 -
.../core/tests/dataframe/dataframe_functions.rs | 27 ---
datafusion/core/tests/expr_api/mod.rs | 20 --
datafusion/core/tests/tpcds_planning.rs | 11 +-
datafusion/expr/src/udf.rs | 55 +----
datafusion/functions/src/core/coalesce.rs | 99 +++++----
datafusion/functions/src/core/nvl.rs | 240 +++++++++++++++++----
datafusion/functions/src/core/nvl2.rs | 85 ++++----
.../optimizer/src/common_subexpr_eliminate.rs | 6 +-
datafusion/sqllogictest/test_files/nvl.slt | 35 ---
datafusion/sqllogictest/test_files/select.slt | 18 +-
.../sqllogictest/test_files/string/string_view.slt | 2 +-
docs/source/user-guide/sql/scalar_functions.md | 2 +-
14 files changed, 329 insertions(+), 276 deletions(-)
diff --git a/datafusion-testing b/datafusion-testing
index eccb0e4a42..8ad3ac00c1 160000
--- a/datafusion-testing
+++ b/datafusion-testing
@@ -1 +1 @@
-Subproject commit eccb0e4a426344ef3faf534cd60e02e9c3afd3ac
+Subproject commit 8ad3ac00c1990d44a99fb6738d7e444f0ccf76a0
diff --git a/datafusion/core/benches/sql_planner.rs b/datafusion/core/benches/sql_planner.rs
index 6266a7184c..7f11899af6 100644
--- a/datafusion/core/benches/sql_planner.rs
+++ b/datafusion/core/benches/sql_planner.rs
@@ -477,9 +477,6 @@ fn criterion_benchmark(c: &mut Criterion) {
};
let raw_tpcds_sql_queries = (1..100)
- // skip query 75 until it is fixed
- // https://github.com/apache/datafusion/issues/17801
- .filter(|q| *q != 75)
        .map(|q| std::fs::read_to_string(format!("{tests_path}tpc-ds/{q}.sql")).unwrap())
.collect::<Vec<_>>();
diff --git a/datafusion/core/tests/dataframe/dataframe_functions.rs b/datafusion/core/tests/dataframe/dataframe_functions.rs
index 265862ff9a..e7f2603604 100644
--- a/datafusion/core/tests/dataframe/dataframe_functions.rs
+++ b/datafusion/core/tests/dataframe/dataframe_functions.rs
@@ -274,33 +274,6 @@ async fn test_nvl2() -> Result<()> {
Ok(())
}
-
-#[tokio::test]
-async fn test_nvl2_short_circuit() -> Result<()> {
- let expr = nvl2(
- col("a"),
- arrow_cast(lit("1"), lit("Int32")),
- arrow_cast(col("a"), lit("Int32")),
- );
-
- let batches = get_batches(expr).await?;
-
- assert_snapshot!(
- batches_to_string(&batches),
- @r#"
-    +-----------------------------------------------------------------------------------+
-    | nvl2(test.a,arrow_cast(Utf8("1"),Utf8("Int32")),arrow_cast(test.a,Utf8("Int32"))) |
-    +-----------------------------------------------------------------------------------+
-    | 1                                                                                 |
-    | 1                                                                                 |
-    | 1                                                                                 |
-    | 1                                                                                 |
-    +-----------------------------------------------------------------------------------+
-    "#
- );
-
- Ok(())
-}
#[tokio::test]
async fn test_fn_arrow_typeof() -> Result<()> {
let expr = arrow_typeof(col("l"));
diff --git a/datafusion/core/tests/expr_api/mod.rs b/datafusion/core/tests/expr_api/mod.rs
index 84e644480a..4aee274de9 100644
--- a/datafusion/core/tests/expr_api/mod.rs
+++ b/datafusion/core/tests/expr_api/mod.rs
@@ -320,26 +320,6 @@ async fn test_create_physical_expr() {
create_simplified_expr_test(lit(1i32) + lit(2i32), "3");
}
-#[test]
-fn test_create_physical_expr_nvl2() {
- let batch = &TEST_BATCH;
- let df_schema = DFSchema::try_from(batch.schema()).unwrap();
- let ctx = SessionContext::new();
-
- let expect_err = |expr| {
-        let physical_expr = ctx.create_physical_expr(expr, &df_schema).unwrap();
- let err = physical_expr.evaluate(batch).unwrap_err();
- assert!(
- err.to_string()
- .contains("nvl2 should have been simplified to case"),
- "unexpected error: {err:?}"
- );
- };
-
- expect_err(nvl2(col("i"), lit(1i64), lit(0i64)));
- expect_err(nvl2(lit(1i64), col("i"), lit(0i64)));
-}
-
#[tokio::test]
async fn test_create_physical_expr_coercion() {
// create_physical_expr does apply type coercion and unwrapping in cast
diff --git a/datafusion/core/tests/tpcds_planning.rs b/datafusion/core/tests/tpcds_planning.rs
index 252d76d0f9..00e1b8724d 100644
--- a/datafusion/core/tests/tpcds_planning.rs
+++ b/datafusion/core/tests/tpcds_planning.rs
@@ -1051,10 +1051,13 @@ async fn regression_test(query_no: u8, create_physical: bool) -> Result<()> {
for sql in &sql {
let df = ctx.sql(sql).await?;
- let (state, plan) = df.into_parts();
- let plan = state.optimize(&plan)?;
- if create_physical {
- let _ = state.create_physical_plan(&plan).await?;
+ // attempt to mimic planning steps
+ if !create_physical {
+ let (state, plan) = df.into_parts();
+ let _ = state.optimize(&plan)?;
+ } else {
+ // this is what df.execute() does internally
+ let _ = df.create_physical_plan().await?;
}
}
diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs
index fd54bb13a6..c1a55bcfd4 100644
--- a/datafusion/expr/src/udf.rs
+++ b/datafusion/expr/src/udf.rs
@@ -252,21 +252,7 @@ impl ScalarUDF {
Ok(result)
}
-    /// Determines which of the arguments passed to this function are evaluated eagerly
- /// and which may be evaluated lazily.
- ///
- /// See [ScalarUDFImpl::conditional_arguments] for more information.
- pub fn conditional_arguments<'a>(
- &self,
- args: &'a [Expr],
- ) -> Option<(Vec<&'a Expr>, Vec<&'a Expr>)> {
- self.inner.conditional_arguments(args)
- }
-
-    /// Returns true if some of this `exprs` subexpressions may not be evaluated
- /// and thus any side effects (like divide by zero) may not be encountered.
- ///
- /// See [ScalarUDFImpl::short_circuits] for more information.
+    /// Get the circuits of inner implementation
pub fn short_circuits(&self) -> bool {
self.inner.short_circuits()
}
@@ -696,42 +682,10 @@ pub trait ScalarUDFImpl: Debug + DynEq + DynHash + Send + Sync {
///
/// Setting this to true prevents certain optimizations such as common
/// subexpression elimination
-    ///
-    /// When overriding this function to return `true`, [ScalarUDFImpl::conditional_arguments] can also be
-    /// overridden to report more accurately which arguments are eagerly evaluated and which ones
-    /// lazily.
fn short_circuits(&self) -> bool {
false
}
-    /// Determines which of the arguments passed to this function are evaluated eagerly
- /// and which may be evaluated lazily.
- ///
- /// If this function returns `None`, all arguments are eagerly evaluated.
- /// Returning `None` is a micro optimization that saves a needless `Vec`
- /// allocation.
- ///
- /// If the function returns `Some`, returns (`eager`, `lazy`) where `eager`
- /// are the arguments that are always evaluated, and `lazy` are the
-    /// arguments that may be evaluated lazily (i.e. may not be evaluated at all
- /// in some cases).
- ///
- /// Implementations must ensure that the two returned `Vec`s are disjunct,
- /// and that each argument from `args` is present in one the two `Vec`s.
- ///
- /// When overriding this function, [ScalarUDFImpl::short_circuits] must
- /// be overridden to return `true`.
- fn conditional_arguments<'a>(
- &self,
- args: &'a [Expr],
- ) -> Option<(Vec<&'a Expr>, Vec<&'a Expr>)> {
- if self.short_circuits() {
- Some((vec![], args.iter().collect()))
- } else {
- None
- }
- }
-
    /// Computes the output [`Interval`] for a [`ScalarUDFImpl`], given the input
/// intervals.
///
@@ -921,13 +875,6 @@ impl ScalarUDFImpl for AliasedScalarUDFImpl {
self.inner.simplify(args, info)
}
- fn conditional_arguments<'a>(
- &self,
- args: &'a [Expr],
- ) -> Option<(Vec<&'a Expr>, Vec<&'a Expr>)> {
- self.inner.conditional_arguments(args)
- }
-
fn short_circuits(&self) -> bool {
self.inner.short_circuits()
}
diff --git a/datafusion/functions/src/core/coalesce.rs b/datafusion/functions/src/core/coalesce.rs
index aab1f445d5..b0f3483513 100644
--- a/datafusion/functions/src/core/coalesce.rs
+++ b/datafusion/functions/src/core/coalesce.rs
@@ -15,13 +15,14 @@
// specific language governing permissions and limitations
// under the License.
+use arrow::array::{new_null_array, BooleanArray};
+use arrow::compute::kernels::zip::zip;
+use arrow::compute::{and, is_not_null, is_null};
use arrow::datatypes::{DataType, Field, FieldRef};
-use datafusion_common::{exec_err, internal_err, plan_err, Result};
+use datafusion_common::{exec_err, internal_err, Result};
use datafusion_expr::binary::try_type_union_resolution;
-use datafusion_expr::conditional_expressions::CaseBuilder;
-use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
use datafusion_expr::{
- ColumnarValue, Documentation, Expr, ReturnFieldArgs, ScalarFunctionArgs,
+ ColumnarValue, Documentation, ReturnFieldArgs, ScalarFunctionArgs,
};
use datafusion_expr::{ScalarUDFImpl, Signature, Volatility};
use datafusion_macros::user_doc;
@@ -47,7 +48,7 @@ use std::any::Any;
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct CoalesceFunc {
- pub(super) signature: Signature,
+ signature: Signature,
}
impl Default for CoalesceFunc {
@@ -94,45 +95,61 @@ impl ScalarUDFImpl for CoalesceFunc {
Ok(Field::new(self.name(), return_type, nullable).into())
}
- fn simplify(
- &self,
- args: Vec<Expr>,
- _info: &dyn SimplifyInfo,
- ) -> Result<ExprSimplifyResult> {
+ /// coalesce evaluates to the first value which is not NULL
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
+ let args = args.args;
+ // do not accept 0 arguments.
if args.is_empty() {
- return plan_err!("coalesce must have at least one argument");
- }
- if args.len() == 1 {
- return Ok(ExprSimplifyResult::Simplified(
- args.into_iter().next().unwrap(),
- ));
+ return exec_err!(
+ "coalesce was called with {} arguments. It requires at least
1.",
+ args.len()
+ );
}
- let n = args.len();
- let (init, last_elem) = args.split_at(n - 1);
- let whens = init
- .iter()
- .map(|x| x.clone().is_not_null())
- .collect::<Vec<_>>();
- let cases = init.to_vec();
- Ok(ExprSimplifyResult::Simplified(
-            CaseBuilder::new(None, whens, cases, Some(Box::new(last_elem[0].clone())))
- .end()?,
- ))
- }
-
- /// coalesce evaluates to the first value which is not NULL
-    fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
- internal_err!("coalesce should have been simplified to case")
- }
-
- fn conditional_arguments<'a>(
- &self,
- args: &'a [Expr],
- ) -> Option<(Vec<&'a Expr>, Vec<&'a Expr>)> {
- let eager = vec![&args[0]];
- let lazy = args[1..].iter().collect();
- Some((eager, lazy))
+ let return_type = args[0].data_type();
+ let mut return_array = args.iter().filter_map(|x| match x {
+ ColumnarValue::Array(array) => Some(array.len()),
+ _ => None,
+ });
+
+ if let Some(size) = return_array.next() {
+ // start with nulls as default output
+ let mut current_value = new_null_array(&return_type, size);
+ let mut remainder = BooleanArray::from(vec![true; size]);
+
+ for arg in args {
+ match arg {
+ ColumnarValue::Array(ref array) => {
+                    let to_apply = and(&remainder, &is_not_null(array.as_ref())?)?;
+                    current_value = zip(&to_apply, array, &current_value)?;
+                    remainder = and(&remainder, &is_null(array)?)?;
+ }
+ ColumnarValue::Scalar(value) => {
+ if value.is_null() {
+ continue;
+ } else {
+ let last_value = value.to_scalar()?;
+                        current_value = zip(&remainder, &last_value, &current_value)?;
+ break;
+ }
+ }
+ }
+ if remainder.iter().all(|x| x == Some(false)) {
+ break;
+ }
+ }
+ Ok(ColumnarValue::Array(current_value))
+ } else {
+ let result = args
+ .iter()
+ .filter_map(|x| match x {
+                    ColumnarValue::Scalar(s) if !s.is_null() => Some(x.clone()),
+ _ => None,
+ })
+ .next()
+ .unwrap_or_else(|| args[0].clone());
+ Ok(result)
+ }
}
fn short_circuits(&self) -> bool {
diff --git a/datafusion/functions/src/core/nvl.rs b/datafusion/functions/src/core/nvl.rs
index 0b9968a88f..c8b34c4b17 100644
--- a/datafusion/functions/src/core/nvl.rs
+++ b/datafusion/functions/src/core/nvl.rs
@@ -15,19 +15,21 @@
// specific language governing permissions and limitations
// under the License.
-use crate::core::coalesce::CoalesceFunc;
-use arrow::datatypes::{DataType, FieldRef};
-use datafusion_common::Result;
-use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
+use arrow::array::Array;
+use arrow::compute::is_not_null;
+use arrow::compute::kernels::zip::zip;
+use arrow::datatypes::DataType;
+use datafusion_common::{utils::take_function_args, Result};
use datafusion_expr::{
- ColumnarValue, Documentation, Expr, ReturnFieldArgs, ScalarFunctionArgs,
- ScalarUDFImpl, Signature, Volatility,
+ ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature,
+ Volatility,
};
use datafusion_macros::user_doc;
+use std::sync::Arc;
#[user_doc(
doc_section(label = "Conditional Functions"),
- description = "Returns _expression2_ if _expression1_ is NULL otherwise it
returns _expression1_ and _expression2_ is not evaluated. This function can be
used to substitute a default value for NULL values.",
+ description = "Returns _expression2_ if _expression1_ is NULL otherwise it
returns _expression1_.",
syntax_example = "nvl(expression1, expression2)",
sql_example = r#"```sql
> select nvl(null, 'a');
@@ -55,7 +57,7 @@ use datafusion_macros::user_doc;
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct NVLFunc {
- coalesce: CoalesceFunc,
+ signature: Signature,
aliases: Vec<String>,
}
@@ -88,13 +90,11 @@ impl Default for NVLFunc {
impl NVLFunc {
pub fn new() -> Self {
Self {
- coalesce: CoalesceFunc {
- signature: Signature::uniform(
- 2,
- SUPPORTED_NVL_TYPES.to_vec(),
- Volatility::Immutable,
- ),
- },
+ signature: Signature::uniform(
+ 2,
+ SUPPORTED_NVL_TYPES.to_vec(),
+ Volatility::Immutable,
+ ),
aliases: vec![String::from("ifnull")],
}
}
@@ -110,45 +110,209 @@ impl ScalarUDFImpl for NVLFunc {
}
fn signature(&self) -> &Signature {
- &self.coalesce.signature
+ &self.signature
}
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
- self.coalesce.return_type(arg_types)
+ Ok(arg_types[0].clone())
}
-    fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
- self.coalesce.return_field_from_args(args)
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
+ nvl_func(&args.args)
}
- fn simplify(
- &self,
- args: Vec<Expr>,
- info: &dyn SimplifyInfo,
- ) -> Result<ExprSimplifyResult> {
- self.coalesce.simplify(args, info)
+ fn aliases(&self) -> &[String] {
+ &self.aliases
}
-    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
- self.coalesce.invoke_with_args(args)
+ fn documentation(&self) -> Option<&Documentation> {
+ self.doc()
}
+}
+
+fn nvl_func(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+ let [lhs, rhs] = take_function_args("nvl/ifnull", args)?;
+ let (lhs_array, rhs_array) = match (lhs, rhs) {
+ (ColumnarValue::Array(lhs), ColumnarValue::Scalar(rhs)) => {
+ (Arc::clone(lhs), rhs.to_array_of_size(lhs.len())?)
+ }
+ (ColumnarValue::Array(lhs), ColumnarValue::Array(rhs)) => {
+ (Arc::clone(lhs), Arc::clone(rhs))
+ }
+ (ColumnarValue::Scalar(lhs), ColumnarValue::Array(rhs)) => {
+ (lhs.to_array_of_size(rhs.len())?, Arc::clone(rhs))
+ }
+ (ColumnarValue::Scalar(lhs), ColumnarValue::Scalar(rhs)) => {
+ let mut current_value = lhs;
+ if lhs.is_null() {
+ current_value = rhs;
+ }
+ return Ok(ColumnarValue::Scalar(current_value.clone()));
+ }
+ };
+ let to_apply = is_not_null(&lhs_array)?;
+ let value = zip(&to_apply, &lhs_array, &rhs_array)?;
+ Ok(ColumnarValue::Array(value))
+}
+
+#[cfg(test)]
+mod tests {
+ use std::sync::Arc;
+
+ use arrow::array::*;
+
+ use super::*;
+ use datafusion_common::ScalarValue;
+
+ #[test]
+ fn nvl_int32() -> Result<()> {
+ let a = Int32Array::from(vec![
+ Some(1),
+ Some(2),
+ None,
+ None,
+ Some(3),
+ None,
+ None,
+ Some(4),
+ Some(5),
+ ]);
+ let a = ColumnarValue::Array(Arc::new(a));
+
+ let lit_array = ColumnarValue::Scalar(ScalarValue::Int32(Some(6i32)));
- fn conditional_arguments<'a>(
- &self,
- args: &'a [Expr],
- ) -> Option<(Vec<&'a Expr>, Vec<&'a Expr>)> {
- self.coalesce.conditional_arguments(args)
+ let result = nvl_func(&[a, lit_array])?;
+ let result = result.into_array(0).expect("Failed to convert to array");
+
+ let expected = Arc::new(Int32Array::from(vec![
+ Some(1),
+ Some(2),
+ Some(6),
+ Some(6),
+ Some(3),
+ Some(6),
+ Some(6),
+ Some(4),
+ Some(5),
+ ])) as ArrayRef;
+ assert_eq!(expected.as_ref(), result.as_ref());
+ Ok(())
}
- fn short_circuits(&self) -> bool {
- self.coalesce.short_circuits()
+ #[test]
+ // Ensure that arrays with no nulls can also invoke nvl() correctly
+ fn nvl_int32_non_nulls() -> Result<()> {
+ let a = Int32Array::from(vec![1, 3, 10, 7, 8, 1, 2, 4, 5]);
+ let a = ColumnarValue::Array(Arc::new(a));
+
+ let lit_array = ColumnarValue::Scalar(ScalarValue::Int32(Some(20i32)));
+
+ let result = nvl_func(&[a, lit_array])?;
+ let result = result.into_array(0).expect("Failed to convert to array");
+
+ let expected = Arc::new(Int32Array::from(vec![
+ Some(1),
+ Some(3),
+ Some(10),
+ Some(7),
+ Some(8),
+ Some(1),
+ Some(2),
+ Some(4),
+ Some(5),
+ ])) as ArrayRef;
+ assert_eq!(expected.as_ref(), result.as_ref());
+ Ok(())
}
- fn aliases(&self) -> &[String] {
- &self.aliases
+ #[test]
+ fn nvl_boolean() -> Result<()> {
+ let a = BooleanArray::from(vec![Some(true), Some(false), None]);
+ let a = ColumnarValue::Array(Arc::new(a));
+
+        let lit_array = ColumnarValue::Scalar(ScalarValue::Boolean(Some(false)));
+
+ let result = nvl_func(&[a, lit_array])?;
+ let result = result.into_array(0).expect("Failed to convert to array");
+
+ let expected = Arc::new(BooleanArray::from(vec![
+ Some(true),
+ Some(false),
+ Some(false),
+ ])) as ArrayRef;
+
+ assert_eq!(expected.as_ref(), result.as_ref());
+ Ok(())
}
- fn documentation(&self) -> Option<&Documentation> {
- self.doc()
+ #[test]
+ fn nvl_string() -> Result<()> {
+        let a = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]);
+ let a = ColumnarValue::Array(Arc::new(a));
+
+ let lit_array = ColumnarValue::Scalar(ScalarValue::from("bax"));
+
+ let result = nvl_func(&[a, lit_array])?;
+ let result = result.into_array(0).expect("Failed to convert to array");
+
+ let expected = Arc::new(StringArray::from(vec![
+ Some("foo"),
+ Some("bar"),
+ Some("bax"),
+ Some("baz"),
+ ])) as ArrayRef;
+
+ assert_eq!(expected.as_ref(), result.as_ref());
+ Ok(())
+ }
+
+ #[test]
+ fn nvl_literal_first() -> Result<()> {
+        let a = Int32Array::from(vec![Some(1), Some(2), None, None, Some(3), Some(4)]);
+ let a = ColumnarValue::Array(Arc::new(a));
+
+ let lit_array = ColumnarValue::Scalar(ScalarValue::Int32(Some(2i32)));
+
+ let result = nvl_func(&[lit_array, a])?;
+ let result = result.into_array(0).expect("Failed to convert to array");
+
+ let expected = Arc::new(Int32Array::from(vec![
+ Some(2),
+ Some(2),
+ Some(2),
+ Some(2),
+ Some(2),
+ Some(2),
+ ])) as ArrayRef;
+ assert_eq!(expected.as_ref(), result.as_ref());
+ Ok(())
+ }
+
+ #[test]
+ fn nvl_scalar() -> Result<()> {
+ let a_null = ColumnarValue::Scalar(ScalarValue::Int32(None));
+ let b_null = ColumnarValue::Scalar(ScalarValue::Int32(Some(2i32)));
+
+ let result_null = nvl_func(&[a_null, b_null])?;
+ let result_null = result_null
+ .into_array(1)
+ .expect("Failed to convert to array");
+
+        let expected_null = Arc::new(Int32Array::from(vec![Some(2i32)])) as ArrayRef;
+
+ assert_eq!(expected_null.as_ref(), result_null.as_ref());
+
+ let a_nnull = ColumnarValue::Scalar(ScalarValue::Int32(Some(2i32)));
+ let b_nnull = ColumnarValue::Scalar(ScalarValue::Int32(Some(1i32)));
+
+ let result_nnull = nvl_func(&[a_nnull, b_nnull])?;
+ let result_nnull = result_nnull
+ .into_array(1)
+ .expect("Failed to convert to array");
+
+        let expected_nnull = Arc::new(Int32Array::from(vec![Some(2i32)])) as ArrayRef;
+ assert_eq!(expected_nnull.as_ref(), result_nnull.as_ref());
+
+ Ok(())
}
}
diff --git a/datafusion/functions/src/core/nvl2.rs b/datafusion/functions/src/core/nvl2.rs
index 45cb6760d0..82aa8d2a4c 100644
--- a/datafusion/functions/src/core/nvl2.rs
+++ b/datafusion/functions/src/core/nvl2.rs
@@ -15,16 +15,17 @@
// specific language governing permissions and limitations
// under the License.
-use arrow::datatypes::{DataType, Field, FieldRef};
+use arrow::array::Array;
+use arrow::compute::is_not_null;
+use arrow::compute::kernels::zip::zip;
+use arrow::datatypes::DataType;
use datafusion_common::{internal_err, utils::take_function_args, Result};
use datafusion_expr::{
- conditional_expressions::CaseBuilder,
- simplify::{ExprSimplifyResult, SimplifyInfo},
- type_coercion::binary::comparison_coercion,
- ColumnarValue, Documentation, Expr, ReturnFieldArgs, ScalarFunctionArgs,
- ScalarUDFImpl, Signature, Volatility,
+ type_coercion::binary::comparison_coercion, ColumnarValue, Documentation,
+ ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
};
use datafusion_macros::user_doc;
+use std::sync::Arc;
#[user_doc(
doc_section(label = "Conditional Functions"),
@@ -94,37 +95,8 @@ impl ScalarUDFImpl for NVL2Func {
Ok(arg_types[1].clone())
}
-    fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
- let nullable =
-            args.arg_fields[1].is_nullable() || args.arg_fields[2].is_nullable();
- let return_type = args.arg_fields[1].data_type().clone();
- Ok(Field::new(self.name(), return_type, nullable).into())
- }
-
-    fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
- internal_err!("nvl2 should have been simplified to case")
- }
-
- fn simplify(
- &self,
- args: Vec<Expr>,
- _info: &dyn SimplifyInfo,
- ) -> Result<ExprSimplifyResult> {
-        let [test, if_non_null, if_null] = take_function_args(self.name(), args)?;
-
- let expr = CaseBuilder::new(
- None,
- vec![test.is_not_null()],
- vec![if_non_null],
- Some(Box::new(if_null)),
- )
- .end()?;
-
- Ok(ExprSimplifyResult::Simplified(expr))
- }
-
- fn short_circuits(&self) -> bool {
- true
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
+ nvl2_func(&args.args)
}
fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
@@ -151,3 +123,42 @@ impl ScalarUDFImpl for NVL2Func {
self.doc()
}
}
+
+fn nvl2_func(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+ let mut len = 1;
+ let mut is_array = false;
+ for arg in args {
+ if let ColumnarValue::Array(array) = arg {
+ len = array.len();
+ is_array = true;
+ break;
+ }
+ }
+ if is_array {
+ let args = args
+ .iter()
+ .map(|arg| match arg {
+ ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(len),
+ ColumnarValue::Array(array) => Ok(Arc::clone(array)),
+ })
+ .collect::<Result<Vec<_>>>()?;
+ let [tested, if_non_null, if_null] = take_function_args("nvl2", args)?;
+ let to_apply = is_not_null(&tested)?;
+ let value = zip(&to_apply, &if_non_null, &if_null)?;
+ Ok(ColumnarValue::Array(value))
+ } else {
+ let [tested, if_non_null, if_null] = take_function_args("nvl2", args)?;
+ match &tested {
+ ColumnarValue::Array(_) => {
+ internal_err!("except Scalar value, but got Array")
+ }
+ ColumnarValue::Scalar(scalar) => {
+ if scalar.is_null() {
+ Ok(if_null.clone())
+ } else {
+ Ok(if_non_null.clone())
+ }
+ }
+ }
+ }
+}
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index 2510068494..ec1f8f991a 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -652,8 +652,10 @@ impl CSEController for ExprCSEController<'_> {
            // In case of `ScalarFunction`s we don't know which children are surely
            // executed so start visiting all children conditionally and stop the
            // recursion with `TreeNodeRecursion::Jump`.
- Expr::ScalarFunction(ScalarFunction { func, args }) => {
- func.conditional_arguments(args)
+ Expr::ScalarFunction(ScalarFunction { func, args })
+ if func.short_circuits() =>
+ {
+ Some((vec![], args.iter().collect()))
}
            // In case of `And` and `Or` the first child is surely executed, but we
diff --git a/datafusion/sqllogictest/test_files/nvl.slt b/datafusion/sqllogictest/test_files/nvl.slt
index f4225148ab..daab54307c 100644
--- a/datafusion/sqllogictest/test_files/nvl.slt
+++ b/datafusion/sqllogictest/test_files/nvl.slt
@@ -148,38 +148,3 @@ query T
SELECT NVL(arrow_cast('a', 'Utf8View'), NULL);
----
a
-
-# nvl is implemented as a case, and short-circuits evaluation
-# so the following query should not error
-query I
-SELECT NVL(1, 1/0);
-----
-1
-
-# but this one should
-query error DataFusion error: Arrow error: Divide by zero error
-SELECT NVL(NULL, 1/0);
-
-# Expect the query plan to show nvl as a case expression
-query I
-select NVL(int_field, 9999) FROM test;
-----
-1
-2
-3
-9999
-4
-9999
-
-# Expect the query plan to show nvl as a case expression
-query TT
-EXPLAIN select NVL(int_field, 9999) FROM test;
-----
-logical_plan
-01)Projection: CASE WHEN __common_expr_1 IS NOT NULL THEN __common_expr_1 ELSE Int64(9999) END AS nvl(test.int_field,Int64(9999))
-02)--Projection: CAST(test.int_field AS Int64) AS __common_expr_1
-03)----TableScan: test projection=[int_field]
-physical_plan
-01)ProjectionExec: expr=[CASE WHEN __common_expr_1@0 IS NOT NULL THEN __common_expr_1@0 ELSE 9999 END as nvl(test.int_field,Int64(9999))]
-02)--ProjectionExec: expr=[CAST(int_field@0 AS Int64) as __common_expr_1]
-03)----DataSourceExec: partitions=1, partition_sizes=[1]
diff --git a/datafusion/sqllogictest/test_files/select.slt b/datafusion/sqllogictest/test_files/select.slt
index 5c684eb83d..598a587bfe 100644
--- a/datafusion/sqllogictest/test_files/select.slt
+++ b/datafusion/sqllogictest/test_files/select.slt
@@ -1656,10 +1656,10 @@ query TT
explain select coalesce(1, y/x), coalesce(2, y/x) from t;
----
logical_plan
-01)Projection: Int64(1) AS coalesce(Int64(1),t.y / t.x), Int64(2) AS coalesce(Int64(2),t.y / t.x)
-02)--TableScan: t projection=[]
+01)Projection: coalesce(Int64(1), CAST(t.y / t.x AS Int64)), coalesce(Int64(2), CAST(t.y / t.x AS Int64))
+02)--TableScan: t projection=[x, y]
physical_plan
-01)ProjectionExec: expr=[1 as coalesce(Int64(1),t.y / t.x), 2 as coalesce(Int64(2),t.y / t.x)]
+01)ProjectionExec: expr=[coalesce(1, CAST(y@1 / x@0 AS Int64)) as coalesce(Int64(1),t.y / t.x), coalesce(2, CAST(y@1 / x@0 AS Int64)) as coalesce(Int64(2),t.y / t.x)]
02)--DataSourceExec: partitions=1, partition_sizes=[1]
query TT
@@ -1686,17 +1686,11 @@ physical_plan
02)--ProjectionExec: expr=[y@1 = 0 as __common_expr_1, x@0 as x, y@1 as y]
03)----DataSourceExec: partitions=1, partition_sizes=[1]
-query II
-select coalesce(1, y/x), coalesce(2, y/x) from t;
-----
-1 2
-1 2
-1 2
-1 2
-1 2
-
# due to the reason describe in https://github.com/apache/datafusion/issues/8927,
# the following queries will fail
+query error
+select coalesce(1, y/x), coalesce(2, y/x) from t;
+
query error
SELECT y > 0 and 1 / y < 1, x > 0 and y > 0 and 1 / y < 1 / x from t;
diff --git a/datafusion/sqllogictest/test_files/string/string_view.slt b/datafusion/sqllogictest/test_files/string/string_view.slt
index 4d30f572ad..fb67daa0b8 100644
--- a/datafusion/sqllogictest/test_files/string/string_view.slt
+++ b/datafusion/sqllogictest/test_files/string/string_view.slt
@@ -988,7 +988,7 @@ query TT
EXPLAIN SELECT NVL(column1_utf8view, 'a') as c2 FROM test;
----
logical_plan
-01)Projection: CASE WHEN test.column1_utf8view IS NOT NULL THEN test.column1_utf8view ELSE Utf8View("a") END AS c2
+01)Projection: nvl(test.column1_utf8view, Utf8View("a")) AS c2
02)--TableScan: test projection=[column1_utf8view]
## Ensure no casts for nullif
diff --git a/docs/source/user-guide/sql/scalar_functions.md b/docs/source/user-guide/sql/scalar_functions.md
index 7c88d1fd9c..6df14d13ee 100644
--- a/docs/source/user-guide/sql/scalar_functions.md
+++ b/docs/source/user-guide/sql/scalar_functions.md
@@ -1056,7 +1056,7 @@ nullif(expression1, expression2)
### `nvl`
-Returns _expression2_ if _expression1_ is NULL otherwise it returns _expression1_ and _expression2_ is not evaluated. This function can be used to substitute a default value for NULL values.
+Returns _expression2_ if _expression1_ is NULL otherwise it returns _expression1_.
```sql
nvl(expression1, expression2)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]