This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new c43d7bed16 Optimize `Unnest` and implement `skip_nulls=true` if
specified (#7371)
c43d7bed16 is described below
commit c43d7bed16ee2b469b21537d01226fc9857fb9ce
Author: Miklos Szots <[email protected]>
AuthorDate: Thu Aug 24 15:39:29 2023 +0200
Optimize `Unnest` and implement `skip_nulls=true` if specified (#7371)
* reimplement optimized unnset exec
* fixes and more tests
* refactor
* document the code
* review fix
---
datafusion/core/src/physical_plan/unnest.rs | 349 ++++++++++++++++++----------
datafusion/core/tests/dataframe/mod.rs | 134 +++++++----
2 files changed, 306 insertions(+), 177 deletions(-)
diff --git a/datafusion/core/src/physical_plan/unnest.rs
b/datafusion/core/src/physical_plan/unnest.rs
index 69ac857429..40c4edc953 100644
--- a/datafusion/core/src/physical_plan/unnest.rs
+++ b/datafusion/core/src/physical_plan/unnest.rs
@@ -18,19 +18,18 @@
//! Defines the unnest column plan for unnesting values in a column that
contains a list
//! type, conceptually is like joining each row with all the values in the
list column.
use arrow::array::{
- new_empty_array, new_null_array, Array, ArrayAccessor, ArrayRef,
ArrowPrimitiveType,
- FixedSizeListArray, LargeListArray, ListArray, PrimitiveArray,
+ Array, ArrayRef, ArrowPrimitiveType, FixedSizeListArray, LargeListArray,
ListArray,
+ PrimitiveArray,
};
use arrow::compute::kernels;
use arrow::datatypes::{
- ArrowNativeType, ArrowNativeTypeOp, DataType, Int32Type, Int64Type,
Schema, SchemaRef,
+ ArrowNativeType, DataType, Int32Type, Int64Type, Schema, SchemaRef,
};
use arrow::record_batch::RecordBatch;
+use arrow_array::{GenericListArray, OffsetSizeTrait};
use async_trait::async_trait;
use datafusion_common::UnnestOptions;
-use datafusion_common::{
- cast::as_primitive_array, exec_err, not_impl_err, DataFusionError, Result,
-};
+use datafusion_common::{exec_err, DataFusionError, Result};
use datafusion_execution::TaskContext;
use futures::Stream;
use futures::StreamExt;
@@ -148,14 +147,11 @@ impl ExecutionPlan for UnnestExec {
) -> Result<SendableRecordBatchStream> {
let input = self.input.execute(partition, context)?;
- if !self.options.preserve_nulls {
- return not_impl_err!("Unnest with preserve_nulls=false");
- }
-
Ok(Box::pin(UnnestStream {
input,
schema: self.schema.clone(),
column: self.column.clone(),
+ options: self.options.clone(),
num_input_batches: 0,
num_input_rows: 0,
num_output_batches: 0,
@@ -177,6 +173,8 @@ struct UnnestStream {
schema: Arc<Schema>,
/// The unnest column
column: Column,
+ /// Options
+ options: UnnestOptions,
/// number of input batches
num_input_batches: usize,
/// number of input rows
@@ -219,7 +217,8 @@ impl UnnestStream {
.map(|maybe_batch| match maybe_batch {
Some(Ok(batch)) => {
let start = Instant::now();
- let result = build_batch(&batch, &self.schema,
&self.column);
+ let result =
+ build_batch(&batch, &self.schema, &self.column,
&self.options);
self.num_input_batches += 1;
self.num_input_rows += batch.num_rows();
if let Ok(ref batch) = result {
@@ -250,122 +249,260 @@ fn build_batch(
batch: &RecordBatch,
schema: &SchemaRef,
column: &Column,
+ options: &UnnestOptions,
) -> Result<RecordBatch> {
let list_array = column.evaluate(batch)?.into_array(batch.num_rows());
match list_array.data_type() {
DataType::List(_) => {
let list_array =
list_array.as_any().downcast_ref::<ListArray>().unwrap();
- unnest_batch(batch, schema, column, &list_array)
+ build_batch_generic_list::<i32, Int32Type>(
+ batch,
+ schema,
+ column.index(),
+ list_array,
+ options,
+ )
}
DataType::LargeList(_) => {
let list_array = list_array
.as_any()
.downcast_ref::<LargeListArray>()
.unwrap();
- unnest_batch(batch, schema, column, &list_array)
+ build_batch_generic_list::<i64, Int64Type>(
+ batch,
+ schema,
+ column.index(),
+ list_array,
+ options,
+ )
}
DataType::FixedSizeList(_, _) => {
let list_array = list_array
.as_any()
.downcast_ref::<FixedSizeListArray>()
.unwrap();
- unnest_batch(batch, schema, column, list_array)
+ build_batch_fixedsize_list(batch, schema, column.index(),
list_array, options)
}
_ => exec_err!("Invalid unnest column {column}"),
}
}
-fn unnest_batch<T>(
+fn build_batch_generic_list<T: OffsetSizeTrait, P: ArrowPrimitiveType<Native =
T>>(
batch: &RecordBatch,
schema: &SchemaRef,
- column: &Column,
- list_array: &T,
-) -> Result<RecordBatch>
-where
- T: ArrayAccessor<Item = ArrayRef>,
-{
- // Create an array with the unnested values of the list array, given the
list
- // array:
- //
- // [1], null, [2, 3, 4], null, [5, 6]
- //
- // the result array is:
- //
- // 1, null, 2, 3, 4, null, 5, 6
- //
- let unnested_array = unnest_array(list_array)?;
-
- // Create an array with the lengths of each list value in the nested array.
- // Given the nested array:
- //
- // [1], null, [2, 3, 4], null, [5, 6]
- //
- // the result array is:
- //
- // 1, null, 3, null, 2
- //
- // Depending on the list type the result may be Int32Array or Int64Array.
- let list_lengths = kernels::length::length(list_array)?;
-
- // Create the indices for the take kernel and then use those indices to
create
- // the unnested record batch.
- match list_lengths.data_type() {
- DataType::Int32 => {
- let list_lengths = as_primitive_array::<Int32Type>(&list_lengths)?;
- let indices = create_take_indices(list_lengths,
unnested_array.len());
- batch_from_indices(batch, schema, column.index(), &unnested_array,
&indices)
- }
- DataType::Int64 => {
- let list_lengths = as_primitive_array::<Int64Type>(&list_lengths)?;
- let indices = create_take_indices(list_lengths,
unnested_array.len());
- batch_from_indices(batch, schema, column.index(), &unnested_array,
&indices)
- }
- dt => exec_err!("Unnest: unsupported indices type {dt}"),
- }
+ unnest_column_idx: usize,
+ list_array: &GenericListArray<T>,
+ options: &UnnestOptions,
+) -> Result<RecordBatch> {
+ let unnested_array = unnest_generic_list::<T, P>(list_array, options)?;
+
+ let take_indicies =
+ create_take_indicies_generic::<T, P>(list_array, unnested_array.len(),
options);
+
+ batch_from_indices(
+ batch,
+ schema,
+ unnest_column_idx,
+ &unnested_array,
+ &take_indicies,
+ )
}
-/// Create the indices for the take kernel given an array of list values
lengths.
+/// Given this `GenericList` list_array:
+///
+/// ```ignore
+/// [1], null, [2, 3, 4], null, [5, 6]
+/// ```
+/// Its values array is represented like this:
+///
+/// ```ignore
+/// [1, 2, 3, 4, 5, 6]
+/// ```
///
-/// The indices are used to duplicate column elements so that all columns have
as
-/// many rows as the unnested array.
+/// So if there are no null values or `UnnestOptions.preserve_nulls` is false
+/// we can return the values array without any copying.
///
-/// Given the nested array:
+/// Otherwise we'll transform the values array using the take kernel and the
following take indicies:
///
/// ```ignore
-/// [1], null, [2, 3, 4], null, [5, 6]
+/// 0, null, 1, 2, 3, null, 4, 5
/// ```
///
-/// the `list_lengths` array contains the length of each list value:
+fn unnest_generic_list<T: OffsetSizeTrait, P: ArrowPrimitiveType<Native = T>>(
+ list_array: &GenericListArray<T>,
+ options: &UnnestOptions,
+) -> Result<Arc<dyn Array + 'static>> {
+ let values = list_array.values();
+ if list_array.null_count() == 0 || !options.preserve_nulls {
+ Ok(values.clone())
+ } else {
+ let mut take_indicies_builder =
+ PrimitiveArray::<P>::builder(values.len() +
list_array.null_count());
+ let mut take_offset = 0;
+
+ list_array.iter().for_each(|elem| match elem {
+ Some(array) => {
+ for i in 0..array.len() {
+ // take_offset + i is always positive
+ let take_index = P::Native::from_usize(take_offset +
i).unwrap();
+ take_indicies_builder.append_value(take_index);
+ }
+ take_offset += array.len();
+ }
+ None => {
+ take_indicies_builder.append_null();
+ }
+ });
+ Ok(kernels::take::take(
+ &values,
+ &take_indicies_builder.finish(),
+ None,
+ )?)
+ }
+}
+
+fn build_batch_fixedsize_list(
+ batch: &RecordBatch,
+ schema: &SchemaRef,
+ unnest_column_idx: usize,
+ list_array: &FixedSizeListArray,
+ options: &UnnestOptions,
+) -> Result<RecordBatch> {
+ let unnested_array = unnest_fixed_list(list_array, options)?;
+
+ let take_indicies =
+ create_take_indicies_fixed(list_array, unnested_array.len(), options);
+
+ batch_from_indices(
+ batch,
+ schema,
+ unnest_column_idx,
+ &unnested_array,
+ &take_indicies,
+ )
+}
+
+/// Given this `FixedSizeListArray` list_array:
+///
+/// ```ignore
+/// [1, 2], null, [3, 4], null, [5, 6]
+/// ```
+/// Its values array is represented like this:
///
/// ```ignore
-/// 1, null, 3, null, 2
+/// [1, 2, null, null, 3, 4, null, null, 5, 6]
/// ```
///
-/// the result indices array is:
+/// So if there are no null values
+/// we can return the values array without any copying.
+///
+/// Otherwise we'll transform the values array using the take kernel.
+///
+/// If `UnnestOptions.preserve_nulls` is true the take indicies will look like
this:
///
/// ```ignore
-/// 0, 1, 2, 2, 2, 3, 4, 4
+/// 0, 1, null, 4, 5, null, 8, 9
+/// ```
+/// Otherwise we drop the nulls and take indicies will look like this:
+///
+/// ```ignore
+/// 0, 1, 4, 5, 8, 9
/// ```
///
-/// where a null value count as one element.
-fn create_take_indices<T>(
- list_lengths: &PrimitiveArray<T>,
+fn unnest_fixed_list(
+ list_array: &FixedSizeListArray,
+ options: &UnnestOptions,
+) -> Result<Arc<dyn Array + 'static>> {
+ let values = list_array.values();
+
+ if list_array.null_count() == 0 {
+ Ok(values.clone())
+ } else {
+ let len_without_nulls =
+ values.len() - list_array.null_count() * list_array.value_length()
as usize;
+ let null_count = if options.preserve_nulls {
+ list_array.null_count()
+ } else {
+ 0
+ };
+ let mut builder =
+ PrimitiveArray::<Int32Type>::builder(len_without_nulls +
null_count);
+ let mut take_offset = 0;
+ let fixed_value_length = list_array.value_length() as usize;
+ list_array.iter().for_each(|elem| match elem {
+ Some(_) => {
+ for i in 0..fixed_value_length {
+ //take_offset + i is always positive
+ let take_index = take_offset + i;
+ builder.append_value(take_index as i32);
+ }
+ take_offset += fixed_value_length;
+ }
+ None => {
+ if options.preserve_nulls {
+ builder.append_null();
+ }
+ take_offset += fixed_value_length;
+ }
+ });
+ Ok(kernels::take::take(&values, &builder.finish(), None)?)
+ }
+}
+
+/// Creates take indicies to be used to expand all other columns' data.
+/// Every column value needs to be repeated as many times as there are
elements in its corresponding array value.
+///
+/// If the column being unnested looks like this:
+///
+/// ```ignore
+/// [1], null, [2, 3, 4], null, [5, 6]
+/// ```
+/// Then `create_take_indicies_generic` will return an array like this
+///
+/// ```ignore
+/// [0, 1, 2, 2, 2, 3, 4, 4]
+/// ```
+///
+fn create_take_indicies_generic<T: OffsetSizeTrait, P:
ArrowPrimitiveType<Native = T>>(
+ list_array: &GenericListArray<T>,
capacity: usize,
-) -> PrimitiveArray<T>
-where
- T: ArrowPrimitiveType,
-{
- let mut builder = PrimitiveArray::<T>::builder(capacity);
- for row in 0..list_lengths.len() {
- let repeat = if list_lengths.is_null(row) {
- T::Native::ONE
+ options: &UnnestOptions,
+) -> PrimitiveArray<P> {
+ let mut builder = PrimitiveArray::<P>::builder(capacity);
+ let null_repeat: usize = if options.preserve_nulls { 1 } else { 0 };
+
+ for row in 0..list_array.len() {
+ let repeat = if list_array.is_null(row) {
+ null_repeat
+ } else {
+ list_array.value(row).len()
+ };
+
+    // `index` is a non-negative integer.
+ let index = P::Native::from_usize(row).unwrap();
+ (0..repeat).for_each(|_| builder.append_value(index));
+ }
+
+ builder.finish()
+}
+
+fn create_take_indicies_fixed(
+ list_array: &FixedSizeListArray,
+ capacity: usize,
+ options: &UnnestOptions,
+) -> PrimitiveArray<Int32Type> {
+ let mut builder = PrimitiveArray::<Int32Type>::builder(capacity);
+ let null_repeat: usize = if options.preserve_nulls { 1 } else { 0 };
+
+ for row in 0..list_array.len() {
+ let repeat = if list_array.is_null(row) {
+ null_repeat
} else {
- list_lengths.value(row)
+ list_array.value_length() as usize
};
- // Both `repeat` and `index` are positive intergers.
- let repeat = repeat.to_usize().unwrap();
- let index = T::Native::from_usize(row).unwrap();
+    // `index` is a non-negative integer.
+ let index = <Int32Type as
ArrowPrimitiveType>::Native::from_usize(row).unwrap();
(0..repeat).for_each(|_| builder.append_value(index));
}
@@ -428,49 +565,3 @@ where
Ok(RecordBatch::try_new(schema.clone(), arrays.to_vec())?)
}
-
-/// Unnest the given list array. Given the array:
-///
-/// ```ignore
-/// [1], null, [2, 3, 4], null, [5, 6]
-/// ```
-///
-/// returns:
-///
-/// ```ignore
-/// 1, null, 2, 3, 4, null, 5, 6
-/// ```
-fn unnest_array<T>(list_array: &T) -> Result<Arc<dyn Array + 'static>>
-where
- T: ArrayAccessor<Item = ArrayRef>,
-{
- let elem_type = match list_array.data_type() {
- DataType::List(f) | DataType::FixedSizeList(f, _) |
DataType::LargeList(f) => {
- f.data_type()
- }
- dt => return exec_err!("Cannot unnest array of type {dt}"),
- };
-
- if list_array.is_empty() {
- return Ok(new_empty_array(elem_type));
- }
-
- let null_row = new_null_array(elem_type, 1);
-
- // Create a vec of ArrayRef from the list elements.
- let arrays = (0..list_array.len())
- .map(|row| {
- if list_array.is_null(row) {
- null_row.clone()
- } else {
- list_array.value(row)
- }
- })
- .collect::<Vec<_>>();
-
- // Create Vec<&dyn Array> from Vec<Arc<dyn Array>> for `concat`. Calling
- // `as_ref()` in the `map` above causes the borrow checker to complain.
- let arrays = arrays.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
-
- Ok(kernels::concat::concat(&arrays)?)
-}
diff --git a/datafusion/core/tests/dataframe/mod.rs
b/datafusion/core/tests/dataframe/mod.rs
index 48ec4ef073..73cba35d70 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -27,6 +27,7 @@ use arrow::{
},
record_batch::RecordBatch,
};
+use arrow_schema::ArrowError;
use std::sync::Arc;
use datafusion::dataframe::DataFrame;
@@ -1045,25 +1046,6 @@ async fn unnest_columns() -> Result<()> {
}
#[tokio::test]
-async fn unnest_column_preserve_nulls_not_supported() -> Result<()> {
- // Unnest, preserving nulls not yet supported
- let options = UnnestOptions::new().with_preserve_nulls(false);
-
- let results = table_with_lists_and_nulls()
- .await?
- .clone()
- .unnest_column_with_options("list", options)?
- .collect()
- .await;
-
- assert_eq!(
- results.unwrap_err().to_string(),
- "This feature is not implemented: Unnest with preserve_nulls=false"
- );
- Ok(())
-}
-#[tokio::test]
-#[ignore] // https://github.com/apache/arrow-datafusion/issues/7087
async fn unnest_column_nulls() -> Result<()> {
let df = table_with_lists_and_nulls().await?;
let results = df.clone().collect().await?;
@@ -1099,7 +1081,6 @@ async fn unnest_column_nulls() -> Result<()> {
];
assert_batches_eq!(expected, &results);
- // NOTE this is incorrect,
let options = UnnestOptions::new().with_preserve_nulls(false);
let results = df
.unnest_column_with_options("list", options)?
@@ -1111,7 +1092,6 @@ async fn unnest_column_nulls() -> Result<()> {
"+------+----+",
"| 1 | A |",
"| 2 | A |",
- "| | B |", // this row should not be here
"| 3 | D |",
"+------+----+",
];
@@ -1122,32 +1102,57 @@ async fn unnest_column_nulls() -> Result<()> {
#[tokio::test]
async fn unnest_fixed_list() -> Result<()> {
- let mut shape_id_builder = UInt32Builder::new();
- let mut tags_builder = FixedSizeListBuilder::new(StringBuilder::new(), 2);
+ let batch = get_fixed_list_batch()?;
- for idx in 0..6 {
- // Append shape id.
- shape_id_builder.append_value(idx as u32 + 1);
+ let ctx = SessionContext::new();
+ ctx.register_batch("shapes", batch)?;
+ let df = ctx.table("shapes").await?;
- if idx % 3 != 0 {
- tags_builder
- .values()
- .append_value(format!("tag{}1", idx + 1));
- tags_builder
- .values()
- .append_value(format!("tag{}2", idx + 1));
- tags_builder.append(true);
- } else {
- tags_builder.values().append_null();
- tags_builder.values().append_null();
- tags_builder.append(false);
- }
- }
+ let results = df.clone().collect().await?;
+ let expected = vec![
+ "+----------+----------------+",
+ "| shape_id | tags |",
+ "+----------+----------------+",
+ "| 1 | |",
+ "| 2 | [tag21, tag22] |",
+ "| 3 | [tag31, tag32] |",
+ "| 4 | |",
+ "| 5 | [tag51, tag52] |",
+ "| 6 | [tag61, tag62] |",
+ "+----------+----------------+",
+ ];
+ assert_batches_sorted_eq!(expected, &results);
- let batch = RecordBatch::try_from_iter(vec![
- ("shape_id", Arc::new(shape_id_builder.finish()) as ArrayRef),
- ("tags", Arc::new(tags_builder.finish()) as ArrayRef),
- ])?;
+ let options = UnnestOptions::new().with_preserve_nulls(true);
+
+ let results = df
+ .unnest_column_with_options("tags", options)?
+ .collect()
+ .await?;
+ let expected = vec![
+ "+----------+-------+",
+ "| shape_id | tags |",
+ "+----------+-------+",
+ "| 1 | |",
+ "| 2 | tag21 |",
+ "| 2 | tag22 |",
+ "| 3 | tag31 |",
+ "| 3 | tag32 |",
+ "| 4 | |",
+ "| 5 | tag51 |",
+ "| 5 | tag52 |",
+ "| 6 | tag61 |",
+ "| 6 | tag62 |",
+ "+----------+-------+",
+ ];
+ assert_batches_sorted_eq!(expected, &results);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn unnest_fixed_list_drop_nulls() -> Result<()> {
+ let batch = get_fixed_list_batch()?;
let ctx = SessionContext::new();
ctx.register_batch("shapes", batch)?;
@@ -1168,17 +1173,20 @@ async fn unnest_fixed_list() -> Result<()> {
];
assert_batches_sorted_eq!(expected, &results);
- let results = df.unnest_column("tags")?.collect().await?;
+ let options = UnnestOptions::new().with_preserve_nulls(false);
+
+ let results = df
+ .unnest_column_with_options("tags", options)?
+ .collect()
+ .await?;
let expected = vec![
"+----------+-------+",
"| shape_id | tags |",
"+----------+-------+",
- "| 1 | |",
"| 2 | tag21 |",
"| 2 | tag22 |",
"| 3 | tag31 |",
"| 3 | tag32 |",
- "| 4 | |",
"| 5 | tag51 |",
"| 5 | tag52 |",
"| 6 | tag61 |",
@@ -1191,7 +1199,6 @@ async fn unnest_fixed_list() -> Result<()> {
}
#[tokio::test]
-#[ignore] // https://github.com/apache/arrow-datafusion/issues/7087
async fn unnest_fixed_list_nonull() -> Result<()> {
let mut shape_id_builder = UInt32Builder::new();
let mut tags_builder = FixedSizeListBuilder::new(StringBuilder::new(), 2);
@@ -1531,6 +1538,37 @@ async fn table_with_nested_types(n: usize) ->
Result<DataFrame> {
ctx.table("shapes").await
}
+fn get_fixed_list_batch() -> Result<RecordBatch, ArrowError> {
+ let mut shape_id_builder = UInt32Builder::new();
+ let mut tags_builder = FixedSizeListBuilder::new(StringBuilder::new(), 2);
+
+ for idx in 0..6 {
+ // Append shape id.
+ shape_id_builder.append_value(idx as u32 + 1);
+
+ if idx % 3 != 0 {
+ tags_builder
+ .values()
+ .append_value(format!("tag{}1", idx + 1));
+ tags_builder
+ .values()
+ .append_value(format!("tag{}2", idx + 1));
+ tags_builder.append(true);
+ } else {
+ tags_builder.values().append_null();
+ tags_builder.values().append_null();
+ tags_builder.append(false);
+ }
+ }
+
+ let batch = RecordBatch::try_from_iter(vec![
+ ("shape_id", Arc::new(shape_id_builder.finish()) as ArrayRef),
+ ("tags", Arc::new(tags_builder.finish()) as ArrayRef),
+ ])?;
+
+ Ok(batch)
+}
+
/// A a data frame that a list of integers and string IDs
async fn table_with_lists_and_nulls() -> Result<DataFrame> {
let mut list_builder = ListBuilder::new(UInt32Builder::new());