This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 0c02cad2b5 Support `IS NULL` and `IS NOT NULL` on Unions (#11321)
0c02cad2b5 is described below
commit 0c02cad2b5f9cd3adfc7acf6b7bba32a222c50b0
Author: Samuel Colvin <[email protected]>
AuthorDate: Mon Jul 8 17:58:14 2024 +0100
Support `IS NULL` and `IS NOT NULL` on Unions (#11321)
* Demonstrate unions can't be null
* add scalar test cases
* support "IS NULL" and "IS NOT NULL" on unions
* formatting
* fix comments from @alamb
* fix docstring
---
datafusion/common/src/scalar/mod.rs | 34 ++++-
datafusion/core/tests/dataframe/mod.rs | 165 ++++++++++++++++++++-
.../physical-expr/src/expressions/is_not_null.rs | 54 ++++++-
.../physical-expr/src/expressions/is_null.rs | 129 +++++++++++++++-
4 files changed, 373 insertions(+), 9 deletions(-)
diff --git a/datafusion/common/src/scalar/mod.rs
b/datafusion/common/src/scalar/mod.rs
index 55ce76c4b9..26e03a3b98 100644
--- a/datafusion/common/src/scalar/mod.rs
+++ b/datafusion/common/src/scalar/mod.rs
@@ -1459,7 +1459,10 @@ impl ScalarValue {
ScalarValue::DurationMillisecond(v) => v.is_none(),
ScalarValue::DurationMicrosecond(v) => v.is_none(),
ScalarValue::DurationNanosecond(v) => v.is_none(),
- ScalarValue::Union(v, _, _) => v.is_none(),
+ ScalarValue::Union(v, _, _) => match v {
+ Some((_, s)) => s.is_null(),
+ None => true,
+ },
ScalarValue::Dictionary(_, v) => v.is_null(),
}
}
@@ -6514,4 +6517,33 @@ mod tests {
}
intervals
}
+
+    /// Two nullable union members used by the scalar union tests:
+    /// type id 0 is `A: Int32` and type id 1 is `B: Float64`.
+    fn union_fields() -> UnionFields {
+        let members = [
+            Field::new("A", DataType::Int32, true),
+            Field::new("B", DataType::Float64, true),
+        ];
+        UnionFields::new([0, 1], members)
+    }
+
+    #[test]
+    fn sparse_scalar_union_is_null() {
+        // A sparse union scalar is null when its selected member's value is null,
+        // even though the outer `Option` is `Some`.
+        let member = Box::new(ScalarValue::Int32(None));
+        let scalar = ScalarValue::Union(Some((0_i8, member)), union_fields(), UnionMode::Sparse);
+        assert!(scalar.is_null());
+    }
+
+    #[test]
+    fn dense_scalar_union_is_null() {
+        // Same contract as the sparse case: a dense union scalar whose selected
+        // member holds a null reports itself as null.
+        let member = Box::new(ScalarValue::Int32(None));
+        let scalar = ScalarValue::Union(Some((0_i8, member)), union_fields(), UnionMode::Dense);
+        assert!(scalar.is_null());
+    }
}
diff --git a/datafusion/core/tests/dataframe/mod.rs
b/datafusion/core/tests/dataframe/mod.rs
index e46a92e928..2d1904d9e1 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -29,8 +29,9 @@ use arrow::{
},
record_batch::RecordBatch,
};
-use arrow_array::Float32Array;
-use arrow_schema::ArrowError;
+use arrow_array::{Array, Float32Array, Float64Array, UnionArray};
+use arrow_buffer::ScalarBuffer;
+use arrow_schema::{ArrowError, UnionFields, UnionMode};
use datafusion_functions_aggregate::count::count_udaf;
use object_store::local::LocalFileSystem;
use std::fs;
@@ -2195,3 +2196,163 @@ async fn write_parquet_results() -> Result<()> {
Ok(())
}
+
+/// Three nullable union members used by the dataframe union tests:
+/// `A: Int32` (id 0), `B: Float64` (id 1) and `C: Utf8` (id 2).
+fn union_fields() -> UnionFields {
+    UnionFields::new(
+        [0, 1, 2],
+        [
+            Field::new("A", DataType::Int32, true),
+            Field::new("B", DataType::Float64, true),
+            Field::new("C", DataType::Utf8, true),
+        ],
+    )
+}
+
+#[tokio::test]
+async fn sparse_union_is_null() {
+    // Sparse layout: every child is as long as the union and only the slot chosen
+    // by `type_ids` is meaningful. Rows: [{A=1}, {A=}, {B=3.2}, {B=}, {C="a"}, {C=}]
+    let ints = Int32Array::from(vec![Some(1), None, None, None, None, None]);
+    let floats = Float64Array::from(vec![None, None, Some(3.2), None, None, None]);
+    let strings = StringArray::from(vec![None, None, None, None, Some("a"), None]);
+    let type_ids: ScalarBuffer<i8> = [0, 0, 1, 1, 2, 2].into_iter().collect();
+    let children = vec![Arc::new(ints) as Arc<dyn Array>, Arc::new(floats), Arc::new(strings)];
+    let union_array = UnionArray::try_new(union_fields(), type_ids, None, children).unwrap();
+
+    let schema = Arc::new(Schema::new(vec![Field::new(
+        "my_union",
+        DataType::Union(union_fields(), UnionMode::Sparse),
+        true,
+    )]));
+    let batch = RecordBatch::try_new(schema, vec![Arc::new(union_array)]).unwrap();
+
+    let ctx = SessionContext::new();
+    ctx.register_batch("union_batch", batch).unwrap();
+    let df = ctx.table("union_batch").await.unwrap();
+
+    // Materialise the three views first, then compare each with its expected table.
+    let all_rows = df.clone().collect().await.unwrap();
+    let null_rows = df
+        .clone()
+        .filter(col("my_union").is_null())
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+    let non_null_rows = df
+        .filter(col("my_union").is_not_null())
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+
+    // Unfiltered: all six rows.
+    let expected_all = [
+        "+----------+",
+        "| my_union |",
+        "+----------+",
+        "| {A=1} |",
+        "| {A=} |",
+        "| {B=3.2} |",
+        "| {B=} |",
+        "| {C=a} |",
+        "| {C=} |",
+        "+----------+",
+    ];
+    assert_batches_sorted_eq!(expected_all, &all_rows);
+
+    // IS NULL keeps exactly the rows whose selected child value is null.
+    let expected_nulls = [
+        "+----------+",
+        "| my_union |",
+        "+----------+",
+        "| {A=} |",
+        "| {B=} |",
+        "| {C=} |",
+        "+----------+",
+    ];
+    assert_batches_sorted_eq!(expected_nulls, &null_rows);
+
+    // IS NOT NULL keeps the complement.
+    let expected_non_nulls = [
+        "+----------+",
+        "| my_union |",
+        "+----------+",
+        "| {A=1} |",
+        "| {B=3.2} |",
+        "| {C=a} |",
+        "+----------+",
+    ];
+    assert_batches_sorted_eq!(expected_non_nulls, &non_null_rows);
+}
+
+#[tokio::test]
+async fn dense_union_is_null() {
+    // union of [{A=1}, {A=}, {B=3.2}, {B=}, {C="a"}, {C=}]
+    // Dense layout: each child stores only its own two values, and `offsets`
+    // points each row at its value inside the child selected by `type_ids`.
+    let int_array = Int32Array::from(vec![Some(1), None]);
+    let float_array = Float64Array::from(vec![Some(3.2), None]);
+    let str_array = StringArray::from(vec![Some("a"), None]);
+    let type_ids = [0, 0, 1, 1, 2, 2].into_iter().collect::<ScalarBuffer<i8>>();
+    let offsets = [0, 1, 0, 1, 0, 1]
+        .into_iter()
+        .collect::<ScalarBuffer<i32>>();
+
+    let children = vec![
+        Arc::new(int_array) as Arc<dyn Array>,
+        Arc::new(float_array),
+        Arc::new(str_array),
+    ];
+
+    let array =
+        UnionArray::try_new(union_fields(), type_ids, Some(offsets), children).unwrap();
+
+    let field = Field::new(
+        "my_union",
+        DataType::Union(union_fields(), UnionMode::Dense),
+        true,
+    );
+    let schema = Arc::new(Schema::new(vec![field]));
+
+    let batch = RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap();
+
+    let ctx = SessionContext::new();
+
+    ctx.register_batch("union_batch", batch).unwrap();
+
+    let df = ctx.table("union_batch").await.unwrap();
+
+    // view_all
+    let expected = [
+        "+----------+",
+        "| my_union |",
+        "+----------+",
+        "| {A=1} |",
+        "| {A=} |",
+        "| {B=3.2} |",
+        "| {B=} |",
+        "| {C=a} |",
+        "| {C=} |",
+        "+----------+",
+    ];
+    assert_batches_sorted_eq!(expected, &df.clone().collect().await.unwrap());
+
+    // filter where is null: rows whose selected child value is null
+    let result_df = df.clone().filter(col("my_union").is_null()).unwrap();
+    let expected = [
+        "+----------+",
+        "| my_union |",
+        "+----------+",
+        "| {A=} |",
+        "| {B=} |",
+        "| {C=} |",
+        "+----------+",
+    ];
+    assert_batches_sorted_eq!(expected, &result_df.collect().await.unwrap());
+
+    // filter where is not null: the complement of the previous filter
+    let result_df = df.filter(col("my_union").is_not_null()).unwrap();
+    let expected = [
+        "+----------+",
+        "| my_union |",
+        "+----------+",
+        "| {A=1} |",
+        "| {B=3.2} |",
+        "| {C=a} |",
+        "+----------+",
+    ];
+    assert_batches_sorted_eq!(expected, &result_df.collect().await.unwrap());
+}
diff --git a/datafusion/physical-expr/src/expressions/is_not_null.rs
b/datafusion/physical-expr/src/expressions/is_not_null.rs
index d8fa77585b..9f7438d13e 100644
--- a/datafusion/physical-expr/src/expressions/is_not_null.rs
+++ b/datafusion/physical-expr/src/expressions/is_not_null.rs
@@ -73,9 +73,11 @@ impl PhysicalExpr for IsNotNullExpr {
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
let arg = self.arg.evaluate(batch)?;
match arg {
- ColumnarValue::Array(array) => Ok(ColumnarValue::Array(Arc::new(
- compute::is_not_null(array.as_ref())?,
- ))),
+ ColumnarValue::Array(array) => {
+ let is_null = super::is_null::compute_is_null(array)?;
+ let is_not_null = compute::not(&is_null)?;
+ Ok(ColumnarValue::Array(Arc::new(is_not_null)))
+ }
ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar(
ScalarValue::Boolean(Some(!scalar.is_null())),
)),
@@ -120,6 +122,8 @@ mod tests {
array::{BooleanArray, StringArray},
datatypes::*,
};
+ use arrow_array::{Array, Float64Array, Int32Array, UnionArray};
+ use arrow_buffer::ScalarBuffer;
use datafusion_common::cast::as_boolean_array;
#[test]
@@ -143,4 +147,48 @@ mod tests {
Ok(())
}
+
+    #[test]
+    fn union_is_not_null_op() {
+        // Sparse union rows: [{A=1}, {A=}, {B=1.1}, {B=1.2}, {B=}]
+        // A row is "not null" when the value in its selected child is non-null.
+        let int_array = Int32Array::from(vec![Some(1), None, None, None, None]);
+        let float_array = Float64Array::from(vec![None, None, Some(1.1), Some(1.2), None]);
+        let type_ids = [0, 0, 1, 1, 1].into_iter().collect::<ScalarBuffer<i8>>();
+
+        let children = vec![Arc::new(int_array) as Arc<dyn Array>, Arc::new(float_array)];
+
+        let union_fields: UnionFields = [
+            (0, Arc::new(Field::new("A", DataType::Int32, true))),
+            (1, Arc::new(Field::new("B", DataType::Float64, true))),
+        ]
+        .into_iter()
+        .collect();
+
+        let array =
+            UnionArray::try_new(union_fields.clone(), type_ids, None, children).unwrap();
+
+        let field = Field::new(
+            "my_union",
+            DataType::Union(union_fields, UnionMode::Sparse),
+            true,
+        );
+
+        let schema = Schema::new(vec![field]);
+        let expr = is_not_null(col("my_union", &schema).unwrap()).unwrap();
+        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
+
+        // expression: "my_union is not null"
+        let actual = expr
+            .evaluate(&batch)
+            .unwrap()
+            .into_array(batch.num_rows())
+            .expect("Failed to convert to array");
+        let actual = as_boolean_array(&actual).unwrap();
+
+        let expected = &BooleanArray::from(vec![true, false, true, true, false]);
+
+        assert_eq!(expected, actual);
+    }
}
diff --git a/datafusion/physical-expr/src/expressions/is_null.rs
b/datafusion/physical-expr/src/expressions/is_null.rs
index 41becafde6..e2dc941e26 100644
--- a/datafusion/physical-expr/src/expressions/is_null.rs
+++ b/datafusion/physical-expr/src/expressions/is_null.rs
@@ -25,6 +25,9 @@ use arrow::{
datatypes::{DataType, Schema},
record_batch::RecordBatch,
};
+use arrow_array::{Array, ArrayRef, BooleanArray, Int8Array, UnionArray};
+use arrow_buffer::{BooleanBuffer, ScalarBuffer};
+use arrow_ord::cmp;
use crate::physical_expr::down_cast_any_ref;
use crate::PhysicalExpr;
@@ -74,9 +77,9 @@ impl PhysicalExpr for IsNullExpr {
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
let arg = self.arg.evaluate(batch)?;
match arg {
- ColumnarValue::Array(array) => Ok(ColumnarValue::Array(Arc::new(
- compute::is_null(array.as_ref())?,
- ))),
+ ColumnarValue::Array(array) => {
+ Ok(ColumnarValue::Array(Arc::new(compute_is_null(array)?)))
+ }
ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar(
ScalarValue::Boolean(Some(scalar.is_null())),
)),
@@ -100,6 +103,55 @@ impl PhysicalExpr for IsNullExpr {
}
}
+/// workaround <https://github.com/apache/arrow-rs/issues/6017>,
+/// this can be replaced with a direct call to `arrow::compute::is_null` once
it's fixed.
+pub(crate) fn compute_is_null(array: ArrayRef) -> Result<BooleanArray> {
+ if let Some(union_array) = array.as_any().downcast_ref::<UnionArray>() {
+ if let Some(offsets) = union_array.offsets() {
+ dense_union_is_null(union_array, offsets)
+ } else {
+ sparse_union_is_null(union_array)
+ }
+ } else {
+ compute::is_null(array.as_ref()).map_err(Into::into)
+ }
+}
+
+/// Null mask for a dense union: row `i` is null when
+/// `child(type_ids[i])[offsets[i]]` is null.
+///
+/// The union's type ids are taken from its declared `UnionFields`. They need not be
+/// contiguous or start at zero (e.g. `[1, 5]`), so per-child masks are stored at
+/// their type id instead of at their position in the field list.
+fn dense_union_is_null(
+    union_array: &UnionArray,
+    offsets: &ScalarBuffer<i32>,
+) -> Result<BooleanArray> {
+    let DataType::Union(fields, _) = union_array.data_type() else {
+        unreachable!("UnionArray always has a Union data type");
+    };
+
+    // Arrow union type ids are `i8` values in `0..=127`, so 128 slots cover every id.
+    let mut child_is_null: Vec<Option<BooleanArray>> = vec![None; 128];
+    for (type_id, _) in fields.iter() {
+        let child = union_array.child(type_id);
+        child_is_null[type_id as usize] = Some(compute::is_null(child.as_ref())?);
+    }
+
+    let buffer: BooleanBuffer = offsets
+        .iter()
+        .zip(union_array.type_ids())
+        .map(|(offset, type_id)| {
+            child_is_null[*type_id as usize]
+                .as_ref()
+                .expect("UnionArray guarantees every row's type id has a child")
+                .value(*offset as usize)
+        })
+        .collect();
+
+    Ok(BooleanArray::new(buffer, None))
+}
+
+/// Null mask for a sparse union: row `i` is null when `child(type_ids[i])[i]` is null.
+///
+/// Iterates the type ids declared in the union's `UnionFields` rather than
+/// `0..number_of_children`. Type ids need not be contiguous (e.g. `[1, 5]`), and
+/// `UnionArray::child` panics for an undeclared id.
+fn sparse_union_is_null(union_array: &UnionArray) -> Result<BooleanArray> {
+    let DataType::Union(fields, _) = union_array.data_type() else {
+        unreachable!("UnionArray always has a Union data type");
+    };
+    let type_ids = Int8Array::new(union_array.type_ids().clone(), None);
+
+    // Start from "nothing is null". For each declared type id, OR in the rows
+    // that select that child AND whose value in that child is null.
+    let mut union_is_null =
+        BooleanArray::new(BooleanBuffer::new_unset(union_array.len()), None);
+    for (type_id, _) in fields.iter() {
+        let union_is_child = cmp::eq(&type_ids, &Int8Array::new_scalar(type_id))?;
+        let child_array_is_null = compute::is_null(union_array.child(type_id).as_ref())?;
+        let child_is_null = compute::and(&union_is_child, &child_array_is_null)?;
+        union_is_null = compute::or(&union_is_null, &child_is_null)?;
+    }
+    Ok(union_is_null)
+}
+
impl PartialEq<dyn Any> for IsNullExpr {
fn eq(&self, other: &dyn Any) -> bool {
down_cast_any_ref(other)
@@ -108,6 +160,7 @@ impl PartialEq<dyn Any> for IsNullExpr {
.unwrap_or(false)
}
}
+
/// Create an IS NULL expression
pub fn is_null(arg: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
Ok(Arc::new(IsNullExpr::new(arg)))
@@ -121,6 +174,8 @@ mod tests {
array::{BooleanArray, StringArray},
datatypes::*,
};
+ use arrow_array::{Float64Array, Int32Array};
+ use arrow_buffer::ScalarBuffer;
use datafusion_common::cast::as_boolean_array;
#[test]
@@ -145,4 +200,72 @@ mod tests {
Ok(())
}
+
+    /// Three nullable union members used by the union tests in this module:
+    /// `A: Int32` (id 0), `B: Float64` (id 1) and `C: Utf8` (id 2).
+    fn union_fields() -> UnionFields {
+        UnionFields::new(
+            [0, 1, 2],
+            [
+                Field::new("A", DataType::Int32, true),
+                Field::new("B", DataType::Float64, true),
+                Field::new("C", DataType::Utf8, true),
+            ],
+        )
+    }
+
+    #[test]
+    fn sparse_union_is_null() {
+        // Sparse layout, rows: [{A=1}, {A=}, {B=1.1}, {B=1.2}, {B=}, {C=}, {C="a"}]
+        let type_ids: ScalarBuffer<i8> = [0, 0, 1, 1, 1, 2, 2].into_iter().collect();
+        let ints = Int32Array::from(vec![Some(1), None, None, None, None, None, None]);
+        let floats = Float64Array::from(vec![None, None, Some(1.1), Some(1.2), None, None, None]);
+        let strings = StringArray::from(vec![None, None, None, None, None, None, Some("a")]);
+        let children = vec![Arc::new(ints) as Arc<dyn Array>, Arc::new(floats), Arc::new(strings)];
+
+        let union_array: ArrayRef =
+            Arc::new(UnionArray::try_new(union_fields(), type_ids, None, children).unwrap());
+
+        let is_null = compute_is_null(union_array).unwrap();
+        let expected = BooleanArray::from(vec![false, true, false, false, true, true, false]);
+        assert_eq!(&expected, &is_null);
+    }
+
+    #[test]
+    fn dense_union_is_null() {
+        // Dense layout: each child stores only its own values and `offsets` points
+        // each row into its child. Rows: [{A=1}, {A=}, {B=3.2}, {B=}, {C="a"}, {C=}]
+        let children = vec![
+            Arc::new(Int32Array::from(vec![Some(1), None])) as Arc<dyn Array>,
+            Arc::new(Float64Array::from(vec![Some(3.2), None])),
+            Arc::new(StringArray::from(vec![Some("a"), None])),
+        ];
+        let type_ids: ScalarBuffer<i8> = [0, 0, 1, 1, 2, 2].into_iter().collect();
+        let offsets: ScalarBuffer<i32> = [0, 1, 0, 1, 0, 1].into_iter().collect();
+
+        let union_array: ArrayRef = Arc::new(
+            UnionArray::try_new(union_fields(), type_ids, Some(offsets), children).unwrap(),
+        );
+
+        let is_null = compute_is_null(union_array).unwrap();
+        let expected = BooleanArray::from(vec![false, true, false, true, false, true]);
+        assert_eq!(&expected, &is_null);
+    }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]