alamb commented on code in PR #7650: URL: https://github.com/apache/arrow-rs/pull/7650#discussion_r2143742195
########## arrow-select/src/coalesce.rs: ########## @@ -222,122 +250,333 @@ impl BatchCoalescer { } } -/// Heuristically compact `StringViewArray`s to reduce memory usage, if needed -/// -/// Decides when to consolidate the StringView into a new buffer to reduce -/// memory usage and improve string locality for better performance. -/// -/// This differs from `StringViewArray::gc` because: -/// 1. It may not compact the array depending on a heuristic. -/// 2. It uses a precise block size to reduce the number of buffers to track. -/// -/// # Heuristic +/// Return a new `InProgressArray` for the given data type +fn create_in_progress_array(data_type: &DataType, batch_size: usize) -> Box<dyn InProgressArray> { + match data_type { + DataType::Utf8View => Box::new(InProgressStringViewArray::new(batch_size)), + _ => Box::new(GenericInProgressArray::new()), + } +} + +/// Incrementally builds in progress arrays /// -/// If the average size of each view is larger than 32 bytes, we compact the array. +/// There are different specialized implementations of this trait for different +/// array types (e.g., [`StringViewArray`], [`UInt32Array`], etc.). /// -/// `StringViewArray` include pointers to buffer that hold the underlying data. -/// One of the great benefits of `StringViewArray` is that many operations -/// (e.g., `filter`) can be done without copying the underlying data. +/// This is a subset of the ArrayBuilder APIs, but specialized for +/// the incremental usecase +trait InProgressArray: std::fmt::Debug { + /// Push a new array to the in-progress array + fn push_array(&mut self, array: ArrayRef); + + /// Finish the currently in-progress array and clear state for the next + fn finish(&mut self) -> Result<ArrayRef, ArrowError>; +} + +/// Fallback implementation for [`InProgressArray`] /// -/// However, after a while (e.g., after `FilterExec` or `HashJoinExec`) the -/// `StringViewArray` may only refer to a small portion of the buffer, -/// significantly increasing memory usage. -fn gc_string_view_batch(batch: RecordBatch) -> RecordBatch { - let (schema, columns, num_rows) = batch.into_parts(); - let new_columns: Vec<ArrayRef> = columns - .into_iter() - .map(|c| { - // Try to re-create the `StringViewArray` to prevent holding the underlying buffer too long. - let Some(s) = c.as_string_view_opt() else { - return c; - }; - if s.data_buffers().is_empty() { - // If there are no data buffers, we can just return the array as is - return c; - } - let ideal_buffer_size: usize = s - .views() +/// Internally, buffers arrays and calls [`concat`] +#[derive(Debug)] +struct GenericInProgressArray { + /// The buffered arrays + buffered_arrays: Vec<ArrayRef>, +} + +impl GenericInProgressArray { + /// Create a new `GenericInProgressArray` + pub fn new() -> Self { + Self { + buffered_arrays: vec![], + } + } +} +impl InProgressArray for GenericInProgressArray { + fn push_array(&mut self, array: ArrayRef) { + self.buffered_arrays.push(array); + } + + fn finish(&mut self) -> Result<ArrayRef, ArrowError> { + // Concatenate all buffered arrays into a single array, which uses 2x + // peak memory + let array = concat( + &self + .buffered_arrays .iter() - .map(|v| { - let len = (*v as u32) as usize; - if len > 12 { - len - } else { - 0 - } - }) - .sum(); - let actual_buffer_size = s.get_buffer_memory_size(); - let buffers = s.data_buffers(); - - // Re-creating the array copies data and can be time consuming. - // We only do it if the array is sparse - if actual_buffer_size > (ideal_buffer_size * 2) { - if ideal_buffer_size == 0 { - // If the ideal buffer size is 0, all views are inlined - // so just reuse the views - return Arc::new(unsafe { - StringViewArray::new_unchecked( - s.views().clone(), - vec![], - s.nulls().cloned(), - ) - }); - } - // We set the block size to `ideal_buffer_size` so that the new StringViewArray only has one buffer, which accelerate later concat_batches. - // See https://github.com/apache/arrow-rs/issues/6094 for more details. - let mut buffer: Vec<u8> = Vec::with_capacity(ideal_buffer_size); - - let views: Vec<u128> = s - .views() - .as_ref() - .iter() - .cloned() - .map(|v| { - let mut b: ByteView = ByteView::from(v); - - if b.length > 12 { - let offset = buffer.len() as u32; - buffer.extend_from_slice( - buffers[b.buffer_index as usize] - .get(b.offset as usize..b.offset as usize + b.length as usize) - .expect("Invalid buffer slice"), - ); - b.offset = offset; - b.buffer_index = 0; // Set buffer index to 0, as we only have one buffer - } - - b.into() - }) - .collect(); - - let buffers = if buffer.is_empty() { - vec![] - } else { - vec![buffer.into()] + .map(|array| array as &dyn Array) + .collect::<Vec<_>>(), + )?; + self.buffered_arrays.clear(); + Ok(array) + } +} + +/// InProgressArray for StringViewArray +/// +/// TODO: genreric for GenericByteView +#[derive(Debug)] +struct InProgressStringViewArray { + /// the target batch size (and thus size for views allocation) + batch_size: usize, + /// The in progress vies + views: Vec<u128>, + /// In progress nulls + nulls: NullBufferBuilder, + /// current buffer + current: Option<Vec<u8>>, + /// completed buffers + completed: Vec<Buffer>, + /// Where to get the next buffer + buffer_source: BufferSource, +} + +impl InProgressStringViewArray { + fn new(batch_size: usize) -> Self { + let buffer_source = BufferSource::new(); + + Self { + batch_size, + views: Vec::with_capacity(batch_size), + nulls: NullBufferBuilder::new(batch_size), + current: None, + completed: vec![], + buffer_source, + } + } + + /// Update self.nulls with the nulls from the StringViewArray + fn push_nulls(&mut self, s: &StringViewArray) { + if let Some(nulls) = s.nulls().as_ref() { + self.nulls.append_buffer(nulls); + } else { + self.nulls.append_n_non_nulls(s.len()); + } + } + + /// Finishes the currently inprogress block, if any + fn finish_current(&mut self) { + let Some(next_buffer) = self.current.take() else { + return; + }; + self.completed.push(next_buffer.into()); + } + + /// Append views to self.views, updating the buffer index if necessary + #[inline(never)] + fn append_views_and_update_buffer_index(&mut self, views: &[u128], buffers: &[Buffer]) { + if let Some(buffer) = self.current.take() { + self.completed.push(buffer.into()); + } + let starting_buffer: u32 = self.completed.len().try_into().expect("too many buffers"); + self.completed.extend_from_slice(buffers); + + if starting_buffer == 0 { + // If there are no buffers, we can just use the views as is + self.views.extend_from_slice(views); + } else { + // If there are buffers, we need to update the buffer index + let updated_views = views.iter().map(|v| { + let mut byte_view = ByteView::from(*v); + if byte_view.length > 12 { + // Small views (<=12 bytes) are inlined, so only need to update large views + byte_view.buffer_index += starting_buffer; }; + byte_view.as_u128() + }); + + self.views.extend(updated_views); + } + } - let gc_string = unsafe { - StringViewArray::new_unchecked(views.into(), buffers, s.nulls().cloned()) + /// Append views to self.views, copying data from the buffers into + /// self.buffers + /// + /// # Arguments + /// - `views` - the views to append + /// - `actual_buffer_size` - the size of the bytes pointed to by the views + /// - `buffers` - the buffers the reviews point to + #[inline(never)] + fn append_views_and_copy_strings( + &mut self, + views: &[u128], + actual_buffer_size: usize, + buffers: &[Buffer], + ) { + let mut current = match self.current.take() { + Some(current) => { + // If the current buffer is not large enough, allocate a new one + // TODO copy as many views that will fit into the current buffer? + if current.len() + actual_buffer_size > current.capacity() { + self.completed.push(current.into()); + self.buffer_source.next_buffer(actual_buffer_size) + } else { + current + } + } + None => { + // If there is no current buffer, allocate a new one + self.buffer_source.next_buffer(actual_buffer_size) + } + }; + + let new_buffer_index: u32 = self.completed.len().try_into().expect("too many buffers"); + + // Copy the views, updating the buffer index and copying the data as needed + let new_views = views.iter().map(|v| { + let mut b: ByteView = ByteView::from(*v); + if b.length > 12 { + let buffer_index = b.buffer_index as usize; + let buffer_offset = b.offset as usize; + let str_len = b.length as usize; + + // Update view to location in current + b.offset = current.len() as u32; + b.buffer_index = new_buffer_index; + + // safety: input views are validly constructed + let src = unsafe { + buffers + .get_unchecked(buffer_index) + .get_unchecked(buffer_offset..buffer_offset + str_len) }; + current.extend_from_slice(src); + } + b.as_u128() + }); + self.views.extend(new_views); + self.current = Some(current); + } +} + +impl InProgressArray for InProgressStringViewArray { + fn push_array(&mut self, array: ArrayRef) { + let s = array.as_string_view(); + + // add any nulls, as necessary + self.push_nulls(s); + + // If there are no data buffers in s (all inlined views), can append the + // views/nulls and done + if s.data_buffers().is_empty() { + self.views.extend_from_slice(s.views().as_ref()); + return; + } + + let ideal_buffer_size = s.total_buffer_bytes_used(); + let actual_buffer_size = s.get_buffer_memory_size(); + let buffers = s.data_buffers(); + + // None of the views references the buffers (e.g. sliced) + if ideal_buffer_size == 0 { + self.views.extend_from_slice(s.views().as_ref()); + return; + } + + // Copying the strings into a buffer can be time-consuming so + // only do it if the array is sparse + if actual_buffer_size > (ideal_buffer_size * 2) { + self.append_views_and_copy_strings(s.views(), actual_buffer_size, buffers); + } else { + self.append_views_and_update_buffer_index(s.views(), buffers); + } + } + + fn finish(&mut self) -> Result<ArrayRef, ArrowError> { + self.finish_current(); + assert!(self.current.is_none()); + let buffers = std::mem::take(&mut self.completed); + + let mut views = Vec::with_capacity(self.batch_size); + std::mem::swap(&mut self.views, &mut views); - Arc::new(gc_string) - } else { - c + let nulls = self.nulls.finish(); + self.nulls = NullBufferBuilder::new(self.batch_size); Review Comment: I double checked and NullBufferBuilder doesn't actually allocate on `new`: https://github.com/apache/arrow-rs/blob/8dad535a4a7481c5f74f98cd571798fa7e87d233/arrow-buffer/src/builder/null.rs#L64-L69 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org