This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 9ff35f659e Minor: Build array_array() with ListArray construction instead of ArrayData (#7780)
9ff35f659e is described below
commit 9ff35f659e686044b40f4ddc5fa8df4f7aafe2bb
Author: Jay Zhan <[email protected]>
AuthorDate: Tue Oct 10 04:44:48 2023 +0800
Minor: Build array_array() with ListArray construction instead of ArrayData (#7780)
* rewrite array_array
Signed-off-by: jayzhan211 <[email protected]>
* remove unwrap
Signed-off-by: jayzhan211 <[email protected]>
---------
Signed-off-by: jayzhan211 <[email protected]>
---
datafusion/physical-expr/src/array_expressions.rs | 120 +++++++++++-----------
1 file changed, 58 insertions(+), 62 deletions(-)
diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs
index 790c0481e1..0a9c2f5604 100644
--- a/datafusion/physical-expr/src/array_expressions.rs
+++ b/datafusion/physical-expr/src/array_expressions.rs
@@ -18,7 +18,7 @@
//! Array expressions
use arrow::array::*;
-use arrow::buffer::{Buffer, OffsetBuffer};
+use arrow::buffer::OffsetBuffer;
use arrow::compute;
use arrow::datatypes::{DataType, Field, UInt64Type};
use arrow_buffer::NullBuffer;
@@ -261,12 +261,6 @@ macro_rules! call_array_function {
}};
}
-#[derive(Debug)]
-enum ListOrNull<'a> {
- List(&'a dyn Array),
- Null,
-}
-
/// Convert one or more [`ArrayRef`] of the same type into a
/// `ListArray`
///
@@ -315,64 +309,66 @@ fn array_array(args: &[ArrayRef], data_type: DataType) -> Result<ArrayRef> {
let res = match data_type {
DataType::List(..) => {
- let mut arrays = vec![];
- let mut row_count = 0;
-
- for arg in args {
- let list_arr = arg.as_list_opt::<i32>();
- if let Some(list_arr) = list_arr {
- // Assume number of rows is the same for all arrays
- row_count = list_arr.len();
- arrays.push(ListOrNull::List(list_arr));
- } else if arg.as_any().downcast_ref::<NullArray>().is_some() {
- arrays.push(ListOrNull::Null);
- } else {
- return internal_err!("Unsupported argument type for array");
- }
- }
-
- let mut total_capacity = 0;
- let mut array_data = vec![];
- for arr in arrays.iter() {
- if let ListOrNull::List(arr) = arr {
- total_capacity += arr.len();
- array_data.push(arr.to_data());
- }
- }
- let capacity = Capacities::Array(total_capacity);
- let array_data = array_data.iter().collect();
-
- let mut mutable =
- MutableArrayData::with_capacities(array_data, true, capacity);
-
- for i in 0..row_count {
- let mut nulls = 0;
- for (j, arr) in arrays.iter().enumerate() {
- match arr {
- ListOrNull::List(_) => {
- mutable.extend(j - nulls, i, i + 1);
- }
- ListOrNull::Null => {
- mutable.extend_nulls(1);
- nulls += 1;
- }
+ let row_count = args[0].len();
+ let column_count = args.len();
+ let mut list_arrays = vec![];
+ let mut list_array_lengths = vec![];
+ let mut list_valid = BooleanBufferBuilder::new(row_count);
+ // Construct ListArray per row
+ for index in 0..row_count {
+ let mut arrays = vec![];
+ let mut array_lengths = vec![];
+ let mut valid = BooleanBufferBuilder::new(column_count);
+ for arg in args {
+ if arg.as_any().downcast_ref::<NullArray>().is_some() {
+ array_lengths.push(0);
+ valid.append(false);
+ } else {
+ let list_arr = as_list_array(arg)?;
+ let arr = list_arr.value(index);
+ array_lengths.push(arr.len());
+ arrays.push(arr);
+ valid.append(true);
}
}
+ if arrays.is_empty() {
+ list_valid.append(false);
+ list_array_lengths.push(0);
+ } else {
+ let buffer = valid.finish();
+ // Assume all list arrays have the same data type
+ let data_type = arrays[0].data_type();
+ let field = Arc::new(Field::new("item", data_type.to_owned(), true));
+ let elements = arrays.iter().map(|x| x.as_ref()).collect::<Vec<_>>();
+ let values = arrow::compute::concat(elements.as_slice())?;
+ let list_arr = ListArray::new(
+ field,
+ OffsetBuffer::from_lengths(array_lengths),
+ values,
+ Some(NullBuffer::new(buffer)),
+ );
+ list_valid.append(true);
+ list_array_lengths.push(list_arr.len());
+ list_arrays.push(list_arr);
+ }
}
-
- let list_data_type =
- DataType::List(Arc::new(Field::new("item", data_type, true)));
-
- let offsets: Vec<i32> = (0..row_count as i32 + 1)
- .map(|i| i * arrays.len() as i32)
- .collect();
-
- let list_data = ArrayData::builder(list_data_type)
- .len(row_count)
- .buffers(vec![Buffer::from_vec(offsets)])
- .add_child_data(mutable.freeze())
- .build()?;
- Arc::new(ListArray::from(list_data))
+ // Construct ListArray for all rows
+ let buffer = list_valid.finish();
+ // Assume all list arrays have the same data type
+ let data_type = list_arrays[0].data_type();
+ let field = Arc::new(Field::new("item", data_type.to_owned(), true));
+ let elements = list_arrays
+ .iter()
+ .map(|x| x as &dyn Array)
+ .collect::<Vec<_>>();
+ let values = arrow::compute::concat(elements.as_slice())?;
+ let list_arr = ListArray::new(
+ field,
+ OffsetBuffer::from_lengths(list_array_lengths),
+ values,
+ Some(NullBuffer::new(buffer)),
+ );
+ Arc::new(list_arr)
}
DataType::Utf8 => array!(args, StringArray, StringBuilder),
DataType::LargeUtf8 => array!(args, LargeStringArray, LargeStringBuilder),