This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new ed0949e994 Refactor `ByteGroupValueBuilder` to use `MaybeNullBufferBuilder` (#12681)
ed0949e994 is described below

commit ed0949e99412fce12b5b8622169ffb0ce9c38c77
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Oct 2 06:31:31 2024 -0400

    Refactor `ByteGroupValueBuilder` to use `MaybeNullBufferBuilder` (#12681)
---
 .../src/aggregates/group_values/group_column.rs    | 95 +++++++---------------
 1 file changed, 30 insertions(+), 65 deletions(-)

diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
index 7409f5c214..122d77683c 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
@@ -15,24 +15,21 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::array::BooleanBufferBuilder;
 use arrow::array::BufferBuilder;
 use arrow::array::GenericBinaryArray;
 use arrow::array::GenericStringArray;
 use arrow::array::OffsetSizeTrait;
 use arrow::array::PrimitiveArray;
 use arrow::array::{Array, ArrayRef, ArrowPrimitiveType, AsArray};
-use arrow::buffer::NullBuffer;
 use arrow::buffer::OffsetBuffer;
 use arrow::buffer::ScalarBuffer;
-use arrow::datatypes::ArrowNativeType;
 use arrow::datatypes::ByteArrayType;
 use arrow::datatypes::DataType;
 use arrow::datatypes::GenericBinaryType;
-use arrow::datatypes::GenericStringType;
 use datafusion_common::utils::proxy::VecAllocExt;
 
 use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder;
+use arrow_array::types::GenericStringType;
 use datafusion_physical_expr_common::binary_map::{OutputType, INITIAL_BUFFER_CAPACITY};
 use std::sync::Arc;
 use std::vec;
@@ -190,6 +187,12 @@ impl<T: ArrowPrimitiveType> GroupColumn for PrimitiveGroupValueBuilder<T> {
 }
 
 /// An implementation of [`GroupColumn`] for binary and utf8 types.
+///
+/// Stores a collection of binary or utf8 group values in a single buffer
+/// in a way that allows:
+///
+/// 1. Efficient comparison of incoming rows to existing rows
+/// 2. Efficient construction of the final output array
 pub struct ByteGroupValueBuilder<O>
 where
     O: OffsetSizeTrait,
@@ -201,8 +204,8 @@ where
     /// stored in the range `offsets[i]..offsets[i+1]` in `buffer`. Null values
     /// are stored as a zero length string.
     offsets: Vec<O>,
-    /// Null indexes in offsets, if `i` is in nulls, `offsets[i]` should be equals to `offsets[i+1]`
-    nulls: Vec<usize>,
+    /// Nulls
+    nulls: MaybeNullBufferBuilder,
 }
 
 impl<O> ByteGroupValueBuilder<O>
@@ -214,7 +217,7 @@ where
             output_type,
             buffer: BufferBuilder::new(INITIAL_BUFFER_CAPACITY),
             offsets: vec![O::default()],
-            nulls: vec![],
+            nulls: MaybeNullBufferBuilder::new(),
         }
     }
 
@@ -224,40 +227,33 @@ where
     {
         let arr = array.as_bytes::<B>();
         if arr.is_null(row) {
-            self.nulls.push(self.len());
+            self.nulls.append(true);
             // nulls need a zero length in the offset buffer
             let offset = self.buffer.len();
-
             self.offsets.push(O::usize_as(offset));
-            return;
+        } else {
+            self.nulls.append(false);
+            let value: &[u8] = arr.value(row).as_ref();
+            self.buffer.append_slice(value);
+            self.offsets.push(O::usize_as(self.buffer.len()));
         }
-
-        let value: &[u8] = arr.value(row).as_ref();
-        self.buffer.append_slice(value);
-        self.offsets.push(O::usize_as(self.buffer.len()));
     }
 
     fn equal_to_inner<B>(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool
     where
         B: ByteArrayType,
     {
-        // Handle nulls
-        let is_lhs_null = self.nulls.iter().any(|null_idx| *null_idx == lhs_row);
         let arr = array.as_bytes::<B>();
-        if is_lhs_null {
-            return arr.is_null(rhs_row);
-        } else if arr.is_null(rhs_row) {
-            return false;
-        }
+        self.nulls.is_null(lhs_row) == arr.is_null(rhs_row)
+            && self.value(lhs_row) == (arr.value(rhs_row).as_ref() as &[u8])
+    }
 
-        let arr = array.as_bytes::<B>();
-        let rhs_elem: &[u8] = arr.value(rhs_row).as_ref();
-        let rhs_elem_len = arr.value_length(rhs_row).as_usize();
-        debug_assert_eq!(rhs_elem_len, rhs_elem.len());
-        let l = self.offsets[lhs_row].as_usize();
-        let r = self.offsets[lhs_row + 1].as_usize();
-        let existing_elem = unsafe { self.buffer.as_slice().get_unchecked(l..r) };
-        rhs_elem == existing_elem
+    /// return the current value of the specified row irrespective of null
+    pub fn value(&self, row: usize) -> &[u8] {
+        let l = self.offsets[row].as_usize();
+        let r = self.offsets[row + 1].as_usize();
+        // Safety: the offsets are constructed correctly and never decrease
+        unsafe { self.buffer.as_slice().get_unchecked(l..r) }
     }
 }
 
@@ -325,18 +321,7 @@ where
             nulls,
         } = *self;
 
-        let null_buffer = if nulls.is_empty() {
-            None
-        } else {
-            // Only make a `NullBuffer` if there was a null value
-            let num_values = offsets.len() - 1;
-            let mut bool_builder = BooleanBufferBuilder::new(num_values);
-            bool_builder.append_n(num_values, true);
-            nulls.into_iter().for_each(|null_index| {
-                bool_builder.set_bit(null_index, false);
-            });
-            Some(NullBuffer::from(bool_builder.finish()))
-        };
+        let null_buffer = nulls.build();
 
         // SAFETY: the offsets were constructed correctly in `insert_if_new` --
         // monotonically increasing, overflows were checked.
@@ -353,9 +338,9 @@ where
                 // SAFETY:
                 // 1. the offsets were constructed safely
                 //
-                // 2. we asserted the input arrays were all the correct type and
-                // thus since all the values that went in were valid (e.g. utf8)
-                // so are all the values that come out
+                // 2. the input arrays were all the correct type and thus since
+                // all the values that went in were valid (e.g. utf8) so are all
+                // the values that come out
                 Arc::new(unsafe {
                     GenericStringArray::new_unchecked(offsets, values, null_buffer)
                 })
@@ -366,27 +351,7 @@ where
 
     fn take_n(&mut self, n: usize) -> ArrayRef {
         debug_assert!(self.len() >= n);
-
-        let null_buffer = if self.nulls.is_empty() {
-            None
-        } else {
-            // Only make a `NullBuffer` if there was a null value
-            let mut bool_builder = BooleanBufferBuilder::new(n);
-            bool_builder.append_n(n, true);
-
-            let mut new_nulls = vec![];
-            self.nulls.iter().for_each(|null_index| {
-                if *null_index < n {
-                    bool_builder.set_bit(*null_index, false);
-                } else {
-                    new_nulls.push(null_index - n);
-                }
-            });
-
-            self.nulls = new_nulls;
-            Some(NullBuffer::from(bool_builder.finish()))
-        };
-
+        let null_buffer = self.nulls.take_n(n);
         let first_remaining_offset = O::as_usize(self.offsets[n]);
 
         // Given offests like [0, 2, 4, 5] and n = 1, we expect to get


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to