This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new e106c15340 arrow-select: Optimize `BatchCoalescer::push_batch_with_filter` for low selectivity filters and inlined Utf8View/BinaryView (#9755)
e106c15340 is described below

commit e106c1534095cbfe1201a770c8ccca5252fdbb88
Author: ClSlaid <[email protected]>
AuthorDate: Thu Jun 11 02:35:33 2026 +0800

    arrow-select: Optimize `BatchCoalescer::push_batch_with_filter` for low selectivity filters and inlined Utf8View/BinaryView (#9755)
    
    ## Summary
    - fuse the filter and coalescing steps for sparse (low-selectivity)
    filters, so that primitive columns and inline `Utf8View`/`BinaryView`
    values are appended directly to the in-progress arrays without
    materialising an intermediate filtered `RecordBatch`
    - reuse optimised filter indices and null-mask handling for coalescing,
    while preserving the existing fallback paths for dense and non-inline
    `BinaryView` inputs
    - add focused tests and benchmarks for single-column and mixed
    `BinaryView` filter cases related to `#9143`
    
    ## Verification
    - `cargo test -p arrow-select coalesce --lib`
    - `cargo clippy -p arrow-select --lib --tests -- -D warnings`
    - `cargo clippy -p arrow --bench coalesce_kernels --features test_utils -- -D warnings`
    - `cargo bench -p arrow --bench coalesce_kernels --features test_utils -- --noplot single_binaryview`
    - `cargo bench -p arrow --bench coalesce_kernels --features test_utils -- --noplot mixed_binaryview`
    
    ## Benchmark Results
    Measured against a clean `origin/main` worktree with the same
    `BinaryView` benchmark additions. The figures below compare
    representative median times from the baseline worktree and this branch.
    
    ### Mixed primitive + BinaryView
    - `mixed_binaryview (max_string_len=8), 8192, nulls: 0, selectivity:
    0.001`: `23.16 ms` -> `8.51 ms`
    - `mixed_binaryview (max_string_len=8), 8192, nulls: 0, selectivity:
    0.01`: `2.37 ms` -> `1.31 ms`
    - `mixed_binaryview (max_string_len=8), 8192, nulls: 0.1, selectivity:
    0.001`: `31.70 ms` -> `14.33 ms`
    - `mixed_binaryview (max_string_len=8), 8192, nulls: 0.1, selectivity:
    0.01`: `3.92 ms` -> `2.44 ms`
    
    ### Single BinaryView
    - `single_binaryview, 8192, nulls: 0, selectivity: 0.01`: `4.86 ms` ->
    `4.90 ms` (roughly flat, slightly slower)
    - `single_binaryview (max_string_len=8), 8192, nulls: 0, selectivity:
    0.001`: `34.72 ms` -> `19.33 ms`
    - `single_binaryview (max_string_len=8), 8192, nulls: 0, selectivity:
    0.01`: `3.46 ms` -> `2.03 ms`
    - `single_binaryview (max_string_len=8), 8192, nulls: 0.1, selectivity:
    0.01`: `5.93 ms` -> `3.97 ms`
    - `single_binaryview (max_string_len=8), 8192, nulls: 0, selectivity:
    0.8`: `597 µs` -> `619 µs` (regression)
    - `single_binaryview (max_string_len=8), 8192, nulls: 0.1, selectivity:
    0.8`: `1.78 ms` -> `1.79 ms` (roughly flat, slightly slower)
    
    In short, this change substantially improves the mixed primitive +
    inline `BinaryView` path that motivated `#9143`, while the single-column
    `BinaryView` benchmarks still show trade-offs: sparse inline cases
    improve, but dense inline cases are slightly slower and the non-inline
    single-column path is effectively unchanged.
    
    Closes #9143.
    
    ---------
    
    Signed-off-by: cl <[email protected]>
    Co-authored-by: Andrew Lamb <[email protected]>
---
 arrow-select/src/coalesce.rs           | 363 +++++++++++++++++++++++++++++++--
 arrow-select/src/coalesce/byte_view.rs | 167 ++++++++++++++-
 arrow-select/src/coalesce/generic.rs   |  11 +
 arrow-select/src/coalesce/primitive.rs | 222 +++++++++++++++++++-
 arrow-select/src/filter.rs             | 145 +++++++++++--
 5 files changed, 866 insertions(+), 42 deletions(-)

diff --git a/arrow-select/src/coalesce.rs b/arrow-select/src/coalesce.rs
index 8fe88fb8c3..0d5199941f 100644
--- a/arrow-select/src/coalesce.rs
+++ b/arrow-select/src/coalesce.rs
@@ -20,7 +20,7 @@
 //!
 //! [`filter`]: crate::filter::filter
 //! [`take`]: crate::take::take
-use crate::filter::filter_record_batch;
+use crate::filter::{FilterBuilder, FilterPredicate, FilterSelection};
 use crate::take::take_record_batch;
 use arrow_array::types::{BinaryViewType, StringViewType};
 use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch, 
downcast_primitive};
@@ -38,6 +38,21 @@ use byte_view::InProgressByteViewArray;
 use generic::GenericInProgressArray;
 use primitive::InProgressPrimitiveArray;
 
+fn has_sparse_filter_copy(data_type: &DataType) -> bool {
+    data_type.is_primitive() || matches!(data_type, DataType::Utf8View | 
DataType::BinaryView)
+}
+
+/// Maximum selected row fraction for the fused sparse-filter copy path.
+///
+/// Shared benchmark results show this path helps low-selectivity filters, but
+/// can regress once the filter becomes denser. Keep this as a cheap integer
+/// threshold on the hot path: `selected_count <= filter_len / 16`.
+const SPARSE_FILTER_COPY_MAX_SELECTIVITY_DENOMINATOR: usize = 16;
+
+fn should_use_sparse_filter_copy(filter_len: usize, selected_count: usize) -> 
bool {
+    selected_count <= filter_len / 
SPARSE_FILTER_COPY_MAX_SELECTIVITY_DENOMINATOR
+}
+
 /// Concatenate multiple [`RecordBatch`]es
 ///
 /// Implements the common pattern of incrementally creating output
@@ -139,6 +154,8 @@ pub struct BatchCoalescer {
     target_batch_size: usize,
     /// In-progress arrays
     in_progress_arrays: Vec<Box<dyn InProgressArray>>,
+    /// True if some column still needs the materialized filter path.
+    has_non_specialized_filter_columns: bool,
     /// Buffered row count. Always less than `batch_size`
     buffered_rows: usize,
     /// Completed batches
@@ -156,6 +173,10 @@ impl BatchCoalescer {
     ///   Typical values are `4096` or `8192` rows.
     ///
     pub fn new(schema: SchemaRef, target_batch_size: usize) -> Self {
+        let has_non_specialized_filter_columns = schema
+            .fields()
+            .iter()
+            .any(|field| !has_sparse_filter_copy(field.data_type()));
         let in_progress_arrays = schema
             .fields()
             .iter()
@@ -166,6 +187,7 @@ impl BatchCoalescer {
             schema,
             target_batch_size,
             in_progress_arrays,
+            has_non_specialized_filter_columns,
             // We will for sure store at least one completed batch
             completed: VecDeque::with_capacity(1),
             buffered_rows: 0,
@@ -212,7 +234,7 @@ impl BatchCoalescer {
     /// Push a batch into the Coalescer after applying a filter
     ///
     /// This is semantically equivalent of calling [`Self::push_batch`]
-    /// with the results from  [`filter_record_batch`]
+    /// with the results from [`crate::filter::filter_record_batch`]
     ///
     /// # Example
     /// ```
@@ -238,10 +260,7 @@ impl BatchCoalescer {
         batch: RecordBatch,
         filter: &BooleanArray,
     ) -> Result<(), ArrowError> {
-        // TODO: optimize this to avoid materializing (copying the results
-        // of filter to a new batch)
-        let filtered_batch = filter_record_batch(&batch, filter)?;
-        self.push_batch(filtered_batch)
+        self.push_batch_with_filtered_columns(batch, filter)
     }
 
     /// Push a batch into the Coalescer after applying a set of indices
@@ -566,6 +585,87 @@ impl BatchCoalescer {
     }
 }
 
+impl BatchCoalescer {
+    fn filter_predicate_for_batch(
+        batch: &RecordBatch,
+        filter: &BooleanArray,
+        selected_count: usize,
+    ) -> FilterPredicate {
+        let mut filter_builder = FilterBuilder::new_with_count(filter, 
selected_count);
+        if batch.num_columns() > 1
+            || (batch.num_columns() > 0
+                && 
FilterBuilder::is_optimize_beneficial(batch.schema_ref().field(0).data_type()))
+        {
+            filter_builder = filter_builder.optimize();
+        }
+        filter_builder.build()
+    }
+
+    fn push_batch_with_filtered_columns(
+        &mut self,
+        batch: RecordBatch,
+        filter: &BooleanArray,
+    ) -> Result<(), ArrowError> {
+        let filter_len = filter.len();
+        let batch_num_rows = batch.num_rows();
+        let batch_num_columns = batch.num_columns();
+
+        if filter_len > batch_num_rows {
+            return Err(ArrowError::InvalidArgumentError(format!(
+                "Filter predicate of length {} is larger than target array of 
length {}",
+                filter_len, batch_num_rows
+            )));
+        }
+
+        let selected_count = filter.true_count();
+        if selected_count == 0 {
+            return Ok(());
+        }
+
+        if selected_count == batch_num_rows && filter_len == batch_num_rows {
+            return self.push_batch(batch);
+        }
+
+        if batch_num_columns != self.in_progress_arrays.len() {
+            return Err(ArrowError::InvalidArgumentError(format!(
+                "Batch has {} columns but BatchCoalescer expects {}",
+                batch_num_columns,
+                self.in_progress_arrays.len()
+            )));
+        }
+
+        let exceeds_coalesce_limit = self
+            .biggest_coalesce_batch_size
+            .is_some_and(|limit| selected_count > limit);
+        let does_not_fit_buffer = selected_count > self.target_batch_size - 
self.buffered_rows;
+        let should_materialize_filter = exceeds_coalesce_limit
+            || self.has_non_specialized_filter_columns
+            || does_not_fit_buffer
+            || !should_use_sparse_filter_copy(filter_len, selected_count);
+
+        if should_materialize_filter {
+            // Use materialized filtering when sparse per-column copying is 
unavailable.
+            let predicate = Self::filter_predicate_for_batch(&batch, filter, 
selected_count);
+            let filtered_batch = predicate.filter_record_batch(&batch)?;
+            return self.push_batch(filtered_batch);
+        }
+
+        let predicate = Self::filter_predicate_for_batch(&batch, filter, 
selected_count);
+        let (_schema, arrays, _num_rows) = batch.into_parts();
+
+        for (in_progress, array) in 
self.in_progress_arrays.iter_mut().zip(arrays) {
+            in_progress.copy_rows_by_filter_from(array, &predicate)?;
+        }
+
+        self.buffered_rows += selected_count;
+        if self.buffered_rows >= self.target_batch_size {
+            self.finish_buffered_batch()?;
+        }
+
+        Ok(())
+    }
+}
+
 /// Return a new `InProgressArray` for the given data type
 fn create_in_progress_array(data_type: &DataType, batch_size: usize) -> 
Box<dyn InProgressArray> {
     macro_rules! instantiate_primitive {
@@ -611,6 +711,35 @@ trait InProgressArray: std::fmt::Debug + Send + Sync {
     /// Return an error if the source array is not set
     fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), 
ArrowError>;
 
+    /// Copy rows selected by `filter` from the current source array.
+    fn copy_rows_by_filter(&mut self, filter: &FilterPredicate) -> Result<(), 
ArrowError> {
+        self.copy_rows_by_selection(filter.selection())
+    }
+
+    /// Copy rows selected by `filter` from `source`.
+    fn copy_rows_by_filter_from(
+        &mut self,
+        source: ArrayRef,
+        filter: &FilterPredicate,
+    ) -> Result<(), ArrowError> {
+        self.set_source(Some(source));
+        let result = self.copy_rows_by_filter(filter);
+        self.set_source(None);
+        result
+    }
+
+    /// Copy rows described by a [`FilterSelection`] from the current source 
array.
+    fn copy_rows_by_selection(&mut self, selection: FilterSelection<'_>) -> 
Result<(), ArrowError> {
+        match selection {
+            FilterSelection::None => Ok(()),
+            FilterSelection::All { len } => self.copy_rows(0, len),
+            FilterSelection::Slices(slices) => {
+                slices.try_for_each(|(start, end)| self.copy_rows(start, end - 
start))
+            }
+            FilterSelection::Indices(indices) => indices.try_for_each(|idx| 
self.copy_rows(idx, 1)),
+        }
+    }
+
     /// Finish the currently in-progress array and return it as an `ArrayRef`
     fn finish(&mut self) -> Result<ArrayRef, ArrowError>;
 }
@@ -619,6 +748,7 @@ trait InProgressArray: std::fmt::Debug + Send + Sync {
 mod tests {
     use super::*;
     use crate::concat::concat_batches;
+    use crate::filter::filter_record_batch;
     use arrow_array::builder::StringViewBuilder;
     use arrow_array::cast::AsArray;
     use arrow_array::types::Int32Type;
@@ -665,6 +795,14 @@ mod tests {
             .run();
     }
 
+    #[test]
+    fn test_sparse_filter_copy_threshold() {
+        assert!(should_use_sparse_filter_copy(8192, 8));
+        assert!(should_use_sparse_filter_copy(8192, 81));
+        assert!(!should_use_sparse_filter_copy(8192, 819));
+        assert!(!should_use_sparse_filter_copy(8192, 6553));
+    }
+
     #[test]
     fn test_single_large_batch_greater_than_target() {
         // test a single large batch
@@ -1197,6 +1335,195 @@ mod tests {
             .run();
     }
 
+    #[test]
+    fn test_binary_view_filtered() {
+        let values: Vec<Option<&[u8]>> = vec![
+            Some(b"foo"),
+            None,
+            Some(b"A longer string that is more than 12 bytes"),
+        ];
+
+        let binary_view =
+            
BinaryViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000));
+        let batch =
+            RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as 
ArrayRef)]).unwrap();
+        let filter = sparse_filter(1000);
+
+        Test::new("coalesce_binary_view_filtered")
+            .with_batch(batch.clone())
+            .with_filter(filter.clone())
+            .with_batch(batch)
+            .with_filter(filter)
+            .with_batch_size(256)
+            .with_expected_output_sizes(vec![250])
+            .run();
+    }
+
+    #[test]
+    fn test_binary_view_filtered_inline() {
+        let values: Vec<Option<&[u8]>> = vec![Some(b"foo"), None, 
Some(b"barbaz")];
+
+        let binary_view =
+            
BinaryViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000));
+        let batch =
+            RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as 
ArrayRef)]).unwrap();
+        let filter = sparse_filter(1000);
+
+        Test::new("coalesce_binary_view_filtered_inline")
+            .with_batch(batch.clone())
+            .with_filter(filter.clone())
+            .with_batch(batch)
+            .with_filter(filter)
+            .with_batch_size(300)
+            .with_expected_output_sizes(vec![250])
+            .run();
+    }
+
+    #[test]
+    fn test_string_view_filtered_inline() {
+        let values: Vec<Option<&str>> = vec![Some("foo"), None, 
Some("barbaz")];
+
+        let string_view =
+            
StringViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000));
+        let batch =
+            RecordBatch::try_from_iter(vec![("c0", Arc::new(string_view) as 
ArrayRef)]).unwrap();
+        let filter = sparse_filter(1000);
+
+        Test::new("coalesce_string_view_filtered_inline")
+            .with_batch(batch.clone())
+            .with_filter(filter.clone())
+            .with_batch(batch)
+            .with_filter(filter)
+            .with_batch_size(300)
+            .with_expected_output_sizes(vec![250])
+            .run();
+    }
+
+    #[test]
+    fn test_mixed_inline_binary_view_filtered() {
+        let int_values =
+            Int32Array::from_iter((0..1000).map(|v| if v % 5 == 0 { None } 
else { Some(v) }));
+        let float_values = 
arrow_array::Float64Array::from_iter((0..1000).map(|v| Some(v as f64)));
+        let binary_values: Vec<Option<&[u8]>> = vec![Some(b"foo"), None, 
Some(b"barbaz")];
+        let binary_view = BinaryViewArray::from_iter(
+            std::iter::repeat(binary_values.iter()).flatten().take(1000),
+        );
+
+        let batch = RecordBatch::try_from_iter(vec![
+            ("i", Arc::new(int_values) as ArrayRef),
+            ("f", Arc::new(float_values) as ArrayRef),
+            ("b", Arc::new(binary_view) as ArrayRef),
+        ])
+        .unwrap();
+
+        let filter = sparse_filter(1000);
+
+        Test::new("coalesce_mixed_inline_binary_view_filtered")
+            .with_batch(batch.clone())
+            .with_filter(filter.clone())
+            .with_batch(batch)
+            .with_filter(filter)
+            .with_batch_size(300)
+            .with_expected_output_sizes(vec![250])
+            .run();
+    }
+
+    #[test]
+    fn test_mixed_inline_string_view_filtered() {
+        let int_values =
+            Int32Array::from_iter((0..1000).map(|v| if v % 5 == 0 { None } 
else { Some(v) }));
+        let float_values = 
arrow_array::Float64Array::from_iter((0..1000).map(|v| Some(v as f64)));
+        let string_values: Vec<Option<&str>> = vec![Some("foo"), None, 
Some("barbaz")];
+        let string_view = StringViewArray::from_iter(
+            std::iter::repeat(string_values.iter()).flatten().take(1000),
+        );
+
+        let batch = RecordBatch::try_from_iter(vec![
+            ("i", Arc::new(int_values) as ArrayRef),
+            ("f", Arc::new(float_values) as ArrayRef),
+            ("s", Arc::new(string_view) as ArrayRef),
+        ])
+        .unwrap();
+
+        let filter = sparse_filter(1000);
+
+        Test::new("coalesce_mixed_inline_string_view_filtered")
+            .with_batch(batch.clone())
+            .with_filter(filter.clone())
+            .with_batch(batch)
+            .with_filter(filter)
+            .with_batch_size(300)
+            .with_expected_output_sizes(vec![250])
+            .run();
+    }
+
+    #[test]
+    fn test_mixed_boolean_inline_string_view_filtered() {
+        let bool_values = BooleanArray::from_iter((0..1000).map(|v| Some(v % 3 
== 0)));
+        let string_values: Vec<Option<&str>> = vec![Some("foo"), None, 
Some("barbaz")];
+        let string_view = StringViewArray::from_iter(
+            std::iter::repeat(string_values.iter()).flatten().take(1000),
+        );
+
+        let batch = RecordBatch::try_from_iter(vec![
+            ("b", Arc::new(bool_values) as ArrayRef),
+            ("s", Arc::new(string_view) as ArrayRef),
+        ])
+        .unwrap();
+
+        let filter = sparse_filter(1000);
+
+        Test::new("coalesce_mixed_boolean_inline_string_view_filtered")
+            .with_batch(batch.clone())
+            .with_filter(filter.clone())
+            .with_batch(batch)
+            .with_filter(filter)
+            .with_batch_size(300)
+            .with_expected_output_sizes(vec![250])
+            .run();
+    }
+
+    #[test]
+    fn test_inline_filter_rejects_filter_longer_than_batch() {
+        let values: Vec<Option<&[u8]>> = vec![Some(b"foo"), Some(b"bar")];
+        let binary_view = BinaryViewArray::from_iter(values);
+        let batch =
+            RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as 
ArrayRef)]).unwrap();
+        let filter = BooleanArray::from(vec![true, false, true]);
+
+        let mut coalescer = BatchCoalescer::new(batch.schema(), 100);
+        let result = coalescer.push_batch_with_filter(batch, &filter);
+        assert!(result.is_err());
+        let err = result.unwrap_err().to_string();
+        assert!(
+            err.contains("Filter predicate of length 3 is larger than target 
array of length 2"),
+            "unexpected error: {err}"
+        );
+    }
+
+    #[test]
+    fn test_filter_fast_path_schema_capability() {
+        let supported = Arc::new(Schema::new(vec![
+            Field::new("primitive", DataType::UInt32, false),
+            Field::new("utf8_view", DataType::Utf8View, true),
+            Field::new("binary_view", DataType::BinaryView, true),
+        ]));
+        let coalescer = BatchCoalescer::new(supported, 100);
+        assert!(!coalescer.has_non_specialized_filter_columns);
+
+        let utf8 = Arc::new(Schema::new(vec![Field::new("utf8", 
DataType::Utf8, true)]));
+        let coalescer = BatchCoalescer::new(utf8, 100);
+        assert!(coalescer.has_non_specialized_filter_columns);
+
+        let boolean = Arc::new(Schema::new(vec![Field::new(
+            "boolean",
+            DataType::Boolean,
+            true,
+        )]));
+        let coalescer = BatchCoalescer::new(boolean, 100);
+        assert!(coalescer.has_non_specialized_filter_columns);
+    }
+
     #[derive(Debug, Clone, PartialEq)]
     struct ExpectedLayout {
         len: usize,
@@ -1685,6 +2012,10 @@ mod tests {
         }
     }
 
