This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new c43d7bed16 Optimize `Unnest` and implement `skip_nulls=true` if 
specified (#7371)
c43d7bed16 is described below

commit c43d7bed16ee2b469b21537d01226fc9857fb9ce
Author: Miklos Szots <[email protected]>
AuthorDate: Thu Aug 24 15:39:29 2023 +0200

    Optimize `Unnest` and implement `skip_nulls=true` if specified (#7371)
    
    * reimplement optimized unnest exec
    
    * fixes and more tests
    
    * refactor
    
    * document the code
    
    * review fix
---
 datafusion/core/src/physical_plan/unnest.rs | 349 ++++++++++++++++++----------
 datafusion/core/tests/dataframe/mod.rs      | 134 +++++++----
 2 files changed, 306 insertions(+), 177 deletions(-)

diff --git a/datafusion/core/src/physical_plan/unnest.rs 
b/datafusion/core/src/physical_plan/unnest.rs
index 69ac857429..40c4edc953 100644
--- a/datafusion/core/src/physical_plan/unnest.rs
+++ b/datafusion/core/src/physical_plan/unnest.rs
@@ -18,19 +18,18 @@
 //! Defines the unnest column plan for unnesting values in a column that 
contains a list
 //! type, conceptually is like joining each row with all the values in the 
list column.
 use arrow::array::{
-    new_empty_array, new_null_array, Array, ArrayAccessor, ArrayRef, 
ArrowPrimitiveType,
-    FixedSizeListArray, LargeListArray, ListArray, PrimitiveArray,
+    Array, ArrayRef, ArrowPrimitiveType, FixedSizeListArray, LargeListArray, 
ListArray,
+    PrimitiveArray,
 };
 use arrow::compute::kernels;
 use arrow::datatypes::{
-    ArrowNativeType, ArrowNativeTypeOp, DataType, Int32Type, Int64Type, 
Schema, SchemaRef,
+    ArrowNativeType, DataType, Int32Type, Int64Type, Schema, SchemaRef,
 };
 use arrow::record_batch::RecordBatch;
+use arrow_array::{GenericListArray, OffsetSizeTrait};
 use async_trait::async_trait;
 use datafusion_common::UnnestOptions;
-use datafusion_common::{
-    cast::as_primitive_array, exec_err, not_impl_err, DataFusionError, Result,
-};
+use datafusion_common::{exec_err, DataFusionError, Result};
 use datafusion_execution::TaskContext;
 use futures::Stream;
 use futures::StreamExt;
@@ -148,14 +147,11 @@ impl ExecutionPlan for UnnestExec {
     ) -> Result<SendableRecordBatchStream> {
         let input = self.input.execute(partition, context)?;
 
-        if !self.options.preserve_nulls {
-            return not_impl_err!("Unnest with preserve_nulls=false");
-        }
-
         Ok(Box::pin(UnnestStream {
             input,
             schema: self.schema.clone(),
             column: self.column.clone(),
+            options: self.options.clone(),
             num_input_batches: 0,
             num_input_rows: 0,
             num_output_batches: 0,
@@ -177,6 +173,8 @@ struct UnnestStream {
     schema: Arc<Schema>,
     /// The unnest column
     column: Column,
+    /// Options
+    options: UnnestOptions,
     /// number of input batches
     num_input_batches: usize,
     /// number of input rows
@@ -219,7 +217,8 @@ impl UnnestStream {
             .map(|maybe_batch| match maybe_batch {
                 Some(Ok(batch)) => {
                     let start = Instant::now();
-                    let result = build_batch(&batch, &self.schema, 
&self.column);
+                    let result =
+                        build_batch(&batch, &self.schema, &self.column, 
&self.options);
                     self.num_input_batches += 1;
                     self.num_input_rows += batch.num_rows();
                     if let Ok(ref batch) = result {
@@ -250,122 +249,260 @@ fn build_batch(
     batch: &RecordBatch,
     schema: &SchemaRef,
     column: &Column,
+    options: &UnnestOptions,
 ) -> Result<RecordBatch> {
     let list_array = column.evaluate(batch)?.into_array(batch.num_rows());
     match list_array.data_type() {
         DataType::List(_) => {
             let list_array = 
list_array.as_any().downcast_ref::<ListArray>().unwrap();
-            unnest_batch(batch, schema, column, &list_array)
+            build_batch_generic_list::<i32, Int32Type>(
+                batch,
+                schema,
+                column.index(),
+                list_array,
+                options,
+            )
         }
         DataType::LargeList(_) => {
             let list_array = list_array
                 .as_any()
                 .downcast_ref::<LargeListArray>()
                 .unwrap();
-            unnest_batch(batch, schema, column, &list_array)
+            build_batch_generic_list::<i64, Int64Type>(
+                batch,
+                schema,
+                column.index(),
+                list_array,
+                options,
+            )
         }
         DataType::FixedSizeList(_, _) => {
             let list_array = list_array
                 .as_any()
                 .downcast_ref::<FixedSizeListArray>()
                 .unwrap();
-            unnest_batch(batch, schema, column, list_array)
+            build_batch_fixedsize_list(batch, schema, column.index(), 
list_array, options)
         }
         _ => exec_err!("Invalid unnest column {column}"),
     }
 }
 
-fn unnest_batch<T>(
+fn build_batch_generic_list<T: OffsetSizeTrait, P: ArrowPrimitiveType<Native = 
T>>(
     batch: &RecordBatch,
     schema: &SchemaRef,
-    column: &Column,
-    list_array: &T,
-) -> Result<RecordBatch>
-where
-    T: ArrayAccessor<Item = ArrayRef>,
-{
-    // Create an array with the unnested values of the list array, given the 
list
-    // array:
-    //
-    //   [1], null, [2, 3, 4], null, [5, 6]
-    //
-    // the result array is:
-    //
-    //   1, null, 2, 3, 4, null, 5, 6
-    //
-    let unnested_array = unnest_array(list_array)?;
-
-    // Create an array with the lengths of each list value in the nested array.
-    // Given the nested array:
-    //
-    //   [1], null, [2, 3, 4], null, [5, 6]
-    //
-    // the result array is:
-    //
-    //   1, null, 3, null, 2
-    //
-    // Depending on the list type the result may be Int32Array or Int64Array.
-    let list_lengths = kernels::length::length(list_array)?;
-
-    // Create the indices for the take kernel and then use those indices to 
create
-    // the unnested record batch.
-    match list_lengths.data_type() {
-        DataType::Int32 => {
-            let list_lengths = as_primitive_array::<Int32Type>(&list_lengths)?;
-            let indices = create_take_indices(list_lengths, 
unnested_array.len());
-            batch_from_indices(batch, schema, column.index(), &unnested_array, 
&indices)
-        }
-        DataType::Int64 => {
-            let list_lengths = as_primitive_array::<Int64Type>(&list_lengths)?;
-            let indices = create_take_indices(list_lengths, 
unnested_array.len());
-            batch_from_indices(batch, schema, column.index(), &unnested_array, 
&indices)
-        }
-        dt => exec_err!("Unnest: unsupported indices type {dt}"),
-    }
+    unnest_column_idx: usize,
+    list_array: &GenericListArray<T>,
+    options: &UnnestOptions,
+) -> Result<RecordBatch> {
+    let unnested_array = unnest_generic_list::<T, P>(list_array, options)?;
+
+    let take_indicies =
+        create_take_indicies_generic::<T, P>(list_array, unnested_array.len(), 
options);
+
+    batch_from_indices(
+        batch,
+        schema,
+        unnest_column_idx,
+        &unnested_array,
+        &take_indicies,
+    )
 }
 
-/// Create the indices for the take kernel given an array of list values 
lengths.
+/// Given this `GenericList` list_array:
+///   
+/// ```ignore
+/// [1], null, [2, 3, 4], null, [5, 6]
+/// ```
+/// Its values array is represented like this:
+///
+/// ```ignore
+/// [1, 2, 3, 4, 5, 6]
+/// ```
 ///
-/// The indices are used to duplicate column elements so that all columns have 
as
-/// many rows as the unnested array.
+/// So if there are no null values or `UnnestOptions.preserve_nulls` is false
+/// we can return the values array without any copying.
 ///
-/// Given the nested array:
+/// Otherwise we'll transform the values array using the take kernel and the 
following take indicies:
 ///
 /// ```ignore
-/// [1], null, [2, 3, 4], null, [5, 6]
+/// 0, null, 1, 2, 3, null, 4, 5
 /// ```
 ///
-/// the `list_lengths` array contains the length of each list value:
+fn unnest_generic_list<T: OffsetSizeTrait, P: ArrowPrimitiveType<Native = T>>(
+    list_array: &GenericListArray<T>,
+    options: &UnnestOptions,
+) -> Result<Arc<dyn Array + 'static>> {
+    let values = list_array.values();
+    if list_array.null_count() == 0 || !options.preserve_nulls {
+        Ok(values.clone())
+    } else {
+        let mut take_indicies_builder =
+            PrimitiveArray::<P>::builder(values.len() + 
list_array.null_count());
+        let mut take_offset = 0;
+
+        list_array.iter().for_each(|elem| match elem {
+            Some(array) => {
+                for i in 0..array.len() {
+                    // take_offset + i is always positive
+                    let take_index = P::Native::from_usize(take_offset + 
i).unwrap();
+                    take_indicies_builder.append_value(take_index);
+                }
+                take_offset += array.len();
+            }
+            None => {
+                take_indicies_builder.append_null();
+            }
+        });
+        Ok(kernels::take::take(
+            &values,
+            &take_indicies_builder.finish(),
+            None,
+        )?)
+    }
+}
+
+fn build_batch_fixedsize_list(
+    batch: &RecordBatch,
+    schema: &SchemaRef,
+    unnest_column_idx: usize,
+    list_array: &FixedSizeListArray,
+    options: &UnnestOptions,
+) -> Result<RecordBatch> {
+    let unnested_array = unnest_fixed_list(list_array, options)?;
+
+    let take_indicies =
+        create_take_indicies_fixed(list_array, unnested_array.len(), options);
+
+    batch_from_indices(
+        batch,
+        schema,
+        unnest_column_idx,
+        &unnested_array,
+        &take_indicies,
+    )
+}
+
+/// Given this `FixedSizeListArray` list_array:
+///   
+/// ```ignore
+/// [1, 2], null, [3, 4], null, [5, 6]
+/// ```
+/// Its values array is represented like this:
 ///
 /// ```ignore
-/// 1, null,  3, null, 2
+/// [1, 2, null, null 3, 4, null, null, 5, 6]
 /// ```
 ///
-/// the result indices array is:
+/// So if there are no null values
+/// we can return the values array without any copying.
+///
+/// Otherwise we'll transform the values array using the take kernel.
+///
+/// If `UnnestOptions.preserve_nulls` is true the take indicies will look like 
this:
 ///
 /// ```ignore
-/// 0, 1, 2, 2, 2, 3, 4, 4
+/// 0, 1, null, 4, 5, null, 8, 9
+/// ```
+/// Otherwise we drop the nulls and take indicies will look like this:
+///
+/// ```ignore
+/// 0, 1, 4, 5, 8, 9
 /// ```
 ///
-/// where a null value count as one element.
-fn create_take_indices<T>(
-    list_lengths: &PrimitiveArray<T>,
+fn unnest_fixed_list(
+    list_array: &FixedSizeListArray,
+    options: &UnnestOptions,
+) -> Result<Arc<dyn Array + 'static>> {
+    let values = list_array.values();
+
+    if list_array.null_count() == 0 {
+        Ok(values.clone())
+    } else {
+        let len_without_nulls =
+            values.len() - list_array.null_count() * list_array.value_length() 
as usize;
+        let null_count = if options.preserve_nulls {
+            list_array.null_count()
+        } else {
+            0
+        };
+        let mut builder =
+            PrimitiveArray::<Int32Type>::builder(len_without_nulls + 
null_count);
+        let mut take_offset = 0;
+        let fixed_value_length = list_array.value_length() as usize;
+        list_array.iter().for_each(|elem| match elem {
+            Some(_) => {
+                for i in 0..fixed_value_length {
+                    //take_offset + i is always positive
+                    let take_index = take_offset + i;
+                    builder.append_value(take_index as i32);
+                }
+                take_offset += fixed_value_length;
+            }
+            None => {
+                if options.preserve_nulls {
+                    builder.append_null();
+                }
+                take_offset += fixed_value_length;
+            }
+        });
+        Ok(kernels::take::take(&values, &builder.finish(), None)?)
+    }
+}
+
+/// Creates take indicies to be used to expand all other column's data.
+/// Every column value needs to be repeated as many times as many elements 
there is in each corresponding array value.
+///
+/// If the column being unnested looks like this:
+///    
+/// ```ignore
+/// [1], null, [2, 3, 4], null, [5, 6]
+/// ```
+/// Then `create_take_indicies_generic` will return an array like this
+///
+/// ```ignore
+/// [1, null, 2, 2, 2, null, 4, 4]
+/// ```
+///
+fn create_take_indicies_generic<T: OffsetSizeTrait, P: 
ArrowPrimitiveType<Native = T>>(
+    list_array: &GenericListArray<T>,
     capacity: usize,
-) -> PrimitiveArray<T>
-where
-    T: ArrowPrimitiveType,
-{
-    let mut builder = PrimitiveArray::<T>::builder(capacity);
-    for row in 0..list_lengths.len() {
-        let repeat = if list_lengths.is_null(row) {
-            T::Native::ONE
+    options: &UnnestOptions,
+) -> PrimitiveArray<P> {
+    let mut builder = PrimitiveArray::<P>::builder(capacity);
+    let null_repeat: usize = if options.preserve_nulls { 1 } else { 0 };
+
+    for row in 0..list_array.len() {
+        let repeat = if list_array.is_null(row) {
+            null_repeat
+        } else {
+            list_array.value(row).len()
+        };
+
+        // `index` is a positive integer.
+        let index = P::Native::from_usize(row).unwrap();
+        (0..repeat).for_each(|_| builder.append_value(index));
+    }
+
+    builder.finish()
+}
+
+fn create_take_indicies_fixed(
+    list_array: &FixedSizeListArray,
+    capacity: usize,
+    options: &UnnestOptions,
+) -> PrimitiveArray<Int32Type> {
+    let mut builder = PrimitiveArray::<Int32Type>::builder(capacity);
+    let null_repeat: usize = if options.preserve_nulls { 1 } else { 0 };
+
+    for row in 0..list_array.len() {
+        let repeat = if list_array.is_null(row) {
+            null_repeat
         } else {
-            list_lengths.value(row)
+            list_array.value_length() as usize
         };
 
-        // Both `repeat` and `index` are positive intergers.
-        let repeat = repeat.to_usize().unwrap();
-        let index = T::Native::from_usize(row).unwrap();
+        // `index` is a positive integer.
+        let index = <Int32Type as 
ArrowPrimitiveType>::Native::from_usize(row).unwrap();
         (0..repeat).for_each(|_| builder.append_value(index));
     }
 
@@ -428,49 +565,3 @@ where
 
     Ok(RecordBatch::try_new(schema.clone(), arrays.to_vec())?)
 }
-
-/// Unnest the given list array. Given the array:
-///
-/// ```ignore
-/// [1], null, [2, 3, 4], null, [5, 6]
-/// ```
-///
-/// returns:
-///
-/// ```ignore
-/// 1, null, 2, 3, 4, null, 5, 6
-/// ```
-fn unnest_array<T>(list_array: &T) -> Result<Arc<dyn Array + 'static>>
-where
-    T: ArrayAccessor<Item = ArrayRef>,
-{
-    let elem_type = match list_array.data_type() {
-        DataType::List(f) | DataType::FixedSizeList(f, _) | 
DataType::LargeList(f) => {
-            f.data_type()
-        }
-        dt => return exec_err!("Cannot unnest array of type {dt}"),
-    };
-
-    if list_array.is_empty() {
-        return Ok(new_empty_array(elem_type));
-    }
-
-    let null_row = new_null_array(elem_type, 1);
-
-    // Create a vec of ArrayRef from the list elements.
-    let arrays = (0..list_array.len())
-        .map(|row| {
-            if list_array.is_null(row) {
-                null_row.clone()
-            } else {
-                list_array.value(row)
-            }
-        })
-        .collect::<Vec<_>>();
-
-    // Create Vec<&dyn Array> from Vec<Arc<dyn Array>> for `concat`. Calling
-    // `as_ref()` in the `map` above causes the borrow checker to complain.
-    let arrays = arrays.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
-
-    Ok(kernels::concat::concat(&arrays)?)
-}
diff --git a/datafusion/core/tests/dataframe/mod.rs 
b/datafusion/core/tests/dataframe/mod.rs
index 48ec4ef073..73cba35d70 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -27,6 +27,7 @@ use arrow::{
     },
     record_batch::RecordBatch,
 };
+use arrow_schema::ArrowError;
 use std::sync::Arc;
 
 use datafusion::dataframe::DataFrame;
@@ -1045,25 +1046,6 @@ async fn unnest_columns() -> Result<()> {
 }
 
 #[tokio::test]
-async fn unnest_column_preserve_nulls_not_supported() -> Result<()> {
-    // Unnest, preserving nulls not yet supported
-    let options = UnnestOptions::new().with_preserve_nulls(false);
-
-    let results = table_with_lists_and_nulls()
-        .await?
-        .clone()
-        .unnest_column_with_options("list", options)?
-        .collect()
-        .await;
-
-    assert_eq!(
-        results.unwrap_err().to_string(),
-        "This feature is not implemented: Unnest with preserve_nulls=false"
-    );
-    Ok(())
-}
-#[tokio::test]
-#[ignore] // https://github.com/apache/arrow-datafusion/issues/7087
 async fn unnest_column_nulls() -> Result<()> {
     let df = table_with_lists_and_nulls().await?;
     let results = df.clone().collect().await?;
@@ -1099,7 +1081,6 @@ async fn unnest_column_nulls() -> Result<()> {
     ];
     assert_batches_eq!(expected, &results);
 
-    // NOTE this is incorrect,
     let options = UnnestOptions::new().with_preserve_nulls(false);
     let results = df
         .unnest_column_with_options("list", options)?
@@ -1111,7 +1092,6 @@ async fn unnest_column_nulls() -> Result<()> {
         "+------+----+",
         "| 1    | A  |",
         "| 2    | A  |",
-        "|      | B  |", // this row should not be here
         "| 3    | D  |",
         "+------+----+",
     ];
@@ -1122,32 +1102,57 @@ async fn unnest_column_nulls() -> Result<()> {
 
 #[tokio::test]
 async fn unnest_fixed_list() -> Result<()> {
-    let mut shape_id_builder = UInt32Builder::new();
-    let mut tags_builder = FixedSizeListBuilder::new(StringBuilder::new(), 2);
+    let batch = get_fixed_list_batch()?;
 
-    for idx in 0..6 {
-        // Append shape id.
-        shape_id_builder.append_value(idx as u32 + 1);
+    let ctx = SessionContext::new();
+    ctx.register_batch("shapes", batch)?;
+    let df = ctx.table("shapes").await?;
 
-        if idx % 3 != 0 {
-            tags_builder
-                .values()
-                .append_value(format!("tag{}1", idx + 1));
-            tags_builder
-                .values()
-                .append_value(format!("tag{}2", idx + 1));
-            tags_builder.append(true);
-        } else {
-            tags_builder.values().append_null();
-            tags_builder.values().append_null();
-            tags_builder.append(false);
-        }
-    }
+    let results = df.clone().collect().await?;
+    let expected = vec![
+        "+----------+----------------+",
+        "| shape_id | tags           |",
+        "+----------+----------------+",
+        "| 1        |                |",
+        "| 2        | [tag21, tag22] |",
+        "| 3        | [tag31, tag32] |",
+        "| 4        |                |",
+        "| 5        | [tag51, tag52] |",
+        "| 6        | [tag61, tag62] |",
+        "+----------+----------------+",
+    ];
+    assert_batches_sorted_eq!(expected, &results);
 
-    let batch = RecordBatch::try_from_iter(vec![
-        ("shape_id", Arc::new(shape_id_builder.finish()) as ArrayRef),
-        ("tags", Arc::new(tags_builder.finish()) as ArrayRef),
-    ])?;
+    let options = UnnestOptions::new().with_preserve_nulls(true);
+
+    let results = df
+        .unnest_column_with_options("tags", options)?
+        .collect()
+        .await?;
+    let expected = vec![
+        "+----------+-------+",
+        "| shape_id | tags  |",
+        "+----------+-------+",
+        "| 1        |       |",
+        "| 2        | tag21 |",
+        "| 2        | tag22 |",
+        "| 3        | tag31 |",
+        "| 3        | tag32 |",
+        "| 4        |       |",
+        "| 5        | tag51 |",
+        "| 5        | tag52 |",
+        "| 6        | tag61 |",
+        "| 6        | tag62 |",
+        "+----------+-------+",
+    ];
+    assert_batches_sorted_eq!(expected, &results);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn unnest_fixed_list_drop_nulls() -> Result<()> {
+    let batch = get_fixed_list_batch()?;
 
     let ctx = SessionContext::new();
     ctx.register_batch("shapes", batch)?;
@@ -1168,17 +1173,20 @@ async fn unnest_fixed_list() -> Result<()> {
     ];
     assert_batches_sorted_eq!(expected, &results);
 
-    let results = df.unnest_column("tags")?.collect().await?;
+    let options = UnnestOptions::new().with_preserve_nulls(false);
+
+    let results = df
+        .unnest_column_with_options("tags", options)?
+        .collect()
+        .await?;
     let expected = vec![
         "+----------+-------+",
         "| shape_id | tags  |",
         "+----------+-------+",
-        "| 1        |       |",
         "| 2        | tag21 |",
         "| 2        | tag22 |",
         "| 3        | tag31 |",
         "| 3        | tag32 |",
-        "| 4        |       |",
         "| 5        | tag51 |",
         "| 5        | tag52 |",
         "| 6        | tag61 |",
@@ -1191,7 +1199,6 @@ async fn unnest_fixed_list() -> Result<()> {
 }
 
 #[tokio::test]
-#[ignore] // https://github.com/apache/arrow-datafusion/issues/7087
 async fn unnest_fixed_list_nonull() -> Result<()> {
     let mut shape_id_builder = UInt32Builder::new();
     let mut tags_builder = FixedSizeListBuilder::new(StringBuilder::new(), 2);
@@ -1531,6 +1538,37 @@ async fn table_with_nested_types(n: usize) -> 
Result<DataFrame> {
     ctx.table("shapes").await
 }
 
+fn get_fixed_list_batch() -> Result<RecordBatch, ArrowError> {
+    let mut shape_id_builder = UInt32Builder::new();
+    let mut tags_builder = FixedSizeListBuilder::new(StringBuilder::new(), 2);
+
+    for idx in 0..6 {
+        // Append shape id.
+        shape_id_builder.append_value(idx as u32 + 1);
+
+        if idx % 3 != 0 {
+            tags_builder
+                .values()
+                .append_value(format!("tag{}1", idx + 1));
+            tags_builder
+                .values()
+                .append_value(format!("tag{}2", idx + 1));
+            tags_builder.append(true);
+        } else {
+            tags_builder.values().append_null();
+            tags_builder.values().append_null();
+            tags_builder.append(false);
+        }
+    }
+
+    let batch = RecordBatch::try_from_iter(vec![
+        ("shape_id", Arc::new(shape_id_builder.finish()) as ArrayRef),
+        ("tags", Arc::new(tags_builder.finish()) as ArrayRef),
+    ])?;
+
+    Ok(batch)
+}
+
 /// A a data frame that a list of integers and string IDs
 async fn table_with_lists_and_nulls() -> Result<DataFrame> {
     let mut list_builder = ListBuilder::new(UInt32Builder::new());

Reply via email to