This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new e106c15340 arrow-select: Optimize
`BatchCoalescer::push_batch_with_filter` for low selectivity filters and
inlined Utf8View/BinaryView (#9755)
e106c15340 is described below
commit e106c1534095cbfe1201a770c8ccca5252fdbb88
Author: ClSlaid <[email protected]>
AuthorDate: Thu Jun 11 02:35:33 2026 +0800
arrow-select: Optimize `BatchCoalescer::push_batch_with_filter` for low
selectivity filters and inlined Utf8View/BinaryView (#9755)
## Summary
- fuse the sparse inline `Utf8View`/`BinaryView` filter and coalescing paths
so primitive columns and inline views can be appended directly without
materialising an intermediate filtered `RecordBatch`
- reuse optimised filter indices and null-mask handling for coalescing,
while preserving the existing fallback paths for dense and non-inline
`BinaryView` inputs
- add focused tests and benchmarks for single-column and mixed
`BinaryView` filter cases related to `#9143`
## Verification
- `cargo test -p arrow-select coalesce --lib`
- `cargo clippy -p arrow-select --lib --tests -- -D warnings`
- `cargo clippy -p arrow --bench coalesce_kernels --features test_utils
-- -D warnings`
- `cargo bench -p arrow --bench coalesce_kernels --features test_utils
-- --noplot single_binaryview`
- `cargo bench -p arrow --bench coalesce_kernels --features test_utils
-- --noplot mixed_binaryview`
## Benchmark Results
Measured against a clean `origin/main` worktree with the same
`BinaryView` benchmark additions. The figures below compare
representative median times from the baseline worktree and this branch.
### Mixed primitive + BinaryView
- `mixed_binaryview (max_string_len=8), 8192, nulls: 0, selectivity:
0.001`: `23.16 ms` -> `8.51 ms`
- `mixed_binaryview (max_string_len=8), 8192, nulls: 0, selectivity:
0.01`: `2.37 ms` -> `1.31 ms`
- `mixed_binaryview (max_string_len=8), 8192, nulls: 0.1, selectivity:
0.001`: `31.70 ms` -> `14.33 ms`
- `mixed_binaryview (max_string_len=8), 8192, nulls: 0.1, selectivity:
0.01`: `3.92 ms` -> `2.44 ms`
### Single BinaryView
- `single_binaryview, 8192, nulls: 0, selectivity: 0.01`: `4.86 ms` ->
`4.90 ms` (roughly flat, slightly slower)
- `single_binaryview (max_string_len=8), 8192, nulls: 0, selectivity:
0.001`: `34.72 ms` -> `19.33 ms`
- `single_binaryview (max_string_len=8), 8192, nulls: 0, selectivity:
0.01`: `3.46 ms` -> `2.03 ms`
- `single_binaryview (max_string_len=8), 8192, nulls: 0.1, selectivity:
0.01`: `5.93 ms` -> `3.97 ms`
- `single_binaryview (max_string_len=8), 8192, nulls: 0, selectivity:
0.8`: `597 µs` -> `619 µs` (regression)
- `single_binaryview (max_string_len=8), 8192, nulls: 0.1, selectivity:
0.8`: `1.78 ms` -> `1.79 ms` (roughly flat, slightly slower)
In short, this change substantially improves the mixed primitive +
inline `BinaryView` path that motivated `#9143`, while the single-column
`BinaryView` benchmarks still show trade-offs: sparse inline cases
improve, but dense inline cases are slightly slower and the non-inline
single-column path is effectively unchanged.
Closes #9143.
---------
Signed-off-by: cl <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
---
arrow-select/src/coalesce.rs | 363 +++++++++++++++++++++++++++++++--
arrow-select/src/coalesce/byte_view.rs | 167 ++++++++++++++-
arrow-select/src/coalesce/generic.rs | 11 +
arrow-select/src/coalesce/primitive.rs | 222 +++++++++++++++++++-
arrow-select/src/filter.rs | 145 +++++++++++--
5 files changed, 866 insertions(+), 42 deletions(-)
diff --git a/arrow-select/src/coalesce.rs b/arrow-select/src/coalesce.rs
index 8fe88fb8c3..0d5199941f 100644
--- a/arrow-select/src/coalesce.rs
+++ b/arrow-select/src/coalesce.rs
@@ -20,7 +20,7 @@
//!
//! [`filter`]: crate::filter::filter
//! [`take`]: crate::take::take
-use crate::filter::filter_record_batch;
+use crate::filter::{FilterBuilder, FilterPredicate, FilterSelection};
use crate::take::take_record_batch;
use arrow_array::types::{BinaryViewType, StringViewType};
use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch,
downcast_primitive};
@@ -38,6 +38,21 @@ use byte_view::InProgressByteViewArray;
use generic::GenericInProgressArray;
use primitive::InProgressPrimitiveArray;
+fn has_sparse_filter_copy(data_type: &DataType) -> bool {
+ data_type.is_primitive() || matches!(data_type, DataType::Utf8View |
DataType::BinaryView)
+}
+
+/// Maximum selected row fraction for the fused sparse-filter copy path.
+///
+/// Shared benchmark results show this path helps low-selectivity filters, but
+/// can regress once the filter becomes denser. Keep this as a cheap integer
+/// threshold on the hot path: `selected_count <= filter_len / 16`.
+const SPARSE_FILTER_COPY_MAX_SELECTIVITY_DENOMINATOR: usize = 16;
+
+fn should_use_sparse_filter_copy(filter_len: usize, selected_count: usize) ->
bool {
+ selected_count <= filter_len /
SPARSE_FILTER_COPY_MAX_SELECTIVITY_DENOMINATOR
+}
+
/// Concatenate multiple [`RecordBatch`]es
///
/// Implements the common pattern of incrementally creating output
@@ -139,6 +154,8 @@ pub struct BatchCoalescer {
target_batch_size: usize,
/// In-progress arrays
in_progress_arrays: Vec<Box<dyn InProgressArray>>,
+ /// True if some column still needs the materialized filter path.
+ has_non_specialized_filter_columns: bool,
/// Buffered row count. Always less than `batch_size`
buffered_rows: usize,
/// Completed batches
@@ -156,6 +173,10 @@ impl BatchCoalescer {
/// Typical values are `4096` or `8192` rows.
///
pub fn new(schema: SchemaRef, target_batch_size: usize) -> Self {
+ let has_non_specialized_filter_columns = schema
+ .fields()
+ .iter()
+ .any(|field| !has_sparse_filter_copy(field.data_type()));
let in_progress_arrays = schema
.fields()
.iter()
@@ -166,6 +187,7 @@ impl BatchCoalescer {
schema,
target_batch_size,
in_progress_arrays,
+ has_non_specialized_filter_columns,
// We will for sure store at least one completed batch
completed: VecDeque::with_capacity(1),
buffered_rows: 0,
@@ -212,7 +234,7 @@ impl BatchCoalescer {
/// Push a batch into the Coalescer after applying a filter
///
/// This is semantically equivalent of calling [`Self::push_batch`]
- /// with the results from [`filter_record_batch`]
+ /// with the results from [`crate::filter::filter_record_batch`]
///
/// # Example
/// ```
@@ -238,10 +260,7 @@ impl BatchCoalescer {
batch: RecordBatch,
filter: &BooleanArray,
) -> Result<(), ArrowError> {
- // TODO: optimize this to avoid materializing (copying the results
- // of filter to a new batch)
- let filtered_batch = filter_record_batch(&batch, filter)?;
- self.push_batch(filtered_batch)
+ self.push_batch_with_filtered_columns(batch, filter)
}
/// Push a batch into the Coalescer after applying a set of indices
@@ -566,6 +585,87 @@ impl BatchCoalescer {
}
}
+impl BatchCoalescer {
+ fn filter_predicate_for_batch(
+ batch: &RecordBatch,
+ filter: &BooleanArray,
+ selected_count: usize,
+ ) -> FilterPredicate {
+ let mut filter_builder = FilterBuilder::new_with_count(filter,
selected_count);
+ if batch.num_columns() > 1
+ || (batch.num_columns() > 0
+ &&
FilterBuilder::is_optimize_beneficial(batch.schema_ref().field(0).data_type()))
+ {
+ filter_builder = filter_builder.optimize();
+ }
+ filter_builder.build()
+ }
+
+ fn push_batch_with_filtered_columns(
+ &mut self,
+ batch: RecordBatch,
+ filter: &BooleanArray,
+ ) -> Result<(), ArrowError> {
+ let filter_len = filter.len();
+ let batch_num_rows = batch.num_rows();
+ let batch_num_columns = batch.num_columns();
+
+ if filter_len > batch_num_rows {
+ return Err(ArrowError::InvalidArgumentError(format!(
+ "Filter predicate of length {} is larger than target array of
length {}",
+ filter_len, batch_num_rows
+ )));
+ }
+
+ let selected_count = filter.true_count();
+ if selected_count == 0 {
+ return Ok(());
+ }
+
+ if selected_count == batch_num_rows && filter_len == batch_num_rows {
+ return self.push_batch(batch);
+ }
+
+ if batch_num_columns != self.in_progress_arrays.len() {
+ return Err(ArrowError::InvalidArgumentError(format!(
+ "Batch has {} columns but BatchCoalescer expects {}",
+ batch_num_columns,
+ self.in_progress_arrays.len()
+ )));
+ }
+
+ let exceeds_coalesce_limit = self
+ .biggest_coalesce_batch_size
+ .is_some_and(|limit| selected_count > limit);
+ let does_not_fit_buffer = selected_count > self.target_batch_size -
self.buffered_rows;
+ let should_materialize_filter = exceeds_coalesce_limit
+ || self.has_non_specialized_filter_columns
+ || does_not_fit_buffer
+ || !should_use_sparse_filter_copy(filter_len, selected_count);
+
+ if should_materialize_filter {
+ // Use materialized filtering when sparse per-column copying is
unavailable.
+ let predicate = Self::filter_predicate_for_batch(&batch, filter,
selected_count);
+ let filtered_batch = predicate.filter_record_batch(&batch)?;
+ return self.push_batch(filtered_batch);
+ }
+
+ let predicate = Self::filter_predicate_for_batch(&batch, filter,
selected_count);
+ let (_schema, arrays, _num_rows) = batch.into_parts();
+
+ for (in_progress, array) in
self.in_progress_arrays.iter_mut().zip(arrays) {
+ in_progress.copy_rows_by_filter_from(array, &predicate)?;
+ }
+
+ self.buffered_rows += selected_count;
+ if self.buffered_rows >= self.target_batch_size {
+ self.finish_buffered_batch()?;
+ }
+
+ Ok(())
+ }
+}
+
/// Return a new `InProgressArray` for the given data type
fn create_in_progress_array(data_type: &DataType, batch_size: usize) ->
Box<dyn InProgressArray> {
macro_rules! instantiate_primitive {
@@ -611,6 +711,35 @@ trait InProgressArray: std::fmt::Debug + Send + Sync {
/// Return an error if the source array is not set
fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(),
ArrowError>;
+ /// Copy rows selected by `filter` from the current source array.
+ fn copy_rows_by_filter(&mut self, filter: &FilterPredicate) -> Result<(),
ArrowError> {
+ self.copy_rows_by_selection(filter.selection())
+ }
+
+ /// Copy rows selected by `filter` from `source`.
+ fn copy_rows_by_filter_from(
+ &mut self,
+ source: ArrayRef,
+ filter: &FilterPredicate,
+ ) -> Result<(), ArrowError> {
+ self.set_source(Some(source));
+ let result = self.copy_rows_by_filter(filter);
+ self.set_source(None);
+ result
+ }
+
+ /// Copy rows described by a [`FilterSelection`] from the current source
array.
+ fn copy_rows_by_selection(&mut self, selection: FilterSelection<'_>) ->
Result<(), ArrowError> {
+ match selection {
+ FilterSelection::None => Ok(()),
+ FilterSelection::All { len } => self.copy_rows(0, len),
+ FilterSelection::Slices(slices) => {
+ slices.try_for_each(|(start, end)| self.copy_rows(start, end -
start))
+ }
+ FilterSelection::Indices(indices) => indices.try_for_each(|idx|
self.copy_rows(idx, 1)),
+ }
+ }
+
/// Finish the currently in-progress array and return it as an `ArrayRef`
fn finish(&mut self) -> Result<ArrayRef, ArrowError>;
}
@@ -619,6 +748,7 @@ trait InProgressArray: std::fmt::Debug + Send + Sync {
mod tests {
use super::*;
use crate::concat::concat_batches;
+ use crate::filter::filter_record_batch;
use arrow_array::builder::StringViewBuilder;
use arrow_array::cast::AsArray;
use arrow_array::types::Int32Type;
@@ -665,6 +795,14 @@ mod tests {
.run();
}
+ #[test]
+ fn test_sparse_filter_copy_threshold() {
+ assert!(should_use_sparse_filter_copy(8192, 8));
+ assert!(should_use_sparse_filter_copy(8192, 81));
+ assert!(!should_use_sparse_filter_copy(8192, 819));
+ assert!(!should_use_sparse_filter_copy(8192, 6553));
+ }
+
#[test]
fn test_single_large_batch_greater_than_target() {
// test a single large batch
@@ -1197,6 +1335,195 @@ mod tests {
.run();
}
+ #[test]
+ fn test_binary_view_filtered() {
+ let values: Vec<Option<&[u8]>> = vec![
+ Some(b"foo"),
+ None,
+ Some(b"A longer string that is more than 12 bytes"),
+ ];
+
+ let binary_view =
+
BinaryViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000));
+ let batch =
+ RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as
ArrayRef)]).unwrap();
+ let filter = sparse_filter(1000);
+
+ Test::new("coalesce_binary_view_filtered")
+ .with_batch(batch.clone())
+ .with_filter(filter.clone())
+ .with_batch(batch)
+ .with_filter(filter)
+ .with_batch_size(256)
+ .with_expected_output_sizes(vec![250])
+ .run();
+ }
+
+ #[test]
+ fn test_binary_view_filtered_inline() {
+ let values: Vec<Option<&[u8]>> = vec![Some(b"foo"), None,
Some(b"barbaz")];
+
+ let binary_view =
+
BinaryViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000));
+ let batch =
+ RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as
ArrayRef)]).unwrap();
+ let filter = sparse_filter(1000);
+
+ Test::new("coalesce_binary_view_filtered_inline")
+ .with_batch(batch.clone())
+ .with_filter(filter.clone())
+ .with_batch(batch)
+ .with_filter(filter)
+ .with_batch_size(300)
+ .with_expected_output_sizes(vec![250])
+ .run();
+ }
+
+ #[test]
+ fn test_string_view_filtered_inline() {
+ let values: Vec<Option<&str>> = vec![Some("foo"), None,
Some("barbaz")];
+
+ let string_view =
+
StringViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000));
+ let batch =
+ RecordBatch::try_from_iter(vec![("c0", Arc::new(string_view) as
ArrayRef)]).unwrap();
+ let filter = sparse_filter(1000);
+
+ Test::new("coalesce_string_view_filtered_inline")
+ .with_batch(batch.clone())
+ .with_filter(filter.clone())
+ .with_batch(batch)
+ .with_filter(filter)
+ .with_batch_size(300)
+ .with_expected_output_sizes(vec![250])
+ .run();
+ }
+
+ #[test]
+ fn test_mixed_inline_binary_view_filtered() {
+ let int_values =
+ Int32Array::from_iter((0..1000).map(|v| if v % 5 == 0 { None }
else { Some(v) }));
+ let float_values =
arrow_array::Float64Array::from_iter((0..1000).map(|v| Some(v as f64)));
+ let binary_values: Vec<Option<&[u8]>> = vec![Some(b"foo"), None,
Some(b"barbaz")];
+ let binary_view = BinaryViewArray::from_iter(
+ std::iter::repeat(binary_values.iter()).flatten().take(1000),
+ );
+
+ let batch = RecordBatch::try_from_iter(vec![
+ ("i", Arc::new(int_values) as ArrayRef),
+ ("f", Arc::new(float_values) as ArrayRef),
+ ("b", Arc::new(binary_view) as ArrayRef),
+ ])
+ .unwrap();
+
+ let filter = sparse_filter(1000);
+
+ Test::new("coalesce_mixed_inline_binary_view_filtered")
+ .with_batch(batch.clone())
+ .with_filter(filter.clone())
+ .with_batch(batch)
+ .with_filter(filter)
+ .with_batch_size(300)
+ .with_expected_output_sizes(vec![250])
+ .run();
+ }
+
+ #[test]
+ fn test_mixed_inline_string_view_filtered() {
+ let int_values =
+ Int32Array::from_iter((0..1000).map(|v| if v % 5 == 0 { None }
else { Some(v) }));
+ let float_values =
arrow_array::Float64Array::from_iter((0..1000).map(|v| Some(v as f64)));
+ let string_values: Vec<Option<&str>> = vec![Some("foo"), None,
Some("barbaz")];
+ let string_view = StringViewArray::from_iter(
+ std::iter::repeat(string_values.iter()).flatten().take(1000),
+ );
+
+ let batch = RecordBatch::try_from_iter(vec![
+ ("i", Arc::new(int_values) as ArrayRef),
+ ("f", Arc::new(float_values) as ArrayRef),
+ ("s", Arc::new(string_view) as ArrayRef),
+ ])
+ .unwrap();
+
+ let filter = sparse_filter(1000);
+
+ Test::new("coalesce_mixed_inline_string_view_filtered")
+ .with_batch(batch.clone())
+ .with_filter(filter.clone())
+ .with_batch(batch)
+ .with_filter(filter)
+ .with_batch_size(300)
+ .with_expected_output_sizes(vec![250])
+ .run();
+ }
+
+ #[test]
+ fn test_mixed_boolean_inline_string_view_filtered() {
+ let bool_values = BooleanArray::from_iter((0..1000).map(|v| Some(v % 3
== 0)));
+ let string_values: Vec<Option<&str>> = vec![Some("foo"), None,
Some("barbaz")];
+ let string_view = StringViewArray::from_iter(
+ std::iter::repeat(string_values.iter()).flatten().take(1000),
+ );
+
+ let batch = RecordBatch::try_from_iter(vec![
+ ("b", Arc::new(bool_values) as ArrayRef),
+ ("s", Arc::new(string_view) as ArrayRef),
+ ])
+ .unwrap();
+
+ let filter = sparse_filter(1000);
+
+ Test::new("coalesce_mixed_boolean_inline_string_view_filtered")
+ .with_batch(batch.clone())
+ .with_filter(filter.clone())
+ .with_batch(batch)
+ .with_filter(filter)
+ .with_batch_size(300)
+ .with_expected_output_sizes(vec![250])
+ .run();
+ }
+
+ #[test]
+ fn test_inline_filter_rejects_filter_longer_than_batch() {
+ let values: Vec<Option<&[u8]>> = vec![Some(b"foo"), Some(b"bar")];
+ let binary_view = BinaryViewArray::from_iter(values);
+ let batch =
+ RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as
ArrayRef)]).unwrap();
+ let filter = BooleanArray::from(vec![true, false, true]);
+
+ let mut coalescer = BatchCoalescer::new(batch.schema(), 100);
+ let result = coalescer.push_batch_with_filter(batch, &filter);
+ assert!(result.is_err());
+ let err = result.unwrap_err().to_string();
+ assert!(
+ err.contains("Filter predicate of length 3 is larger than target
array of length 2"),
+ "unexpected error: {err}"
+ );
+ }
+
+ #[test]
+ fn test_filter_fast_path_schema_capability() {
+ let supported = Arc::new(Schema::new(vec![
+ Field::new("primitive", DataType::UInt32, false),
+ Field::new("utf8_view", DataType::Utf8View, true),
+ Field::new("binary_view", DataType::BinaryView, true),
+ ]));
+ let coalescer = BatchCoalescer::new(supported, 100);
+ assert!(!coalescer.has_non_specialized_filter_columns);
+
+ let utf8 = Arc::new(Schema::new(vec![Field::new("utf8",
DataType::Utf8, true)]));
+ let coalescer = BatchCoalescer::new(utf8, 100);
+ assert!(coalescer.has_non_specialized_filter_columns);
+
+ let boolean = Arc::new(Schema::new(vec![Field::new(
+ "boolean",
+ DataType::Boolean,
+ true,
+ )]));
+ let coalescer = BatchCoalescer::new(boolean, 100);
+ assert!(coalescer.has_non_specialized_filter_columns);
+ }
+
#[derive(Debug, Clone, PartialEq)]
struct ExpectedLayout {
len: usize,
@@ -1685,6 +2012,10 @@ mod tests {
}
}
+ fn sparse_filter(len: usize) -> BooleanArray {
+ BooleanArray::from_iter((0..len).map(|idx| Some(idx % 8 == 0)))
+ }
+
/// Returns the named column as a StringViewArray
fn col_as_string_view<'b>(name: &str, batch: &'b RecordBatch) -> &'b
StringViewArray {
batch
@@ -1701,18 +2032,20 @@ mod tests {
let (schema, mut columns, row_count) = batch.into_parts();
for column in columns.iter_mut() {
- let Some(string_view) = column.as_string_view_opt() else {
+ if let Some(string_view) = column.as_string_view_opt() {
+ // Re-create the StringViewArray to ensure memory layout is
+ // consistent
+ let mut builder = StringViewBuilder::new();
+ for s in string_view.iter() {
+ builder.append_option(s);
+ }
+ *column = Arc::new(builder.finish());
continue;
- };
+ }
- // Re-create the StringViewArray to ensure memory layout is
- // consistent
- let mut builder = StringViewBuilder::new();
- for s in string_view.iter() {
- builder.append_option(s);
+ if let Some(binary_view) = column.as_binary_view_opt() {
+ *column =
Arc::new(BinaryViewArray::from_iter(binary_view.iter()));
}
- // Update the column with the new StringViewArray
- *column = Arc::new(builder.finish());
}
let options =
RecordBatchOptions::new().with_row_count(Some(row_count));
diff --git a/arrow-select/src/coalesce/byte_view.rs
b/arrow-select/src/coalesce/byte_view.rs
index 6062cd5e77..32f92d044e 100644
--- a/arrow-select/src/coalesce/byte_view.rs
+++ b/arrow-select/src/coalesce/byte_view.rs
@@ -16,10 +16,11 @@
// under the License.
use crate::coalesce::InProgressArray;
+use crate::filter::{FilterPredicate, FilterSelection, filter_null_mask};
use arrow_array::cast::AsArray;
use arrow_array::types::ByteViewType;
use arrow_array::{Array, ArrayRef, GenericByteViewArray};
-use arrow_buffer::{Buffer, NullBufferBuilder};
+use arrow_buffer::{BooleanBuffer, Buffer, NullBuffer, NullBufferBuilder};
use arrow_data::{ByteView, MAX_INLINE_VIEW_LEN};
use arrow_schema::ArrowError;
use std::marker::PhantomData;
@@ -111,6 +112,61 @@ impl<B: ByteViewType> InProgressByteViewArray<B> {
self.completed.push(next_buffer.into());
}
+ fn append_views_by_filter(&mut self, views: &[u128], filter:
&FilterPredicate) {
+ let selected_count = filter.count();
+ let current_len = self.views.len();
+ self.views.reserve(selected_count);
+
+ let mut written = 0;
+
+ unsafe {
+ let mut out =
self.views.spare_capacity_mut().as_mut_ptr().cast::<u128>();
+
+ match filter.selection() {
+ FilterSelection::None => {}
+ FilterSelection::All { .. } => {
+ std::ptr::copy_nonoverlapping(views.as_ptr(), out,
selected_count);
+ written = selected_count;
+ }
+ FilterSelection::Slices(slices) => {
+ slices.for_each(|(start, end)| {
+ let len = end - start;
+
std::ptr::copy_nonoverlapping(views.as_ptr().add(start), out, len);
+ out = out.add(len);
+ written += len;
+ });
+ }
+ FilterSelection::Indices(indices) => {
+ indices.for_each(|idx| {
+ out.write(*views.get_unchecked(idx));
+ out = out.add(1);
+ written += 1;
+ });
+ }
+ }
+
+ self.views.set_len(current_len + written);
+ }
+
+ debug_assert_eq!(written, selected_count);
+ }
+
+ fn append_nulls_by_filter(
+ &mut self,
+ filter: &FilterPredicate,
+ source_nulls: Option<&NullBuffer>,
+ ) {
+ let Some((null_count, nulls)) = filter_null_mask(source_nulls, filter)
else {
+ self.nulls.append_n_non_nulls(filter.count());
+ return;
+ };
+
+ let nulls = unsafe {
+ NullBuffer::new_unchecked(BooleanBuffer::new(nulls, 0,
filter.count()), null_count)
+ };
+ self.nulls.append_buffer(&nulls);
+ }
+
/// Append views to self.views, updating the buffer index if necessary
#[inline(never)]
fn append_views_and_update_buffer_index(&mut self, views: &[u128],
buffers: &[Buffer]) {
@@ -325,7 +381,8 @@ impl<B: ByteViewType> InProgressArray for
InProgressByteViewArray<B> {
};
let buffers = s.data_buffers();
- let views = &s.views().as_ref()[offset..offset + len];
+ // SAFETY: copy_rows is called with ranges derived from the source
array.
+ let views = unsafe { s.views().as_ref().get_unchecked(offset..offset +
len) };
// If there are no data buffers in s (all inlined views), can append
the
// views/nulls and done
@@ -346,6 +403,59 @@ impl<B: ByteViewType> InProgressArray for
InProgressByteViewArray<B> {
Ok(())
}
+ fn copy_rows_by_filter(&mut self, filter: &FilterPredicate) -> Result<(),
ArrowError> {
+ self.ensure_capacity();
+ let source = self.source.take().ok_or_else(|| {
+ ArrowError::InvalidArgumentError(
+ "Internal Error: InProgressByteViewArray: source not
set".to_string(),
+ )
+ })?;
+
+ let s = source.array.as_byte_view::<B>();
+
+ if !s.data_buffers().is_empty() {
+ // Restore the source taken above before returning the guard error.
+ self.source = Some(source);
+ return Err(ArrowError::InvalidArgumentError(
+ "Internal Error: InProgressByteViewArray::copy_rows_by_filter
requires inline views"
+ .to_string(),
+ ));
+ }
+
+ self.append_nulls_by_filter(filter, s.nulls());
+ self.append_views_by_filter(s.views(), filter);
+
+ self.source = Some(source);
+ Ok(())
+ }
+
+ fn copy_rows_by_filter_from(
+ &mut self,
+ source: ArrayRef,
+ filter: &FilterPredicate,
+ ) -> Result<(), ArrowError> {
+ let s = source.as_byte_view::<B>();
+ if s.data_buffers().is_empty() {
+ self.ensure_capacity();
+ self.append_nulls_by_filter(filter, s.nulls());
+ self.append_views_by_filter(s.views(), filter);
+ return Ok(());
+ }
+
+ // Match the filter kernel: filter views/nulls, but reuse data buffers.
+ let filtered = filter.filter(source.as_ref())?;
+ let filtered = filtered.as_byte_view::<B>();
+
+ self.ensure_capacity();
+ if let Some(nulls) = filtered.nulls().as_ref() {
+ self.nulls.append_buffer(nulls);
+ } else {
+ self.nulls.append_n_non_nulls(filter.count());
+ }
+ self.append_views_and_update_buffer_index(filtered.views(),
filtered.data_buffers());
+ Ok(())
+ }
+
fn finish(&mut self) -> Result<ArrayRef, ArrowError> {
self.finish_current();
assert!(self.current.is_none());
@@ -405,6 +515,9 @@ impl BufferSource {
#[cfg(test)]
mod tests {
use super::*;
+ use crate::filter::FilterBuilder;
+ use arrow_array::types::BinaryViewType;
+ use arrow_array::{BinaryViewArray, BooleanArray};
#[test]
fn test_buffer_source() {
@@ -444,4 +557,54 @@ mod tests {
// Can override with larger size request
assert_eq!(source.next_buffer(2_000_000).capacity(), 2_000_000);
}
+
+ #[test]
+ fn test_copy_rows_by_filter_rejects_non_inline_views() {
+ let values: Vec<Option<&[u8]>> = vec![Some(b"This value is longer than
12 bytes")];
+ let array = BinaryViewArray::from_iter(values);
+ assert!(!array.data_buffers().is_empty());
+
+ let mut in_progress =
InProgressByteViewArray::<BinaryViewType>::new(1);
+ in_progress.set_source(Some(Arc::new(array)));
+
+ let filter = BooleanArray::from(vec![true]);
+ let predicate = FilterBuilder::new(&filter).build();
+ let err = in_progress.copy_rows_by_filter(&predicate).unwrap_err();
+
+ assert!(
+ err.to_string().contains("requires inline views"),
+ "unexpected error: {err}"
+ );
+ }
+
+ #[test]
+ fn test_copy_rows_by_filter_from_reuses_non_inline_buffers() {
+ let values = (0..32)
+ .map(|i| format!("This value is longer than 12 bytes:
{i}").into_bytes())
+ .collect::<Vec<_>>();
+ let array = BinaryViewArray::from_iter(values.iter().map(|v|
Some(v.as_slice())));
+ assert!(!array.data_buffers().is_empty());
+ let source_buffer = array.data_buffers()[0].as_ptr();
+
+ let filter = BooleanArray::from((0..32).map(|i| i == 3 || i ==
29).collect::<Vec<_>>());
+ let predicate = FilterBuilder::new(&filter).build();
+
+ let mut in_progress =
InProgressByteViewArray::<BinaryViewType>::new(32);
+ in_progress
+ .copy_rows_by_filter_from(Arc::new(array), &predicate)
+ .unwrap();
+ let output = in_progress.finish().unwrap();
+ let output = output.as_binary_view();
+
+ assert_eq!(output.len(), 2);
+ assert_eq!(output.value(0), values[3].as_slice());
+ assert_eq!(output.value(1), values[29].as_slice());
+ assert!(
+ output
+ .data_buffers()
+ .iter()
+ .any(|buffer| std::ptr::addr_eq(buffer.as_ptr(),
source_buffer)),
+ "expected filtered output to reuse the source data buffer"
+ );
+ }
}
diff --git a/arrow-select/src/coalesce/generic.rs
b/arrow-select/src/coalesce/generic.rs
index 1ea57dff92..4fa64273ec 100644
--- a/arrow-select/src/coalesce/generic.rs
+++ b/arrow-select/src/coalesce/generic.rs
@@ -17,6 +17,7 @@
use super::InProgressArray;
use crate::concat::concat;
+use crate::filter::FilterPredicate;
use arrow_array::ArrayRef;
use arrow_schema::ArrowError;
@@ -60,6 +61,16 @@ impl InProgressArray for GenericInProgressArray {
Ok(())
}
+ fn copy_rows_by_filter_from(
+ &mut self,
+ source: ArrayRef,
+ filter: &FilterPredicate,
+ ) -> Result<(), ArrowError> {
+ let array = filter.filter(source.as_ref())?;
+ self.buffered_arrays.push(array);
+ Ok(())
+ }
+
fn finish(&mut self) -> Result<ArrayRef, ArrowError> {
// Concatenate all buffered arrays into a single array, which uses 2x
// peak memory
diff --git a/arrow-select/src/coalesce/primitive.rs
b/arrow-select/src/coalesce/primitive.rs
index a7f2fb32ce..ac831fe089 100644
--- a/arrow-select/src/coalesce/primitive.rs
+++ b/arrow-select/src/coalesce/primitive.rs
@@ -16,9 +16,12 @@
// under the License.
use crate::coalesce::InProgressArray;
+use crate::filter::{
+ FilterIndices, FilterPredicate, FilterSelection, FilterSlices,
filter_null_mask,
+};
use arrow_array::cast::AsArray;
use arrow_array::{Array, ArrayRef, ArrowPrimitiveType, PrimitiveArray};
-use arrow_buffer::{NullBufferBuilder, ScalarBuffer};
+use arrow_buffer::{BooleanBuffer, NullBuffer, NullBufferBuilder, ScalarBuffer};
use arrow_schema::{ArrowError, DataType};
use std::fmt::Debug;
use std::sync::Arc;
@@ -59,6 +62,95 @@ impl<T: ArrowPrimitiveType> InProgressPrimitiveArray<T> {
self.current.reserve(self.batch_size);
}
}
+
+ fn append_values_by_indices(
+ current: &mut Vec<T::Native>,
+ values: &[T::Native],
+ indices: FilterIndices<'_>,
+ selected_count: usize,
+ ) {
+ let current_len = current.len();
+ let mut written = 0;
+
+ unsafe {
+ let mut out = current
+ .spare_capacity_mut()
+ .as_mut_ptr()
+ .cast::<T::Native>();
+
+ indices.for_each(|idx| {
+ // SAFETY: indices are derived from the filter predicate for
this source.
+ out.write(*values.get_unchecked(idx));
+ out = out.add(1);
+ written += 1;
+ });
+
+ current.set_len(current_len + written);
+ }
+
+ debug_assert_eq!(written, selected_count);
+ }
+
+ fn append_values_by_slices(
+ current: &mut Vec<T::Native>,
+ values: &[T::Native],
+ slices: FilterSlices<'_>,
+ selected_count: usize,
+ ) {
+ let current_len = current.len();
+ let mut written = 0;
+
+ unsafe {
+ let mut out = current
+ .spare_capacity_mut()
+ .as_mut_ptr()
+ .cast::<T::Native>();
+
+ slices.for_each(|(start, end)| {
+ let len = end - start;
+ // SAFETY: slices are derived from the filter predicate for
this source.
+ std::ptr::copy_nonoverlapping(values.as_ptr().add(start), out,
len);
+ out = out.add(len);
+ written += len;
+ });
+
+ current.set_len(current_len + written);
+ }
+
+ debug_assert_eq!(written, selected_count);
+ }
+}
+
+#[inline]
+fn primitive_source<T: ArrowPrimitiveType>(
+ source: &Option<ArrayRef>,
+) -> Result<&PrimitiveArray<T>, ArrowError> {
+ Ok(source
+ .as_ref()
+ .ok_or_else(|| {
+ ArrowError::InvalidArgumentError(
+ "Internal Error: InProgressPrimitiveArray: source not
set".to_string(),
+ )
+ })?
+ .as_primitive::<T>())
+}
+
+fn append_filtered_nulls(
+ nulls: &mut NullBufferBuilder,
+ source_nulls: Option<&NullBuffer>,
+ filter: &FilterPredicate,
+) {
+ if let Some((null_count, filtered_nulls)) = filter_null_mask(source_nulls,
filter) {
+ let filtered_nulls = unsafe {
+ NullBuffer::new_unchecked(
+ BooleanBuffer::new(filtered_nulls, 0, filter.count()),
+ null_count,
+ )
+ };
+ nulls.append_buffer(&filtered_nulls);
+ } else {
+ nulls.append_n_non_nulls(filter.count());
+ }
}
impl<T: ArrowPrimitiveType + Debug> InProgressArray for
InProgressPrimitiveArray<T> {
@@ -69,15 +161,7 @@ impl<T: ArrowPrimitiveType + Debug> InProgressArray for
InProgressPrimitiveArray
fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(),
ArrowError> {
self.ensure_capacity();
- let s = self
- .source
- .as_ref()
- .ok_or_else(|| {
- ArrowError::InvalidArgumentError(
- "Internal Error: InProgressPrimitiveArray: source not
set".to_string(),
- )
- })?
- .as_primitive::<T>();
+ let s = primitive_source::<T>(&self.source)?;
// add nulls if necessary
if let Some(nulls) = s.nulls().as_ref() {
@@ -88,12 +172,49 @@ impl<T: ArrowPrimitiveType + Debug> InProgressArray for
InProgressPrimitiveArray
};
// Copy the values
+ let values = s.values();
+ // SAFETY: copy_rows is called with ranges derived from the source
array.
self.current
- .extend_from_slice(&s.values()[offset..offset + len]);
+ .extend_from_slice(unsafe { values.get_unchecked(offset..offset +
len) });
Ok(())
}
+ fn copy_rows_by_filter(&mut self, filter: &FilterPredicate) -> Result<(),
ArrowError> {
+ match filter.selection() {
+ FilterSelection::Indices(indices) => {
+ self.ensure_capacity();
+ let s = primitive_source::<T>(&self.source)?;
+
+ append_filtered_nulls(&mut self.nulls, s.nulls(), filter);
+ self.current.reserve(filter.count());
+ Self::append_values_by_indices(
+ &mut self.current,
+ s.values(),
+ indices,
+ filter.count(),
+ );
+ Ok(())
+ }
+ FilterSelection::Slices(slices) => {
+ self.ensure_capacity();
+ let s = primitive_source::<T>(&self.source)?;
+
+ append_filtered_nulls(&mut self.nulls, s.nulls(), filter);
+ self.current.reserve(filter.count());
+ Self::append_values_by_slices(
+ &mut self.current,
+ s.values(),
+ slices,
+ filter.count(),
+ );
+ Ok(())
+ }
+ // Other selection shapes reuse the generic copy_rows path.
+ selection => self.copy_rows_by_selection(selection),
+ }
+ }
+
fn finish(&mut self) -> Result<ArrayRef, ArrowError> {
// take and reset the current values and nulls
let values = std::mem::take(&mut self.current);
@@ -106,3 +227,82 @@ impl<T: ArrowPrimitiveType + Debug> InProgressArray for InProgressPrimitiveArray
Ok(Arc::new(array))
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::filter::FilterBuilder;
+ use arrow_array::types::Int32Type;
+ use arrow_array::{BooleanArray, Int32Array};
+
+ #[test]
+ fn test_copy_rows_by_filter_index_iterator() {
+ let source =
+ Int32Array::from_iter((0..21).map(|idx| if idx % 5 == 0 { None } else { Some(idx) }));
+ let filter = BooleanArray::from_iter(
+ (0..21).map(|idx| Some(matches!(idx, 0 | 1 | 2 | 3 | 5 | 8 | 13))),
+ );
+ let predicate = FilterBuilder::new(&filter).build();
+ let FilterSelection::Indices(indices) = predicate.selection() else {
+ panic!("expected index iterator selection");
+ };
+ let mut selected_indices = Vec::new();
+ indices.for_each(|idx| selected_indices.push(idx));
+ assert_eq!(selected_indices, vec![0, 1, 2, 3, 5, 8, 13]);
+
+ let mut in_progress = InProgressPrimitiveArray::<Int32Type>::new(7, DataType::Int32);
+ in_progress.set_source(Some(Arc::new(source)));
+ in_progress.copy_rows_by_filter(&predicate).unwrap();
+
+ let result = in_progress.finish().unwrap();
+ let result = result.as_primitive::<Int32Type>();
+ let expected = Int32Array::from(vec![
+ None,
+ Some(1),
+ Some(2),
+ Some(3),
+ None,
+ Some(8),
+ Some(13),
+ ]);
+ assert_eq!(result, &expected);
+ }
+
+ #[test]
+ fn test_copy_rows_by_filter_slice_iterator() {
+ let source =
+ Int32Array::from_iter((0..16).map(|idx| if idx % 5 == 0 { None } else { Some(idx) }));
+ let filter = BooleanArray::from_iter((0..16).map(|idx| Some(!matches!(idx, 3 | 9))));
+ let predicate = FilterBuilder::new(&filter).build();
+ let FilterSelection::Slices(slices) = predicate.selection() else {
+ panic!("expected slice iterator selection");
+ };
+ let mut selected_slices = Vec::new();
+ slices.for_each(|slice| selected_slices.push(slice));
+ assert_eq!(selected_slices, vec![(0, 3), (4, 9), (10, 16)]);
+
+ let mut in_progress = InProgressPrimitiveArray::<Int32Type>::new(14, DataType::Int32);
+ in_progress.set_source(Some(Arc::new(source)));
+ in_progress.copy_rows_by_filter(&predicate).unwrap();
+
+ let result = in_progress.finish().unwrap();
+ let result = result.as_primitive::<Int32Type>();
+ let expected = Int32Array::from(vec![
+ None,
+ Some(1),
+ Some(2),
+ Some(4),
+ None,
+ Some(6),
+ Some(7),
+ Some(8),
+ None,
+ Some(11),
+ Some(12),
+ Some(13),
+ Some(14),
+ None,
+ ]);
+ assert_eq!(result, &expected);
+ }
+}
diff --git a/arrow-select/src/filter.rs b/arrow-select/src/filter.rs
index fcbce82d5d..b1f3a21a3e 100644
--- a/arrow-select/src/filter.rs
+++ b/arrow-select/src/filter.rs
@@ -81,13 +81,13 @@ impl Iterator for SlicesIterator<'_> {
///
/// This provides the best performance on most predicates, apart from those which keep
/// large runs and therefore favour [`SlicesIterator`]
-struct IndexIterator<'a> {
+pub(crate) struct IndexIterator<'a> {
remaining: usize,
iter: BitIndexIterator<'a>,
}
impl<'a> IndexIterator<'a> {
- fn new(filter: &'a BooleanArray, remaining: usize) -> Self {
+ pub(crate) fn new(filter: &'a BooleanArray, remaining: usize) -> Self {
assert_eq!(filter.null_count(), 0);
let iter = filter.values().set_indices();
Self { remaining, iter }
@@ -137,11 +137,6 @@ impl Iterator for IndexIterator<'_> {
}
}
-/// Counts the number of set bits in `filter`
-fn filter_count(filter: &BooleanArray) -> usize {
- filter.values().count_set_bits()
-}
-
/// Convert all null values in `BooleanArray` to `false`
///
/// This is useful for filter-like operations which select only `true`
@@ -259,12 +254,15 @@ pub struct FilterBuilder {
impl FilterBuilder {
/// Create a new [`FilterBuilder`] that can be used to construct a [`FilterPredicate`]
pub fn new(filter: &BooleanArray) -> Self {
+ Self::new_with_count(filter, filter.true_count())
+ }
+
+ pub(crate) fn new_with_count(filter: &BooleanArray, count: usize) -> Self {
let filter = match filter.null_count() {
0 => filter.clone(),
_ => prep_null_mask_filter(filter),
};
- let count = filter_count(&filter);
let strategy = IterationStrategy::default_strategy(filter.len(), count);
Self {
@@ -366,6 +364,66 @@ impl IterationStrategy {
}
}
+/// Borrowed description of which rows a [`FilterPredicate`] selects.
+pub(crate) enum FilterSelection<'a> {
+ None,
+ All { len: usize },
+ Slices(FilterSlices<'a>),
+ Indices(FilterIndices<'a>),
+}
+
+pub(crate) type FilterSlices<'a> =
+ FilterIterator<std::iter::Copied<std::slice::Iter<'a, (usize, usize)>>, SlicesIterator<'a>>;
+
+pub(crate) type FilterIndices<'a> =
+ FilterIterator<std::iter::Copied<std::slice::Iter<'a, usize>>, IndexIterator<'a>>;
+
+/// Holds either materialized rows or a lazy iterator.
+///
+/// This does not implement [`Iterator`] on purpose. Callers use
+/// [`Self::for_each`] or [`Self::try_for_each`] so the enum is matched once
+/// before the loop, not once per row in `next`.
+pub(crate) enum FilterIterator<M, I> {
+ Materialized(M),
+ Lazy(I),
+}
+
+impl<M, I> FilterIterator<M, I>
+where
+ M: Iterator,
+ I: Iterator<Item = M::Item>,
+{
+ pub(crate) fn for_each<F>(self, f: F)
+ where
+ F: FnMut(M::Item),
+ {
+ match self {
+ Self::Materialized(iter) => iter.for_each(f),
+ Self::Lazy(iter) => iter.for_each(f),
+ }
+ }
+
+ pub(crate) fn try_for_each<F, E>(self, mut f: F) -> Result<(), E>
+ where
+ F: FnMut(M::Item) -> Result<(), E>,
+ {
+ match self {
+ Self::Materialized(iter) => {
+ for item in iter {
+ f(item)?;
+ }
+ }
+ Self::Lazy(iter) => {
+ for item in iter {
+ f(item)?;
+ }
+ }
+ }
+
+ Ok(())
+ }
+}
+
/// A filtering predicate that can be applied to an [`Array`]
#[derive(Debug)]
pub struct FilterPredicate {
@@ -410,6 +468,25 @@ impl FilterPredicate {
self.count
}
+ pub(crate) fn selection(&self) -> FilterSelection<'_> {
+ match &self.strategy {
+ IterationStrategy::None => FilterSelection::None,
+ IterationStrategy::All => FilterSelection::All { len: self.count },
+ IterationStrategy::Slices(slices) => {
+ FilterSelection::Slices(FilterIterator::Materialized(slices.iter().copied()))
+ }
+ IterationStrategy::SlicesIterator => {
+ FilterSelection::Slices(FilterIterator::Lazy(SlicesIterator::new(&self.filter)))
+ }
+ IterationStrategy::Indices(indices) => {
+ FilterSelection::Indices(FilterIterator::Materialized(indices.iter().copied()))
+ }
+ IterationStrategy::IndexIterator => FilterSelection::Indices(FilterIterator::Lazy(
+ IndexIterator::new(&self.filter, self.count),
+ )),
+ }
+ }
+
/// Filters the given `nulls` buffer using this predicate.
///
/// Returns `None` when there is nothing to track in the output, either
@@ -575,7 +652,7 @@ where
/// `Some((null_count, null_buffer))` where `null_count` is the number of nulls
/// in the filtered output, and `null_buffer` is the filtered null buffer
///
-fn filter_null_mask(
+pub(crate) fn filter_null_mask(
nulls: Option<&NullBuffer>,
predicate: &FilterPredicate,
) -> Option<(usize, Buffer)> {
@@ -649,7 +726,10 @@ fn filter_boolean(array: &BooleanArray, predicate: &FilterPredicate) -> BooleanA
}
#[inline(never)]
-fn filter_native<T: ArrowNativeType>(values: &[T], predicate: &FilterPredicate) -> Buffer {
+pub(crate) fn filter_native<T: ArrowNativeType>(
+ values: &[T],
+ predicate: &FilterPredicate,
+) -> Buffer {
assert!(values.len() >= predicate.filter.len());
match &predicate.strategy {
@@ -1571,7 +1651,7 @@ mod tests {
fn test_slice_iterator_bits() {
let filter_values = (0..64).map(|i| i == 1).collect::<Vec<bool>>();
let filter = BooleanArray::from(filter_values);
- let filter_count = filter_count(&filter);
+ let filter_count = filter.true_count();
let iter = SlicesIterator::new(&filter);
let chunks = iter.collect::<Vec<_>>();
@@ -1584,7 +1664,7 @@ mod tests {
fn test_slice_iterator_bits1() {
let filter_values = (0..64).map(|i| i != 1).collect::<Vec<bool>>();
let filter = BooleanArray::from(filter_values);
- let filter_count = filter_count(&filter);
+ let filter_count = filter.true_count();
let iter = SlicesIterator::new(&filter);
let chunks = iter.collect::<Vec<_>>();
@@ -1597,7 +1677,7 @@ mod tests {
fn test_slice_iterator_chunk_and_bits() {
let filter_values = (0..130).map(|i| i % 62 != 0).collect::<Vec<bool>>();
let filter = BooleanArray::from(filter_values);
- let filter_count = filter_count(&filter);
+ let filter_count = filter.true_count();
let iter = SlicesIterator::new(&filter);
let chunks = iter.collect::<Vec<_>>();
@@ -1606,6 +1686,43 @@ mod tests {
assert_eq!(filter_count, 61 + 61 + 5);
}
+ #[test]
+ fn test_filter_selection_iterators() {
+ let slices = [(0, 2), (4, 5)];
+ let mut ranges = Vec::new();
+ let selection: FilterSlices<'_> = FilterIterator::Materialized(slices.iter().copied());
+ selection.for_each(|range| ranges.push(range));
+ assert_eq!(ranges, slices);
+
+ let filter = BooleanArray::from(vec![true, true, false, false, true]);
+ let mut ranges = Vec::new();
+ let selection: FilterSlices<'_> = FilterIterator::Lazy(SlicesIterator::new(&filter));
+ selection
+ .try_for_each(|range| {
+ ranges.push(range);
+ Ok::<(), ArrowError>(())
+ })
+ .unwrap();
+ assert_eq!(ranges, vec![(0, 2), (4, 5)]);
+
+ let indices = [1, 3, 5];
+ let mut selected = Vec::new();
+ let selection: FilterIndices<'_> = FilterIterator::Materialized(indices.iter().copied());
+ selection.for_each(|idx| selected.push(idx));
+ assert_eq!(selected, indices);
+
+ let filter = BooleanArray::from(vec![false, true, false, true]);
+ let mut selected = Vec::new();
+ let selection: FilterIndices<'_> = FilterIterator::Lazy(IndexIterator::new(&filter, 2));
+ selection
+ .try_for_each(|idx| {
+ selected.push(idx);
+ Ok::<(), ArrowError>(())
+ })
+ .unwrap();
+ assert_eq!(selected, vec![1, 3]);
+ }
+
#[test]
fn test_null_mask() {
let a = Int64Array::from(vec![Some(1), Some(2), None]);
@@ -1690,7 +1807,7 @@ mod tests {
.flat_map(|(start, end)| start..end)
.collect();
- let count = filter_count(&filter);
+ let count = filter.true_count();
let index_bits: Vec<_> = IndexIterator::new(&filter, count).collect();
let expected_bits: Vec<_> = bools