+    fn sparse_filter(len: usize) -> BooleanArray {
+        BooleanArray::from_iter((0..len).map(|idx| Some(idx % 8 == 0)))
+    }
+
     /// Returns the named column as a StringViewArray
     fn col_as_string_view<'b>(name: &str, batch: &'b RecordBatch) -> &'b 
StringViewArray {
         batch
@@ -1701,18 +2032,20 @@ mod tests {
         let (schema, mut columns, row_count) = batch.into_parts();
 
         for column in columns.iter_mut() {
-            let Some(string_view) = column.as_string_view_opt() else {
+            if let Some(string_view) = column.as_string_view_opt() {
+                // Re-create the StringViewArray to ensure memory layout is
+                // consistent
+                let mut builder = StringViewBuilder::new();
+                for s in string_view.iter() {
+                    builder.append_option(s);
+                }
+                *column = Arc::new(builder.finish());
                 continue;
-            };
+            }
 
-            // Re-create the StringViewArray to ensure memory layout is
-            // consistent
-            let mut builder = StringViewBuilder::new();
-            for s in string_view.iter() {
-                builder.append_option(s);
+            if let Some(binary_view) = column.as_binary_view_opt() {
+                *column = 
Arc::new(BinaryViewArray::from_iter(binary_view.iter()));
             }
-            // Update the column with the new StringViewArray
-            *column = Arc::new(builder.finish());
         }
 
         let options = 
RecordBatchOptions::new().with_row_count(Some(row_count));
diff --git a/arrow-select/src/coalesce/byte_view.rs 
b/arrow-select/src/coalesce/byte_view.rs
index 6062cd5e77..32f92d044e 100644
--- a/arrow-select/src/coalesce/byte_view.rs
+++ b/arrow-select/src/coalesce/byte_view.rs
@@ -16,10 +16,11 @@
 // under the License.
 
 use crate::coalesce::InProgressArray;
+use crate::filter::{FilterPredicate, FilterSelection, filter_null_mask};
 use arrow_array::cast::AsArray;
 use arrow_array::types::ByteViewType;
 use arrow_array::{Array, ArrayRef, GenericByteViewArray};
-use arrow_buffer::{Buffer, NullBufferBuilder};
+use arrow_buffer::{BooleanBuffer, Buffer, NullBuffer, NullBufferBuilder};
 use arrow_data::{ByteView, MAX_INLINE_VIEW_LEN};
 use arrow_schema::ArrowError;
 use std::marker::PhantomData;
@@ -111,6 +112,61 @@ impl<B: ByteViewType> InProgressByteViewArray<B> {
         self.completed.push(next_buffer.into());
     }
 
+    fn append_views_by_filter(&mut self, views: &[u128], filter: 
&FilterPredicate) {
+        let selected_count = filter.count();
+        let current_len = self.views.len();
+        self.views.reserve(selected_count);
+
+        let mut written = 0;
+
+        unsafe {
+            let mut out = 
self.views.spare_capacity_mut().as_mut_ptr().cast::<u128>();
+
+            match filter.selection() {
+                FilterSelection::None => {}
+                FilterSelection::All { .. } => {
+                    std::ptr::copy_nonoverlapping(views.as_ptr(), out, 
selected_count);
+                    written = selected_count;
+                }
+                FilterSelection::Slices(slices) => {
+                    slices.for_each(|(start, end)| {
+                        let len = end - start;
+                        
std::ptr::copy_nonoverlapping(views.as_ptr().add(start), out, len);
+                        out = out.add(len);
+                        written += len;
+                    });
+                }
+                FilterSelection::Indices(indices) => {
+                    indices.for_each(|idx| {
+                        out.write(*views.get_unchecked(idx));
+                        out = out.add(1);
+                        written += 1;
+                    });
+                }
+            }
+
+            self.views.set_len(current_len + written);
+        }
+
+        debug_assert_eq!(written, selected_count);
+    }
+
+    fn append_nulls_by_filter(
+        &mut self,
+        filter: &FilterPredicate,
+        source_nulls: Option<&NullBuffer>,
+    ) {
+        let Some((null_count, nulls)) = filter_null_mask(source_nulls, filter) 
else {
+            self.nulls.append_n_non_nulls(filter.count());
+            return;
+        };
+
+        let nulls = unsafe {
+            NullBuffer::new_unchecked(BooleanBuffer::new(nulls, 0, 
filter.count()), null_count)
+        };
+        self.nulls.append_buffer(&nulls);
+    }
+
     /// Append views to self.views, updating the buffer index if necessary
     #[inline(never)]
     fn append_views_and_update_buffer_index(&mut self, views: &[u128], 
buffers: &[Buffer]) {
@@ -325,7 +381,8 @@ impl<B: ByteViewType> InProgressArray for 
InProgressByteViewArray<B> {
         };
 
         let buffers = s.data_buffers();
-        let views = &s.views().as_ref()[offset..offset + len];
+        // SAFETY: copy_rows is called with ranges derived from the source 
array.
+        let views = unsafe { s.views().as_ref().get_unchecked(offset..offset + 
len) };
 
         // If there are no data buffers in s (all inlined views), can append 
the
         // views/nulls and done
@@ -346,6 +403,59 @@ impl<B: ByteViewType> InProgressArray for 
InProgressByteViewArray<B> {
         Ok(())
     }
 
+    fn copy_rows_by_filter(&mut self, filter: &FilterPredicate) -> Result<(), 
ArrowError> {
+        self.ensure_capacity();
+        let source = self.source.take().ok_or_else(|| {
+            ArrowError::InvalidArgumentError(
+                "Internal Error: InProgressByteViewArray: source not 
set".to_string(),
+            )
+        })?;
+
+        let s = source.array.as_byte_view::<B>();
+
+        if !s.data_buffers().is_empty() {
+            // Restore the source taken above before returning the guard error.
+            self.source = Some(source);
+            return Err(ArrowError::InvalidArgumentError(
+                "Internal Error: InProgressByteViewArray::copy_rows_by_filter 
requires inline views"
+                    .to_string(),
+            ));
+        }
+
+        self.append_nulls_by_filter(filter, s.nulls());
+        self.append_views_by_filter(s.views(), filter);
+
+        self.source = Some(source);
+        Ok(())
+    }
+
+    fn copy_rows_by_filter_from(
+        &mut self,
+        source: ArrayRef,
+        filter: &FilterPredicate,
+    ) -> Result<(), ArrowError> {
+        let s = source.as_byte_view::<B>();
+        if s.data_buffers().is_empty() {
+            self.ensure_capacity();
+            self.append_nulls_by_filter(filter, s.nulls());
+            self.append_views_by_filter(s.views(), filter);
+            return Ok(());
+        }
+
+        // Match the filter kernel: filter views/nulls, but reuse data buffers.
+        let filtered = filter.filter(source.as_ref())?;
+        let filtered = filtered.as_byte_view::<B>();
+
+        self.ensure_capacity();
+        if let Some(nulls) = filtered.nulls().as_ref() {
+            self.nulls.append_buffer(nulls);
+        } else {
+            self.nulls.append_n_non_nulls(filter.count());
+        }
+        self.append_views_and_update_buffer_index(filtered.views(), 
filtered.data_buffers());
+        Ok(())
+    }
+
     fn finish(&mut self) -> Result<ArrayRef, ArrowError> {
         self.finish_current();
         assert!(self.current.is_none());
@@ -405,6 +515,9 @@ impl BufferSource {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::filter::FilterBuilder;
+    use arrow_array::types::BinaryViewType;
+    use arrow_array::{BinaryViewArray, BooleanArray};
 
     #[test]
     fn test_buffer_source() {
@@ -444,4 +557,54 @@ mod tests {
         // Can override with larger size request
         assert_eq!(source.next_buffer(2_000_000).capacity(), 2_000_000);
     }
+
+    #[test]
+    fn test_copy_rows_by_filter_rejects_non_inline_views() {
+        let values: Vec<Option<&[u8]>> = vec![Some(b"This value is longer than 
12 bytes")];
+        let array = BinaryViewArray::from_iter(values);
+        assert!(!array.data_buffers().is_empty());
+
+        let mut in_progress = 
InProgressByteViewArray::<BinaryViewType>::new(1);
+        in_progress.set_source(Some(Arc::new(array)));
+
+        let filter = BooleanArray::from(vec![true]);
+        let predicate = FilterBuilder::new(&filter).build();
+        let err = in_progress.copy_rows_by_filter(&predicate).unwrap_err();
+
+        assert!(
+            err.to_string().contains("requires inline views"),
+            "unexpected error: {err}"
+        );
+    }
+
+    #[test]
+    fn test_copy_rows_by_filter_from_reuses_non_inline_buffers() {
+        let values = (0..32)
+            .map(|i| format!("This value is longer than 12 bytes: 
{i}").into_bytes())
+            .collect::<Vec<_>>();
+        let array = BinaryViewArray::from_iter(values.iter().map(|v| 
Some(v.as_slice())));
+        assert!(!array.data_buffers().is_empty());
+        let source_buffer = array.data_buffers()[0].as_ptr();
+
+        let filter = BooleanArray::from((0..32).map(|i| i == 3 || i == 
29).collect::<Vec<_>>());
+        let predicate = FilterBuilder::new(&filter).build();
+
+        let mut in_progress = 
InProgressByteViewArray::<BinaryViewType>::new(32);
+        in_progress
+            .copy_rows_by_filter_from(Arc::new(array), &predicate)
+            .unwrap();
+        let output = in_progress.finish().unwrap();
+        let output = output.as_binary_view();
+
+        assert_eq!(output.len(), 2);
+        assert_eq!(output.value(0), values[3].as_slice());
+        assert_eq!(output.value(1), values[29].as_slice());
+        assert!(
+            output
+                .data_buffers()
+                .iter()
+                .any(|buffer| std::ptr::addr_eq(buffer.as_ptr(), 
source_buffer)),
+            "expected filtered output to reuse the source data buffer"
+        );
+    }
 }
diff --git a/arrow-select/src/coalesce/generic.rs 
b/arrow-select/src/coalesce/generic.rs
index 1ea57dff92..4fa64273ec 100644
--- a/arrow-select/src/coalesce/generic.rs
+++ b/arrow-select/src/coalesce/generic.rs
@@ -17,6 +17,7 @@
 
 use super::InProgressArray;
 use crate::concat::concat;
+use crate::filter::FilterPredicate;
 use arrow_array::ArrayRef;
 use arrow_schema::ArrowError;
 
@@ -60,6 +61,16 @@ impl InProgressArray for GenericInProgressArray {
         Ok(())
     }
 
+    fn copy_rows_by_filter_from(
+        &mut self,
+        source: ArrayRef,
+        filter: &FilterPredicate,
+    ) -> Result<(), ArrowError> {
+        let array = filter.filter(source.as_ref())?;
+        self.buffered_arrays.push(array);
+        Ok(())
+    }
+
     fn finish(&mut self) -> Result<ArrayRef, ArrowError> {
         // Concatenate all buffered arrays into a single array, which uses 2x
         // peak memory
diff --git a/arrow-select/src/coalesce/primitive.rs 
b/arrow-select/src/coalesce/primitive.rs
index a7f2fb32ce..ac831fe089 100644
--- a/arrow-select/src/coalesce/primitive.rs
+++ b/arrow-select/src/coalesce/primitive.rs
@@ -16,9 +16,12 @@
 // under the License.
 
 use crate::coalesce::InProgressArray;
+use crate::filter::{
+    FilterIndices, FilterPredicate, FilterSelection, FilterSlices, 
filter_null_mask,
+};
 use arrow_array::cast::AsArray;
 use arrow_array::{Array, ArrayRef, ArrowPrimitiveType, PrimitiveArray};
-use arrow_buffer::{NullBufferBuilder, ScalarBuffer};
+use arrow_buffer::{BooleanBuffer, NullBuffer, NullBufferBuilder, ScalarBuffer};
 use arrow_schema::{ArrowError, DataType};
 use std::fmt::Debug;
 use std::sync::Arc;
@@ -59,6 +62,95 @@ impl<T: ArrowPrimitiveType> InProgressPrimitiveArray<T> {
             self.current.reserve(self.batch_size);
         }
     }
+
+    /// Appends `values[idx]` to `current` for every index yielded by `indices`.
+    ///
+    /// `selected_count` is the number of indices the caller expects; it is only
+    /// verified by a `debug_assert_eq!` after the copy has happened.
+    ///
+    /// NOTE(review): this is a safe `fn`, but the `unsafe` block below relies on
+    /// the caller having reserved spare capacity in `current` for every yielded
+    /// index and on every index being in bounds of `values`. The
+    /// `copy_rows_by_filter` caller reserves `filter.count()` first; confirm
+    /// that any other caller does the same.
+    fn append_values_by_indices(
+        current: &mut Vec<T::Native>,
+        values: &[T::Native],
+        indices: FilterIndices<'_>,
+        selected_count: usize,
+    ) {
+        let current_len = current.len();
+        let mut written = 0;
+
+        unsafe {
+            // Write cursor positioned at the first uninitialized slot of `current`.
+            let mut out = current
+                .spare_capacity_mut()
+                .as_mut_ptr()
+                .cast::<T::Native>();
+
+            indices.for_each(|idx| {
+                // SAFETY: indices are derived from the filter predicate for
+                // this source.
+                out.write(*values.get_unchecked(idx));
+                out = out.add(1);
+                written += 1;
+            });
+
+            // Make the freshly written elements part of the vector.
+            current.set_len(current_len + written);
+        }
+
+        debug_assert_eq!(written, selected_count);
+    }
+
+    /// Appends `values[start..end]` to `current` for every `(start, end)` pair
+    /// yielded by `slices`.
+    ///
+    /// `selected_count` is the total number of values the caller expects; it is
+    /// only verified by a `debug_assert_eq!` after the copy has happened.
+    ///
+    /// NOTE(review): this is a safe `fn`, but the `unsafe` block below relies on
+    /// the caller having reserved spare capacity in `current` for every copied
+    /// value and on every range lying within `values`. The
+    /// `copy_rows_by_filter` caller reserves `filter.count()` first; confirm
+    /// that any other caller does the same.
+    fn append_values_by_slices(
+        current: &mut Vec<T::Native>,
+        values: &[T::Native],
+        slices: FilterSlices<'_>,
+        selected_count: usize,
+    ) {
+        let current_len = current.len();
+        let mut written = 0;
+
+        unsafe {
+            // Write cursor positioned at the first uninitialized slot of `current`.
+            let mut out = current
+                .spare_capacity_mut()
+                .as_mut_ptr()
+                .cast::<T::Native>();
+
+            slices.for_each(|(start, end)| {
+                let len = end - start;
+                // SAFETY: slices are derived from the filter predicate for
+                // this source.
+                std::ptr::copy_nonoverlapping(values.as_ptr().add(start), out, len);
+                out = out.add(len);
+                written += len;
+            });
+
+            // Make the freshly written elements part of the vector.
+            current.set_len(current_len + written);
+        }
+
+        debug_assert_eq!(written, selected_count);
+    }
+}
+
+/// Borrows the configured source array as a [`PrimitiveArray<T>`].
+///
+/// # Errors
+///
+/// Returns [`ArrowError::InvalidArgumentError`] when `source` is `None`.
+#[inline]
+fn primitive_source<T: ArrowPrimitiveType>(
+    source: &Option<ArrayRef>,
+) -> Result<&PrimitiveArray<T>, ArrowError> {
+    let Some(array) = source else {
+        return Err(ArrowError::InvalidArgumentError(
+            "Internal Error: InProgressPrimitiveArray: source not set".to_string(),
+        ));
+    };
+
+    Ok(array.as_primitive::<T>())
+}
+
+/// Appends the validity of the rows selected by `filter` to `nulls`.
+///
+/// When [`filter_null_mask`] yields a filtered mask, that mask is appended;
+/// otherwise `filter.count()` non-null entries are appended.
+fn append_filtered_nulls(
+    nulls: &mut NullBufferBuilder,
+    source_nulls: Option<&NullBuffer>,
+    filter: &FilterPredicate,
+) {
+    let selected = filter.count();
+
+    match filter_null_mask(source_nulls, filter) {
+        Some((null_count, mask)) => {
+            let mask = BooleanBuffer::new(mask, 0, selected);
+            // SAFETY: `filter_null_mask` returns the null count together with
+            // the filtered mask it was computed for.
+            let filtered = unsafe { NullBuffer::new_unchecked(mask, null_count) };
+            nulls.append_buffer(&filtered);
+        }
+        None => nulls.append_n_non_nulls(selected),
+    }
 }
 
 impl<T: ArrowPrimitiveType + Debug> InProgressArray for 
InProgressPrimitiveArray<T> {
@@ -69,15 +161,7 @@ impl<T: ArrowPrimitiveType + Debug> InProgressArray for 
InProgressPrimitiveArray
     fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), 
ArrowError> {
         self.ensure_capacity();
 
-        let s = self
-            .source
-            .as_ref()
-            .ok_or_else(|| {
-                ArrowError::InvalidArgumentError(
-                    "Internal Error: InProgressPrimitiveArray: source not 
set".to_string(),
-                )
-            })?
-            .as_primitive::<T>();
+        let s = primitive_source::<T>(&self.source)?;
 
         // add nulls if necessary
         if let Some(nulls) = s.nulls().as_ref() {
@@ -88,12 +172,49 @@ impl<T: ArrowPrimitiveType + Debug> InProgressArray for 
InProgressPrimitiveArray
         };
 
         // Copy the values
+        let values = s.values();
+        // SAFETY: copy_rows is called with ranges derived from the source 
array.
         self.current
-            .extend_from_slice(&s.values()[offset..offset + len]);
+            .extend_from_slice(unsafe { values.get_unchecked(offset..offset + 
len) });
 
         Ok(())
     }
 
+    /// Appends the rows of the current source that `filter` selects.
+    ///
+    /// Index and slice selections are copied directly into the in-progress
+    /// value and null buffers; every other selection is forwarded to
+    /// `copy_rows_by_selection`.
+    fn copy_rows_by_filter(&mut self, filter: &FilterPredicate) -> Result<(), ArrowError> {
+        let selected = filter.count();
+
+        match filter.selection() {
+            FilterSelection::Slices(slices) => {
+                self.ensure_capacity();
+                let source = primitive_source::<T>(&self.source)?;
+
+                // Validity first, then values; both grow by `selected` rows.
+                append_filtered_nulls(&mut self.nulls, source.nulls(), filter);
+                self.current.reserve(selected);
+                Self::append_values_by_slices(
+                    &mut self.current,
+                    source.values(),
+                    slices,
+                    selected,
+                );
+            }
+            FilterSelection::Indices(indices) => {
+                self.ensure_capacity();
+                let source = primitive_source::<T>(&self.source)?;
+
+                // Validity first, then values; both grow by `selected` rows.
+                append_filtered_nulls(&mut self.nulls, source.nulls(), filter);
+                self.current.reserve(selected);
+                Self::append_values_by_indices(
+                    &mut self.current,
+                    source.values(),
+                    indices,
+                    selected,
+                );
+            }
+            // Other selection shapes reuse the generic copy_rows path.
+            other => return self.copy_rows_by_selection(other),
+        }
+
+        Ok(())
+    }
+
     fn finish(&mut self) -> Result<ArrayRef, ArrowError> {
         // take and reset the current values and nulls
         let values = std::mem::take(&mut self.current);
@@ -106,3 +227,82 @@ impl<T: ArrowPrimitiveType + Debug> InProgressArray for 
InProgressPrimitiveArray
         Ok(Arc::new(array))
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::filter::FilterBuilder;
+    use arrow_array::types::Int32Type;
+    use arrow_array::{BooleanArray, Int32Array};
+
+    /// Builds an `Int32Array` holding `0..len` where every multiple of five is null.
+    fn source_with_nulls(len: i32) -> Int32Array {
+        Int32Array::from_iter((0..len).map(|v| if v % 5 == 0 { None } else { Some(v) }))
+    }
+
+    /// Pushes `source` through a fresh in-progress array using `predicate`.
+    fn filter_in_progress(
+        source: Int32Array,
+        predicate: &FilterPredicate,
+        batch_size: usize,
+    ) -> ArrayRef {
+        let mut in_progress =
+            InProgressPrimitiveArray::<Int32Type>::new(batch_size, DataType::Int32);
+        in_progress.set_source(Some(Arc::new(source)));
+        in_progress.copy_rows_by_filter(predicate).unwrap();
+        in_progress.finish().unwrap()
+    }
+
+    #[test]
+    fn test_copy_rows_by_filter_index_iterator() {
+        let kept_rows = [0, 1, 2, 3, 5, 8, 13];
+        let filter =
+            BooleanArray::from_iter((0..21).map(|row| Some(kept_rows.contains(&row))));
+        let predicate = FilterBuilder::new(&filter).build();
+
+        // This filter is expected to be served by the index-based selection.
+        let mut selected_indices = Vec::new();
+        match predicate.selection() {
+            FilterSelection::Indices(indices) => {
+                indices.for_each(|idx| selected_indices.push(idx))
+            }
+            _ => panic!("expected index iterator selection"),
+        }
+        assert_eq!(selected_indices, kept_rows);
+
+        let result = filter_in_progress(source_with_nulls(21), &predicate, 7);
+        let expected = Int32Array::from(vec![
+            None,
+            Some(1),
+            Some(2),
+            Some(3),
+            None,
+            Some(8),
+            Some(13),
+        ]);
+        assert_eq!(result.as_primitive::<Int32Type>(), &expected);
+    }
+
+    #[test]
+    fn test_copy_rows_by_filter_slice_iterator() {
+        let dropped_rows = [3, 9];
+        let filter =
+            BooleanArray::from_iter((0..16).map(|row| Some(!dropped_rows.contains(&row))));
+        let predicate = FilterBuilder::new(&filter).build();
+
+        // This filter is expected to be served by the slice-based selection.
+        let mut selected_slices = Vec::new();
+        match predicate.selection() {
+            FilterSelection::Slices(slices) => {
+                slices.for_each(|slice| selected_slices.push(slice))
+            }
+            _ => panic!("expected slice iterator selection"),
+        }
+        assert_eq!(selected_slices, vec![(0, 3), (4, 9), (10, 16)]);
+
+        let result = filter_in_progress(source_with_nulls(16), &predicate, 14);
+        let expected = Int32Array::from(vec![
+            None,
+            Some(1),
+            Some(2),
+            Some(4),
+            None,
+            Some(6),
+            Some(7),
+            Some(8),
+            None,
+            Some(11),
+            Some(12),
+            Some(13),
+            Some(14),
+            None,
+        ]);
+        assert_eq!(result.as_primitive::<Int32Type>(), &expected);
+    }
+}
diff --git a/arrow-select/src/filter.rs b/arrow-select/src/filter.rs
index fcbce82d5d..b1f3a21a3e 100644
--- a/arrow-select/src/filter.rs
+++ b/arrow-select/src/filter.rs
@@ -81,13 +81,13 @@ impl Iterator for SlicesIterator<'_> {
 ///
 /// This provides the best performance on most predicates, apart from those 
which keep
 /// large runs and therefore favour [`SlicesIterator`]
-struct IndexIterator<'a> {
+pub(crate) struct IndexIterator<'a> {
     remaining: usize,
     iter: BitIndexIterator<'a>,
 }
 
 impl<'a> IndexIterator<'a> {
-    fn new(filter: &'a BooleanArray, remaining: usize) -> Self {
+    pub(crate) fn new(filter: &'a BooleanArray, remaining: usize) -> Self {
         assert_eq!(filter.null_count(), 0);
         let iter = filter.values().set_indices();
         Self { remaining, iter }
@@ -137,11 +137,6 @@ impl Iterator for IndexIterator<'_> {
     }
 }
 
-/// Counts the number of set bits in `filter`
-fn filter_count(filter: &BooleanArray) -> usize {
-    filter.values().count_set_bits()
-}
-
 /// Convert all null values in `BooleanArray` to `false`
 ///
 /// This is useful for filter-like operations which select only `true`
@@ -259,12 +254,15 @@ pub struct FilterBuilder {
 impl FilterBuilder {
     /// Create a new [`FilterBuilder`] that can be used to construct a 
[`FilterPredicate`]
     pub fn new(filter: &BooleanArray) -> Self {
+        Self::new_with_count(filter, filter.true_count())
+    }
+
+    pub(crate) fn new_with_count(filter: &BooleanArray, count: usize) -> Self {
         let filter = match filter.null_count() {
             0 => filter.clone(),
             _ => prep_null_mask_filter(filter),
         };
 
-        let count = filter_count(&filter);
         let strategy = IterationStrategy::default_strategy(filter.len(), 
count);
 
         Self {
@@ -366,6 +364,66 @@ impl IterationStrategy {
     }
 }
 
+/// Borrowed description of which rows a [`FilterPredicate`] selects.
+///
+/// Built by `FilterPredicate::selection`, which maps each
+/// `IterationStrategy` onto one of these variants.
+pub(crate) enum FilterSelection<'a> {
+    /// Produced for `IterationStrategy::None`.
+    None,
+    /// Produced for `IterationStrategy::All`; `len` is the predicate's count.
+    All { len: usize },
+    /// `(start, end)` ranges, either pre-computed or produced lazily by a
+    /// [`SlicesIterator`].
+    Slices(FilterSlices<'a>),
+    /// Individual row indices, either pre-computed or produced lazily by an
+    /// [`IndexIterator`].
+    Indices(FilterIndices<'a>),
+}
+
+/// Slice selection: `(usize, usize)` pairs copied out of a borrowed slice, or
+/// produced on the fly by a [`SlicesIterator`].
+pub(crate) type FilterSlices<'a> =
+    FilterIterator<std::iter::Copied<std::slice::Iter<'a, (usize, usize)>>, SlicesIterator<'a>>;
+
+/// Index selection: `usize` indices copied out of a borrowed slice, or
+/// produced on the fly by an [`IndexIterator`].
+pub(crate) type FilterIndices<'a> =
+    FilterIterator<std::iter::Copied<std::slice::Iter<'a, usize>>, IndexIterator<'a>>;
+
+/// Either an already materialized sequence of rows (`M`) or a lazily
+/// evaluated one (`I`).
+///
+/// [`Iterator`] is intentionally not implemented: going through
+/// [`Self::for_each`] or [`Self::try_for_each`] picks the variant once, up
+/// front, instead of on every call to `next`.
+pub(crate) enum FilterIterator<M, I> {
+    Materialized(M),
+    Lazy(I),
+}
+
+impl<M, I> FilterIterator<M, I>
+where
+    M: Iterator,
+    I: Iterator<Item = M::Item>,
+{
+    /// Calls `f` on every item of whichever iterator is held.
+    pub(crate) fn for_each<F>(self, mut f: F)
+    where
+        F: FnMut(M::Item),
+    {
+        match self {
+            Self::Materialized(rows) => {
+                for row in rows {
+                    f(row);
+                }
+            }
+            Self::Lazy(rows) => {
+                for row in rows {
+                    f(row);
+                }
+            }
+        }
+    }
+
+    /// Calls `f` on every item of whichever iterator is held, stopping at and
+    /// returning the first error.
+    pub(crate) fn try_for_each<F, E>(self, f: F) -> Result<(), E>
+    where
+        F: FnMut(M::Item) -> Result<(), E>,
+    {
+        match self {
+            Self::Materialized(mut rows) => rows.try_for_each(f),
+            Self::Lazy(mut rows) => rows.try_for_each(f),
+        }
+    }
+}
+
 /// A filtering predicate that can be applied to an [`Array`]
 #[derive(Debug)]
 pub struct FilterPredicate {
@@ -410,6 +468,25 @@ impl FilterPredicate {
         self.count
     }
 
+    /// Returns a borrowed [`FilterSelection`] mirroring this predicate's
+    /// iteration strategy.
+    ///
+    /// Pre-computed slices and indices are iterated from the strategy itself;
+    /// the iterator strategies build a fresh [`SlicesIterator`] or
+    /// [`IndexIterator`] over the filter.
+    pub(crate) fn selection(&self) -> FilterSelection<'_> {
+        use FilterIterator::{Lazy, Materialized};
+
+        match &self.strategy {
+            IterationStrategy::None => FilterSelection::None,
+            IterationStrategy::All => FilterSelection::All { len: self.count },
+            IterationStrategy::Slices(ranges) => {
+                FilterSelection::Slices(Materialized(ranges.iter().copied()))
+            }
+            IterationStrategy::SlicesIterator => {
+                FilterSelection::Slices(Lazy(SlicesIterator::new(&self.filter)))
+            }
+            IterationStrategy::Indices(rows) => {
+                FilterSelection::Indices(Materialized(rows.iter().copied()))
+            }
+            IterationStrategy::IndexIterator => {
+                FilterSelection::Indices(Lazy(IndexIterator::new(&self.filter, self.count)))
+            }
+        }
+    }
+
     /// Filters the given `nulls` buffer using this predicate.
     ///
     /// Returns `None` when there is nothing to track in the output, either
@@ -575,7 +652,7 @@ where
 /// `Some((null_count, null_buffer))` where `null_count` is the number of nulls
 /// in the filtered output, and `null_buffer` is the filtered null buffer
 ///
-fn filter_null_mask(
+pub(crate) fn filter_null_mask(
     nulls: Option<&NullBuffer>,
     predicate: &FilterPredicate,
 ) -> Option<(usize, Buffer)> {
@@ -649,7 +726,10 @@ fn filter_boolean(array: &BooleanArray, predicate: 
&FilterPredicate) -> BooleanA
 }
 
 #[inline(never)]
-fn filter_native<T: ArrowNativeType>(values: &[T], predicate: 
&FilterPredicate) -> Buffer {
+pub(crate) fn filter_native<T: ArrowNativeType>(
+    values: &[T],
+    predicate: &FilterPredicate,
+) -> Buffer {
     assert!(values.len() >= predicate.filter.len());
 
     match &predicate.strategy {
@@ -1571,7 +1651,7 @@ mod tests {
     fn test_slice_iterator_bits() {
         let filter_values = (0..64).map(|i| i == 1).collect::<Vec<bool>>();
         let filter = BooleanArray::from(filter_values);
-        let filter_count = filter_count(&filter);
+        let filter_count = filter.true_count();
 
         let iter = SlicesIterator::new(&filter);
         let chunks = iter.collect::<Vec<_>>();
@@ -1584,7 +1664,7 @@ mod tests {
     fn test_slice_iterator_bits1() {
         let filter_values = (0..64).map(|i| i != 1).collect::<Vec<bool>>();
         let filter = BooleanArray::from(filter_values);
-        let filter_count = filter_count(&filter);
+        let filter_count = filter.true_count();
 
         let iter = SlicesIterator::new(&filter);
         let chunks = iter.collect::<Vec<_>>();
@@ -1597,7 +1677,7 @@ mod tests {
     fn test_slice_iterator_chunk_and_bits() {
         let filter_values = (0..130).map(|i| i % 62 != 
0).collect::<Vec<bool>>();
         let filter = BooleanArray::from(filter_values);
-        let filter_count = filter_count(&filter);
+        let filter_count = filter.true_count();
 
         let iter = SlicesIterator::new(&filter);
         let chunks = iter.collect::<Vec<_>>();
@@ -1606,6 +1686,43 @@ mod tests {
         assert_eq!(filter_count, 61 + 61 + 5);
     }
 
+    #[test]
+    fn test_filter_selection_iterators() {
+        // Materialized slices are replayed verbatim by `for_each`.
+        let expected_ranges = [(0, 2), (4, 5)];
+        let materialized: FilterSlices<'_> =
+            FilterIterator::Materialized(expected_ranges.iter().copied());
+        let mut seen_ranges = Vec::new();
+        materialized.for_each(|range| seen_ranges.push(range));
+        assert_eq!(seen_ranges, expected_ranges);
+
+        // Lazy slices come from a `SlicesIterator` and go through `try_for_each`.
+        let mask = BooleanArray::from(vec![true, true, false, false, true]);
+        let lazy: FilterSlices<'_> = FilterIterator::Lazy(SlicesIterator::new(&mask));
+        let mut seen_ranges = Vec::new();
+        let outcome: Result<(), ArrowError> = lazy.try_for_each(|range| {
+            seen_ranges.push(range);
+            Ok(())
+        });
+        outcome.unwrap();
+        assert_eq!(seen_ranges, vec![(0, 2), (4, 5)]);
+
+        // Materialized indices are replayed verbatim by `for_each`.
+        let expected_rows = [1, 3, 5];
+        let materialized: FilterIndices<'_> =
+            FilterIterator::Materialized(expected_rows.iter().copied());
+        let mut seen_rows = Vec::new();
+        materialized.for_each(|idx| seen_rows.push(idx));
+        assert_eq!(seen_rows, expected_rows);
+
+        // Lazy indices come from an `IndexIterator` and go through `try_for_each`.
+        let mask = BooleanArray::from(vec![false, true, false, true]);
+        let lazy: FilterIndices<'_> = FilterIterator::Lazy(IndexIterator::new(&mask, 2));
+        let mut seen_rows = Vec::new();
+        let outcome: Result<(), ArrowError> = lazy.try_for_each(|idx| {
+            seen_rows.push(idx);
+            Ok(())
+        });
+        outcome.unwrap();
+        assert_eq!(seen_rows, vec![1, 3]);
+    }
+
     #[test]
     fn test_null_mask() {
         let a = Int64Array::from(vec![Some(1), Some(2), None]);
@@ -1690,7 +1807,7 @@ mod tests {
             .flat_map(|(start, end)| start..end)
             .collect();
 
-        let count = filter_count(&filter);
+        let count = filter.true_count();
         let index_bits: Vec<_> = IndexIterator::new(&filter, count).collect();
 
         let expected_bits: Vec<_> = bools

Reply via email to