This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 0c02cad2b5 Support `IS NULL` and `IS NOT NULL` on Unions (#11321)
0c02cad2b5 is described below

commit 0c02cad2b5f9cd3adfc7acf6b7bba32a222c50b0
Author: Samuel Colvin <[email protected]>
AuthorDate: Mon Jul 8 17:58:14 2024 +0100

    Support `IS NULL` and `IS NOT NULL` on Unions (#11321)
    
    * Demonstrate unions can't be null
    
    * add scalar test cases
    
    * support "IS NULL" and "IS NOT NULL" on unions
    
    * formatting
    
    * fix comments from @alamb
    
    * fix docstring
---
 datafusion/common/src/scalar/mod.rs                |  34 ++++-
 datafusion/core/tests/dataframe/mod.rs             | 165 ++++++++++++++++++++-
 .../physical-expr/src/expressions/is_not_null.rs   |  54 ++++++-
 .../physical-expr/src/expressions/is_null.rs       | 129 +++++++++++++++-
 4 files changed, 373 insertions(+), 9 deletions(-)

diff --git a/datafusion/common/src/scalar/mod.rs 
b/datafusion/common/src/scalar/mod.rs
index 55ce76c4b9..26e03a3b98 100644
--- a/datafusion/common/src/scalar/mod.rs
+++ b/datafusion/common/src/scalar/mod.rs
@@ -1459,7 +1459,10 @@ impl ScalarValue {
             ScalarValue::DurationMillisecond(v) => v.is_none(),
             ScalarValue::DurationMicrosecond(v) => v.is_none(),
             ScalarValue::DurationNanosecond(v) => v.is_none(),
-            ScalarValue::Union(v, _, _) => v.is_none(),
+            ScalarValue::Union(v, _, _) => match v {
+                Some((_, s)) => s.is_null(),
+                None => true,
+            },
             ScalarValue::Dictionary(_, v) => v.is_null(),
         }
     }
@@ -6514,4 +6517,33 @@ mod tests {
         }
         intervals
     }
+
+    fn union_fields() -> UnionFields {
+        [
+            (0, Arc::new(Field::new("A", DataType::Int32, true))),
+            (1, Arc::new(Field::new("B", DataType::Float64, true))),
+        ]
+        .into_iter()
+        .collect()
+    }
+
+    #[test]
+    fn sparse_scalar_union_is_null() {
+        let sparse_scalar = ScalarValue::Union(
+            Some((0_i8, Box::new(ScalarValue::Int32(None)))),
+            union_fields(),
+            UnionMode::Sparse,
+        );
+        assert!(sparse_scalar.is_null());
+    }
+
+    #[test]
+    fn dense_scalar_union_is_null() {
+        let dense_scalar = ScalarValue::Union(
+            Some((0_i8, Box::new(ScalarValue::Int32(None)))),
+            union_fields(),
+            UnionMode::Dense,
+        );
+        assert!(dense_scalar.is_null());
+    }
 }
diff --git a/datafusion/core/tests/dataframe/mod.rs 
b/datafusion/core/tests/dataframe/mod.rs
index e46a92e928..2d1904d9e1 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -29,8 +29,9 @@ use arrow::{
     },
     record_batch::RecordBatch,
 };
-use arrow_array::Float32Array;
-use arrow_schema::ArrowError;
+use arrow_array::{Array, Float32Array, Float64Array, UnionArray};
+use arrow_buffer::ScalarBuffer;
+use arrow_schema::{ArrowError, UnionFields, UnionMode};
 use datafusion_functions_aggregate::count::count_udaf;
 use object_store::local::LocalFileSystem;
 use std::fs;
@@ -2195,3 +2196,163 @@ async fn write_parquet_results() -> Result<()> {
 
     Ok(())
 }
+
+fn union_fields() -> UnionFields {
+    [
+        (0, Arc::new(Field::new("A", DataType::Int32, true))),
+        (1, Arc::new(Field::new("B", DataType::Float64, true))),
+        (2, Arc::new(Field::new("C", DataType::Utf8, true))),
+    ]
+    .into_iter()
+    .collect()
+}
+
+#[tokio::test]
+async fn sparse_union_is_null() {
+    // union of [{A=1}, {A=}, {B=3.2}, {B=}, {C="a"}, {C=}]
+    let int_array = Int32Array::from(vec![Some(1), None, None, None, None, 
None]);
+    let float_array = Float64Array::from(vec![None, None, Some(3.2), None, 
None, None]);
+    let str_array = StringArray::from(vec![None, None, None, None, Some("a"), 
None]);
+    let type_ids = [0, 0, 1, 1, 2, 
2].into_iter().collect::<ScalarBuffer<i8>>();
+
+    let children = vec![
+        Arc::new(int_array) as Arc<dyn Array>,
+        Arc::new(float_array),
+        Arc::new(str_array),
+    ];
+
+    let array = UnionArray::try_new(union_fields(), type_ids, None, 
children).unwrap();
+
+    let field = Field::new(
+        "my_union",
+        DataType::Union(union_fields(), UnionMode::Sparse),
+        true,
+    );
+    let schema = Arc::new(Schema::new(vec![field]));
+
+    let batch = RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap();
+
+    let ctx = SessionContext::new();
+
+    ctx.register_batch("union_batch", batch).unwrap();
+
+    let df = ctx.table("union_batch").await.unwrap();
+
+    // view_all
+    let expected = [
+        "+----------+",
+        "| my_union |",
+        "+----------+",
+        "| {A=1}    |",
+        "| {A=}     |",
+        "| {B=3.2}  |",
+        "| {B=}     |",
+        "| {C=a}    |",
+        "| {C=}     |",
+        "+----------+",
+    ];
+    assert_batches_sorted_eq!(expected, &df.clone().collect().await.unwrap());
+
+    // filter where is null
+    let result_df = df.clone().filter(col("my_union").is_null()).unwrap();
+    let expected = [
+        "+----------+",
+        "| my_union |",
+        "+----------+",
+        "| {A=}     |",
+        "| {B=}     |",
+        "| {C=}     |",
+        "+----------+",
+    ];
+    assert_batches_sorted_eq!(expected, &result_df.collect().await.unwrap());
+
+    // filter where is not null
+    let result_df = df.filter(col("my_union").is_not_null()).unwrap();
+    let expected = [
+        "+----------+",
+        "| my_union |",
+        "+----------+",
+        "| {A=1}    |",
+        "| {B=3.2}  |",
+        "| {C=a}    |",
+        "+----------+",
+    ];
+    assert_batches_sorted_eq!(expected, &result_df.collect().await.unwrap());
+}
+
+#[tokio::test]
+async fn dense_union_is_null() {
+    // union of [{A=1}, {A=}, {B=3.2}, {B=}, {C="a"}, {C=}]; each child holds two values and `offsets` selects one per row
+    let int_array = Int32Array::from(vec![Some(1), None]);
+    let float_array = Float64Array::from(vec![Some(3.2), None]);
+    let str_array = StringArray::from(vec![Some("a"), None]);
+    let type_ids = [0, 0, 1, 1, 2, 
2].into_iter().collect::<ScalarBuffer<i8>>();
+    let offsets = [0, 1, 0, 1, 0, 1]
+        .into_iter()
+        .collect::<ScalarBuffer<i32>>();
+
+    let children = vec![
+        Arc::new(int_array) as Arc<dyn Array>,
+        Arc::new(float_array),
+        Arc::new(str_array),
+    ];
+
+    let array =
+        UnionArray::try_new(union_fields(), type_ids, Some(offsets), 
children).unwrap();
+
+    let field = Field::new(
+        "my_union",
+        DataType::Union(union_fields(), UnionMode::Dense),
+        true,
+    );
+    let schema = Arc::new(Schema::new(vec![field]));
+
+    let batch = RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap();
+
+    let ctx = SessionContext::new();
+
+    ctx.register_batch("union_batch", batch).unwrap();
+
+    let df = ctx.table("union_batch").await.unwrap();
+
+    // view all rows, before any filtering
+    let expected = [
+        "+----------+",
+        "| my_union |",
+        "+----------+",
+        "| {A=1}    |",
+        "| {A=}     |",
+        "| {B=3.2}  |",
+        "| {B=}     |",
+        "| {C=a}    |",
+        "| {C=}     |",
+        "+----------+",
+    ];
+    assert_batches_sorted_eq!(expected, &df.clone().collect().await.unwrap());
+
+    // filter where is null: only rows whose selected child value is null remain
+    let result_df = df.clone().filter(col("my_union").is_null()).unwrap();
+    let expected = [
+        "+----------+",
+        "| my_union |",
+        "+----------+",
+        "| {A=}     |",
+        "| {B=}     |",
+        "| {C=}     |",
+        "+----------+",
+    ];
+    assert_batches_sorted_eq!(expected, &result_df.collect().await.unwrap());
+
+    // filter where is not null: the complement of the previous filter
+    let result_df = df.filter(col("my_union").is_not_null()).unwrap();
+    let expected = [
+        "+----------+",
+        "| my_union |",
+        "+----------+",
+        "| {A=1}    |",
+        "| {B=3.2}  |",
+        "| {C=a}    |",
+        "+----------+",
+    ];
+    assert_batches_sorted_eq!(expected, &result_df.collect().await.unwrap());
+}
diff --git a/datafusion/physical-expr/src/expressions/is_not_null.rs 
b/datafusion/physical-expr/src/expressions/is_not_null.rs
index d8fa77585b..9f7438d13e 100644
--- a/datafusion/physical-expr/src/expressions/is_not_null.rs
+++ b/datafusion/physical-expr/src/expressions/is_not_null.rs
@@ -73,9 +73,11 @@ impl PhysicalExpr for IsNotNullExpr {
     fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
         let arg = self.arg.evaluate(batch)?;
         match arg {
-            ColumnarValue::Array(array) => Ok(ColumnarValue::Array(Arc::new(
-                compute::is_not_null(array.as_ref())?,
-            ))),
+            ColumnarValue::Array(array) => {
+                let is_null = super::is_null::compute_is_null(array)?;
+                let is_not_null = compute::not(&is_null)?;
+                Ok(ColumnarValue::Array(Arc::new(is_not_null)))
+            }
             ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar(
                 ScalarValue::Boolean(Some(!scalar.is_null())),
             )),
@@ -120,6 +122,8 @@ mod tests {
         array::{BooleanArray, StringArray},
         datatypes::*,
     };
+    use arrow_array::{Array, Float64Array, Int32Array, UnionArray};
+    use arrow_buffer::ScalarBuffer;
     use datafusion_common::cast::as_boolean_array;
 
     #[test]
@@ -143,4 +147,48 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn union_is_not_null_op() {
+        // sparse union of [{A=1}, {A=}, {B=1.1}, {B=1.2}, {B=}]
+        let int_array = Int32Array::from(vec![Some(1), None, None, None, 
None]);
+        let float_array =
+            Float64Array::from(vec![None, None, Some(1.1), Some(1.2), None]);
+        let type_ids = [0, 0, 1, 1, 
1].into_iter().collect::<ScalarBuffer<i8>>();
+
+        let children = vec![Arc::new(int_array) as Arc<dyn Array>, 
Arc::new(float_array)];
+
+        let union_fields: UnionFields = [
+            (0, Arc::new(Field::new("A", DataType::Int32, true))),
+            (1, Arc::new(Field::new("B", DataType::Float64, true))),
+        ]
+        .into_iter()
+        .collect();
+
+        let array =
+            UnionArray::try_new(union_fields.clone(), type_ids, None, 
children).unwrap();
+
+        let field = Field::new(
+            "my_union",
+            DataType::Union(union_fields, UnionMode::Sparse),
+            true,
+        );
+
+        let schema = Schema::new(vec![field]);
+        let expr = is_not_null(col("my_union", &schema).unwrap()).unwrap();
+        let batch =
+            RecordBatch::try_new(Arc::new(schema), 
vec![Arc::new(array)]).unwrap();
+
+        // expression: "my_union is not null"
+        let actual = expr
+            .evaluate(&batch)
+            .unwrap()
+            .into_array(batch.num_rows())
+            .expect("Failed to convert to array");
+        let actual = as_boolean_array(&actual).unwrap();
+
+        let expected = &BooleanArray::from(vec![true, false, true, true, 
false]);
+
+        assert_eq!(expected, actual);
+    }
 }
diff --git a/datafusion/physical-expr/src/expressions/is_null.rs 
b/datafusion/physical-expr/src/expressions/is_null.rs
index 41becafde6..e2dc941e26 100644
--- a/datafusion/physical-expr/src/expressions/is_null.rs
+++ b/datafusion/physical-expr/src/expressions/is_null.rs
@@ -25,6 +25,9 @@ use arrow::{
     datatypes::{DataType, Schema},
     record_batch::RecordBatch,
 };
+use arrow_array::{Array, ArrayRef, BooleanArray, Int8Array, UnionArray};
+use arrow_buffer::{BooleanBuffer, ScalarBuffer};
+use arrow_ord::cmp;
 
 use crate::physical_expr::down_cast_any_ref;
 use crate::PhysicalExpr;
@@ -74,9 +77,9 @@ impl PhysicalExpr for IsNullExpr {
     fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
         let arg = self.arg.evaluate(batch)?;
         match arg {
-            ColumnarValue::Array(array) => Ok(ColumnarValue::Array(Arc::new(
-                compute::is_null(array.as_ref())?,
-            ))),
+            ColumnarValue::Array(array) => {
+                Ok(ColumnarValue::Array(Arc::new(compute_is_null(array)?)))
+            }
             ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar(
                 ScalarValue::Boolean(Some(scalar.is_null())),
             )),
@@ -100,6 +103,55 @@ impl PhysicalExpr for IsNullExpr {
     }
 }
 
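+/// Computes the `IS NULL` mask for `array`, treating a union row as null when
+/// the child value that row selects is null.
+///
+/// Workaround for <https://github.com/apache/arrow-rs/issues/6017>; this can be
+/// replaced with a direct call to `arrow::compute::is_null` once that is fixed.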
+pub(crate) fn compute_is_null(array: ArrayRef) -> Result<BooleanArray> {
+    if let Some(union_array) = array.as_any().downcast_ref::<UnionArray>() { // unions take the hand-written per-child path below
+        if let Some(offsets) = union_array.offsets() { // offsets present: handled as a dense union
+            dense_union_is_null(union_array, offsets)
+        } else { // no offsets: handled as a sparse union
+            sparse_union_is_null(union_array)
+        }
+    } else {
+        compute::is_null(array.as_ref()).map_err(Into::into) // every other array type defers to the arrow kernel
+    }
+}
+
+/// Computes the `IS NULL` mask of a dense union.
+///
+/// Row `i` is reported as null when the value at `offsets[i]` of the child
+/// selected by `type_ids[i]` is null. The returned array has no validity
+/// buffer, i.e. every entry is a definite `true` or `false`.
+///
+/// Children are looked up by the type ids declared in the union's fields
+/// rather than by position, because type ids are not required to be the
+/// contiguous range `0..n`.
+///
+/// # Errors
+///
+/// Returns an error if computing the null mask of a child array fails.
+fn dense_union_is_null(
+    union_array: &UnionArray,
+    offsets: &ScalarBuffer<i32>,
+) -> Result<BooleanArray> {
+    let fields = match union_array.data_type() {
+        DataType::Union(fields, _) => fields,
+        other => unreachable!("UnionArray with non-union data type {other:?}"),
+    };
+
+    // Null mask of every child, indexed by its type id reinterpreted as `u8`
+    // so that any `i8` value maps to a valid slot of the table.
+    let mut child_is_null: Vec<Option<BooleanArray>> = vec![None; 256];
+    for (type_id, _) in fields.iter() {
+        child_is_null[type_id as u8 as usize] =
+            Some(compute::is_null(&union_array.child(type_id))?);
+    }
+
+    let buffer: BooleanBuffer = offsets
+        .iter()
+        .zip(union_array.type_ids().iter())
+        .map(|(offset, type_id)| {
+            child_is_null[*type_id as u8 as usize]
+                .as_ref()
+                .expect("union type id must be declared in the union fields")
+                .value(*offset as usize)
+        })
+        .collect();
+
+    Ok(BooleanArray::new(buffer, None))
+}
+
+/// Computes the `IS NULL` mask of a sparse union.
+///
+/// In a sparse union every child has the same length as the union, so row `i`
+/// is reported as null when row `i` of the child selected by `type_ids[i]` is
+/// null. The mask is built by OR-ing, for each child, "this row selects the
+/// child AND the child's value is null".
+///
+/// Children are enumerated via the type ids declared in the union's fields
+/// rather than by position, because type ids are not required to be the
+/// contiguous range `0..n`.
+///
+/// # Errors
+///
+/// Returns an error if any of the underlying arrow kernels fails.
+fn sparse_union_is_null(union_array: &UnionArray) -> Result<BooleanArray> {
+    let fields = match union_array.data_type() {
+        DataType::Union(fields, _) => fields,
+        other => unreachable!("UnionArray with non-union data type {other:?}"),
+    };
+    let type_ids = Int8Array::new(union_array.type_ids().clone(), None);
+
+    // Start from "nothing is null" and accumulate the nulls of each child.
+    let mut union_is_null =
+        BooleanArray::new(BooleanBuffer::new_unset(union_array.len()), None);
+    for (type_id, _) in fields.iter() {
+        // Rows of the union that select this child.
+        let union_is_child = cmp::eq(&type_ids, &Int8Array::new_scalar(type_id))?;
+        let child = union_array.child(type_id);
+        let child_array_is_null = compute::is_null(&child)?;
+        let child_is_null = compute::and(&union_is_child, &child_array_is_null)?;
+        union_is_null = compute::or(&union_is_null, &child_is_null)?;
+    }
+    Ok(union_is_null)
+}
+
 impl PartialEq<dyn Any> for IsNullExpr {
     fn eq(&self, other: &dyn Any) -> bool {
         down_cast_any_ref(other)
@@ -108,6 +160,7 @@ impl PartialEq<dyn Any> for IsNullExpr {
             .unwrap_or(false)
     }
 }
+
 /// Create an IS NULL expression
 pub fn is_null(arg: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
     Ok(Arc::new(IsNullExpr::new(arg)))
@@ -121,6 +174,8 @@ mod tests {
         array::{BooleanArray, StringArray},
         datatypes::*,
     };
+    use arrow_array::{Float64Array, Int32Array};
+    use arrow_buffer::ScalarBuffer;
     use datafusion_common::cast::as_boolean_array;
 
     #[test]
@@ -145,4 +200,72 @@ mod tests {
 
         Ok(())
     }
+
+    fn union_fields() -> UnionFields {
+        [
+            (0, Arc::new(Field::new("A", DataType::Int32, true))),
+            (1, Arc::new(Field::new("B", DataType::Float64, true))),
+            (2, Arc::new(Field::new("C", DataType::Utf8, true))),
+        ]
+        .into_iter()
+        .collect()
+    }
+
+    #[test]
+    fn sparse_union_is_null() {
+        // union of [{A=1}, {A=}, {B=1.1}, {B=1.2}, {B=}, {C=}, {C="a"}]
+        let int_array =
+            Int32Array::from(vec![Some(1), None, None, None, None, None, 
None]);
+        let float_array =
+            Float64Array::from(vec![None, None, Some(1.1), Some(1.2), None, 
None, None]);
+        let str_array =
+            StringArray::from(vec![None, None, None, None, None, None, 
Some("a")]);
+        let type_ids = [0, 0, 1, 1, 1, 2, 2]
+            .into_iter()
+            .collect::<ScalarBuffer<i8>>();
+
+        let children = vec![
+            Arc::new(int_array) as Arc<dyn Array>,
+            Arc::new(float_array),
+            Arc::new(str_array),
+        ];
+
+        let array =
+            UnionArray::try_new(union_fields(), type_ids, None, 
children).unwrap();
+
+        let array_ref = Arc::new(array) as ArrayRef;
+        let result = compute_is_null(array_ref).unwrap();
+
+        let expected =
+            &BooleanArray::from(vec![false, true, false, false, true, true, 
false]);
+        assert_eq!(expected, &result);
+    }
+
+    #[test]
+    fn dense_union_is_null() {
+        // union of [{A=1}, {A=}, {B=3.2}, {B=}, {C="a"}, {C=}]
+        let int_array = Int32Array::from(vec![Some(1), None]);
+        let float_array = Float64Array::from(vec![Some(3.2), None]);
+        let str_array = StringArray::from(vec![Some("a"), None]);
+        let type_ids = [0, 0, 1, 1, 2, 
2].into_iter().collect::<ScalarBuffer<i8>>();
+        let offsets = [0, 1, 0, 1, 0, 1]
+            .into_iter()
+            .collect::<ScalarBuffer<i32>>();
+
+        let children = vec![
+            Arc::new(int_array) as Arc<dyn Array>,
+            Arc::new(float_array),
+            Arc::new(str_array),
+        ];
+
+        let array =
+            UnionArray::try_new(union_fields(), type_ids, Some(offsets), 
children)
+                .unwrap();
+
+        let array_ref = Arc::new(array) as ArrayRef;
+        let result = compute_is_null(array_ref).unwrap();
+
+        let expected = &BooleanArray::from(vec![false, true, false, true, 
false, true]);
+        assert_eq!(expected, &result);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to