This is an automated email from the ASF dual-hosted git repository. alamb pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push: new 01352a026b Support `make_array` null handling in nested version (#7207) 01352a026b is described below commit 01352a026bad267a6311879c9c1b5be898968a40 Author: Jay Zhan <jayzhan...@gmail.com> AuthorDate: Wed Aug 9 00:57:22 2023 +0800 Support `make_array` null handling in nested version (#7207) * first draft Signed-off-by: jayzhan211 <jayzhan...@gmail.com> * fix nulls Signed-off-by: jayzhan211 <jayzhan...@gmail.com> * move rust test to sql logic test Signed-off-by: jayzhan211 <jayzhan...@gmail.com> * differentiate empty array and null array Signed-off-by: jayzhan211 <jayzhan...@gmail.com> --------- Signed-off-by: jayzhan211 <jayzhan...@gmail.com> --- .../core/tests/sqllogictests/test_files/array.slt | 53 ++++--- datafusion/physical-expr/src/array_expressions.rs | 152 ++++++++++----------- datafusion/physical-expr/src/scalar_function.rs | 6 +- 3 files changed, 111 insertions(+), 100 deletions(-) diff --git a/datafusion/core/tests/sqllogictests/test_files/array.slt b/datafusion/core/tests/sqllogictests/test_files/array.slt index 25e2e4b453..218817fc16 100644 --- a/datafusion/core/tests/sqllogictests/test_files/array.slt +++ b/datafusion/core/tests/sqllogictests/test_files/array.slt @@ -19,10 +19,8 @@ ## Array Expressions Tests ############# - ### Tables - statement ok CREATE TABLE values( a INT, @@ -529,7 +527,7 @@ select make_array(1, 2, NULL), make_array(make_array(NULL, 2), make_array(NULL, query ??? select make_array(NULL), make_array(NULL, NULL, NULL), make_array(make_array(NULL, NULL), make_array(NULL, NULL)); ---- -[] [] [[], []] +[] [, , ] [[, ], [, ]] # make_array with 1 columns query ???
@@ -614,10 +612,8 @@ select array_element(make_array(1, 2, 3, 4, 5), 0), array_element(make_array('h' NULL NULL # array_element scalar function #4 (with NULL) -query error +query error select array_element(make_array(1, 2, 3, 4, 5), NULL), array_element(make_array('h', 'e', 'l', 'l', 'o'), NULL); ----- -NULL NULL # array_element scalar function #5 (with negative index) query IT @@ -724,16 +720,12 @@ select array_slice(make_array(1, 2, 3, 4, 5), 0, 4), array_slice(make_array('h', [1, 2, 3, 4] [h, e, l] # array_slice scalar function #8 (with NULL and positive number) -query error +query error select array_slice(make_array(1, 2, 3, 4, 5), NULL, 4), array_slice(make_array('h', 'e', 'l', 'l', 'o'), NULL, 3); ----- -[1, 2, 3, 4] [h, e, l] # array_slice scalar function #9 (with positive number and NULL) -query error +query error select array_slice(make_array(1, 2, 3, 4, 5), 2, NULL), array_slice(make_array('h', 'e', 'l', 'l', 'o'), 3, NULL); ----- -[2, 3, 4, 5] [l, l, o] # array_slice scalar function #10 (with zero-zero) query ?? @@ -742,10 +734,8 @@ select array_slice(make_array(1, 2, 3, 4, 5), 0, 0), array_slice(make_array('h', [] [] # array_slice scalar function #11 (with NULL-NULL) -query error +query error select array_slice(make_array(1, 2, 3, 4, 5), NULL), array_slice(make_array('h', 'e', 'l', 'l', 'o'), NULL); ----- -[] [] # array_slice scalar function #12 (with zero and negative number) query ?? 
@@ -754,16 +744,12 @@ select array_slice(make_array(1, 2, 3, 4, 5), 0, -4), array_slice(make_array('h' [1] [h, e] # array_slice scalar function #13 (with negative number and NULL) -query error +query error select array_slice(make_array(1, 2, 3, 4, 5), 2, NULL), array_slice(make_array('h', 'e', 'l', 'l', 'o'), 3, NULL); ----- -[2, 3, 4, 5] [l, l, o] # array_slice scalar function #14 (with NULL and negative number) -query error +query error select array_slice(make_array(1, 2, 3, 4, 5), NULL, -4), array_slice(make_array('h', 'e', 'l', 'l', 'o'), NULL, -3); ----- -[1] [h, e] # array_slice scalar function #15 (with negative indexes) query ?? @@ -844,6 +830,31 @@ select array_slice(make_array(1, 2, 3, 4, 5), column2, column3), array_slice(col [1, 2, 3, 4, 5] [43, 44, 45, 46] [41, 42, 43, 44, 45] [5] [, 54, 55, 56, 57, 58, 59, 60] [55] +# make_array with nulls +query ??????? +select make_array(make_array('a','b'), null), + make_array(make_array('a','b'), null, make_array('c','d')), + make_array(null, make_array('a','b'), null), + make_array(null, make_array('a','b'), null, null, make_array('c','d')), + make_array(['a', 'bc', 'def'], null, make_array('rust')), + make_array([1,2,3], null, make_array(4,5,6,7)), + make_array(null, 1, null, 2, null, 3, null, null, 4, 5); +---- +[[a, b], ] [[a, b], , [c, d]] [, [a, b], ] [, [a, b], , , [c, d]] [[a, bc, def], , [rust]] [[1, 2, 3], , [4, 5, 6, 7]] [, 1, , 2, , 3, , , 4, 5] + +query ? +select make_array(column5, null, column5) from arrays_values_without_nulls; +---- +[[2, 3], , [2, 3]] +[[4, 5], , [4, 5]] +[[6, 7], , [6, 7]] +[[8, 9], , [8, 9]] + +query ? 
+select make_array(['a','b'], null); +---- +[[a, b], ] + ## array_append (aliases: `list_append`, `array_push_back`, `list_push_back`) # array_append scalar function #1 diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index a223a6998a..fcd9adf19d 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -212,6 +212,12 @@ fn compute_array_dims(arr: Option<ArrayRef>) -> Result<Option<Vec<Option<u64>>>> } } +#[derive(Debug)] +enum ListOrNull<'a> { + List(&'a dyn Array), + Null, +} + /// Convert one or more [`ArrayRef`] of the same type into a /// `ListArray` /// @@ -238,18 +244,18 @@ fn compute_array_dims(arr: Option<ArrayRef>) -> Result<Option<Vec<Option<u64>>>> /// /// Calling `array(col1, col2)` where col1 and col2 are lists /// would return a single new `ListArray`, where each row was a list -/// of the corresponding elements of col1 and col2 flattened. +/// of the corresponding elements of col1 and col2. 
/// ``` text -/// ┌──────────────┐ ┌──────────────┐ ┌────────────────────────┐ -/// │ ┌──────────┐ │ │ ┌──────────┐ │ │ ┌────────────────────┐ │ -/// │ │ [A, X] │ │ │ │ [] │ │ │ │ [A, X] │ │ -/// │ ├──────────┤ │ │ ├──────────┤ │ │ ├────────────────────┤ │ -/// │ │[NULL, Y] │ │ │ │[Q, R, S] │ │───────▶│ │ [NULL, Y, Q, R, S] │ │ -/// │ ├──────────┤ │ │ ├──────────┤ │ │ ├────────────────────┤ │ -/// │ │ [C, Z] │ │ │ │ NULL │ │ │ │ [C, Z, NULL] │ │ -/// │ └──────────┘ │ │ └──────────┘ │ │ └────────────────────┘ │ -/// └──────────────┘ └──────────────┘ └────────────────────────┘ +/// ┌──────────────┐ ┌──────────────┐ ┌─────────────────────────────┐ +/// │ ┌──────────┐ │ │ ┌──────────┐ │ │ ┌────────────────────────┐ │ +/// │ │ [A, X] │ │ │ │ [] │ │ │ │ [[A, X], []] │ │ +/// │ ├──────────┤ │ │ ├──────────┤ │ │ ├────────────────────────┤ │ +/// │ │[NULL, Y] │ │ │ │[Q, R, S] │ │───────▶│ │ [[NULL, Y], [Q, R, S]] │ │ +/// │ ├──────────┤ │ │ ├──────────┤ │ │ ├────────────────────────┤ │ +/// │ │ [C, Z] │ │ │ │ NULL │ │ │ │ [[C, Z], NULL] │ │ +/// │ └──────────┘ │ │ └──────────┘ │ │ └────────────────────────┘ │ +/// └──────────────┘ └──────────────┘ └─────────────────────────────┘ /// col1 col2 output /// ``` fn array_array(args: &[ArrayRef], data_type: DataType) -> Result<ArrayRef> { @@ -260,23 +266,53 @@ fn array_array(args: &[ArrayRef], data_type: DataType) -> Result<ArrayRef> { let res = match data_type { DataType::List(..)
=> { - let arrays = - downcast_vec!(args, ListArray).collect::<Result<Vec<&ListArray>>>()?; - - // Assume number of rows is the same for all arrays - let row_count = arrays[0].len(); + let mut arrays = vec![]; + let mut row_count = 0; + + for arg in args { + let list_arr = arg.as_list_opt::<i32>(); + if let Some(list_arr) = list_arr { + // Assume number of rows is the same for all arrays + row_count = list_arr.len(); + arrays.push(ListOrNull::List(list_arr)); + } else if arg.as_any().downcast_ref::<NullArray>().is_some() { + arrays.push(ListOrNull::Null); + } else { + return Err(DataFusionError::Internal( + "Unsupported argument type for array".to_string(), + )); + } + } - let capacity = Capacities::Array(arrays.iter().map(|a| a.len()).sum()); - let array_data = arrays.iter().map(|a| a.to_data()).collect::<Vec<_>>(); + let mut total_capacity = 0; + let mut array_data = vec![]; + for arr in arrays.iter() { + if let ListOrNull::List(arr) = arr { + total_capacity += arr.len(); + array_data.push(arr.to_data()); + } + } + let capacity = Capacities::Array(total_capacity); let array_data = array_data.iter().collect(); + let mut mutable = MutableArrayData::with_capacities(array_data, true, capacity); for i in 0..row_count { - for (j, _) in arrays.iter().enumerate() { - mutable.extend(j, i, i + 1); + let mut nulls = 0; + for (j, arr) in arrays.iter().enumerate() { + match arr { + ListOrNull::List(_) => { + mutable.extend(j - nulls, i, i + 1); + } + ListOrNull::Null => { + mutable.extend_nulls(1); + nulls += 1; + } + } } } + let list_data_type = DataType::List(Arc::new(Field::new("item", data_type, true))); @@ -327,21 +363,36 @@ fn array(values: &[ColumnarValue]) -> Result<ColumnarValue> { }) .collect(); - let mut data_type = DataType::Null; + let mut data_type = None; for arg in &arrays { let arg_data_type = arg.data_type(); if !arg_data_type.equals_datatype(&DataType::Null) { - data_type = arg_data_type.clone(); + data_type = Some(arg_data_type.clone()); break; + } else { 
+ data_type = Some(DataType::Null); } } match data_type { - DataType::Null => Ok(ColumnarValue::Scalar(ScalarValue::new_list( + // empty array + None => Ok(ColumnarValue::Scalar(ScalarValue::new_list( Some(vec![]), DataType::Null, ))), - _ => Ok(ColumnarValue::Array(array_array( + // all nulls, set default data type as int32 + Some(DataType::Null) => { + let nulls = arrays.len(); + let null_arr = Int32Array::from(vec![None; nulls]); + let field = Arc::new(Field::new("item", DataType::Int32, true)); + let offsets = OffsetBuffer::from_lengths([nulls]); + let values = Arc::new(null_arr) as ArrayRef; + let nulls = None; + Ok(ColumnarValue::Array(Arc::new(ListArray::new( + field, offsets, values, nulls, + )))) + } + Some(data_type) => Ok(ColumnarValue::Array(array_array( arrays.as_slice(), data_type, )?)), @@ -2118,61 +2169,6 @@ mod tests { ); } - #[test] - fn test_array_with_nulls() { - // make_array(NULL, 1, NULL, 2, NULL, 3, NULL, NULL, 4, 5) = [NULL, 1, NULL, 2, NULL, 3, NULL, NULL, 4, 5] - let args = [ - ColumnarValue::Scalar(ScalarValue::Null), - ColumnarValue::Scalar(ScalarValue::Int64(Some(1))), - ColumnarValue::Scalar(ScalarValue::Null), - ColumnarValue::Scalar(ScalarValue::Int64(Some(2))), - ColumnarValue::Scalar(ScalarValue::Null), - ColumnarValue::Scalar(ScalarValue::Int64(Some(3))), - ColumnarValue::Scalar(ScalarValue::Null), - ColumnarValue::Scalar(ScalarValue::Null), - ColumnarValue::Scalar(ScalarValue::Int64(Some(4))), - ColumnarValue::Scalar(ScalarValue::Int64(Some(5))), - ]; - let array = array(&args) - .expect("failed to initialize function array") - .into_array(1); - let result = as_list_array(&array).expect("failed to initialize function array"); - assert_eq!(result.len(), 1); - assert_eq!( - &[0, 1, 0, 2, 0, 3, 0, 0, 4, 5], - result - .value(0) - .as_any() - .downcast_ref::<Int64Array>() - .unwrap() - .values() - ) - } - - #[test] - fn test_array_all_nulls() { - // make_array(NULL, NULL, NULL) = [] - let args = [ - 
ColumnarValue::Scalar(ScalarValue::Null), - ColumnarValue::Scalar(ScalarValue::Null), - ColumnarValue::Scalar(ScalarValue::Null), - ]; - let array = array(&args) - .expect("failed to initialize function array") - .into_array(1); - let result = as_list_array(&array).expect("failed to initialize function array"); - assert_eq!(result.len(), 1); - assert_eq!( - 0, - result - .value(0) - .as_any() - .downcast_ref::<NullArray>() - .unwrap() - .null_count() - ) - } - #[test] fn test_array_element() { // array_element([1, 2, 3, 4], 1) = 1 diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index 25c0627aee..df1e459efb 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -125,7 +125,11 @@ impl PhysicalExpr for ScalarFunctionExpr { // evaluate the arguments, if there are no arguments we'll instead pass in a null array // indicating the batch size (as a convention) let inputs = match (self.args.len(), self.name.parse::<BuiltinScalarFunction>()) { - (0, Ok(scalar_fun)) if scalar_fun.supports_zero_argument() => { + // MakeArray supports zero arguments, but its behavior differs from that of an array containing a single null. + (0, Ok(scalar_fun)) + if scalar_fun.supports_zero_argument() + && scalar_fun != BuiltinScalarFunction::MakeArray => + { vec![ColumnarValue::create_null_array(batch.num_rows())] } _ => self