alamb commented on a change in pull request #1248:
URL: https://github.com/apache/arrow-rs/pull/1248#discussion_r798754727



##########
File path: arrow/benches/filter_kernels.rs
##########
@@ -42,12 +42,24 @@ fn add_benchmark(c: &mut Criterion) {
     let dense_filter_array = create_boolean_array(size, 0.0, 1.0 - 1.0 / 
1024.0);
     let sparse_filter_array = create_boolean_array(size, 0.0, 1.0 / 1024.0);
 
-    let filter = build_filter(&filter_array).unwrap();
-    let dense_filter = build_filter(&dense_filter_array).unwrap();
-    let sparse_filter = build_filter(&sparse_filter_array).unwrap();
+    let filter = FilterBuilder::new(&filter_array).optimize().build();
+    let dense_filter = 
FilterBuilder::new(&dense_filter_array).optimize().build();
+    let sparse_filter = 
FilterBuilder::new(&sparse_filter_array).optimize().build();
 
     let data_array = create_primitive_array::<UInt8Type>(size, 0.0);
 
+    c.bench_function("filter optimize", |b| {

Review comment:
       maybe we should name the default ones like 
   
   ```suggestion
       c.bench_function("filter optimize (1/2)", |b| {
   ```
   
   To make it clear?

##########
File path: arrow/benches/filter_kernels.rs
##########
@@ -42,12 +42,24 @@ fn add_benchmark(c: &mut Criterion) {
     let dense_filter_array = create_boolean_array(size, 0.0, 1.0 - 1.0 / 
1024.0);
     let sparse_filter_array = create_boolean_array(size, 0.0, 1.0 / 1024.0);
 
-    let filter = build_filter(&filter_array).unwrap();
-    let dense_filter = build_filter(&dense_filter_array).unwrap();
-    let sparse_filter = build_filter(&sparse_filter_array).unwrap();
+    let filter = FilterBuilder::new(&filter_array).optimize().build();
+    let dense_filter = 
FilterBuilder::new(&dense_filter_array).optimize().build();
+    let sparse_filter = 
FilterBuilder::new(&sparse_filter_array).optimize().build();
 
     let data_array = create_primitive_array::<UInt8Type>(size, 0.0);
 
+    c.bench_function("filter optimize", |b| {
+        b.iter(|| FilterBuilder::new(&filter_array).optimize().build())
+    });
+
+    c.bench_function("filter optimize high selectivity", |b| {
+        b.iter(|| FilterBuilder::new(&dense_filter_array).optimize().build())
+    });
+
+    c.bench_function("filter optimize low selectivity", |b| {

Review comment:
       ```suggestion
       c.bench_function("filter optimize low selectivity (1/1024)", |b| {
   ```

##########
File path: arrow/src/compute/kernels/filter.rs
##########
@@ -185,79 +279,559 @@ pub fn prep_null_mask_filter(filter: &BooleanArray) -> 
BooleanArray {
 /// # Ok(())
 /// # }
 /// ```
-pub fn filter(array: &dyn Array, predicate: &BooleanArray) -> Result<ArrayRef> 
{
-    if predicate.null_count() > 0 {
-        // this greatly simplifies subsequent filtering code
-        // now we only have a boolean mask to deal with
-        let predicate = prep_null_mask_filter(predicate);
-        return filter(array, &predicate);
+pub fn filter(values: &dyn Array, predicate: &BooleanArray) -> 
Result<ArrayRef> {
+    let predicate = FilterBuilder::new(predicate).build();
+    filter_array(values, &predicate)
+}
+
+/// Returns a new [RecordBatch] with arrays containing only values matching 
the filter.
+pub fn filter_record_batch(
+    record_batch: &RecordBatch,
+    predicate: &BooleanArray,
+) -> Result<RecordBatch> {
+    let filter = FilterBuilder::new(predicate).optimize().build();
+
+    let filtered_arrays = record_batch
+        .columns()
+        .iter()
+        .map(|a| filter_array(a, &filter))
+        .collect::<Result<Vec<_>>>()?;
+
+    RecordBatch::try_new(record_batch.schema(), filtered_arrays)
+}
+
+/// A builder to construct [`FilterPredicate`]
+#[derive(Debug)]
+pub struct FilterBuilder {
+    filter: BooleanArray,
+    count: usize,
+    iterator: FilterIterator,
+}
+
+impl FilterBuilder {
+    /// Create a new [`FilterBuilder`] that can be used to construct a 
[`FilterPredicate`]
+    pub fn new(filter: &BooleanArray) -> Self {
+        let filter = match filter.null_count() {
+            0 => BooleanArray::from(filter.data().clone()),
+            _ => prep_null_mask_filter(filter),
+        };
+
+        let count = filter_count(&filter);
+        let selectivity_frac = count as f64 / filter.len() as f64;
+        let iterator = if selectivity_frac > 0.8 {
+            FilterIterator::SlicesIterator
+        } else {
+            FilterIterator::IndexIterator
+        };
+
+        Self {
+            filter,
+            count,
+            iterator,
+        }
+    }
+
+    /// Compute an optimised representation of the provided `filter` mask that 
can be
+    /// applied to an array more quickly.
+    ///
+    /// Note: There is limited benefit to calling this to then filter a single 
array
+    /// Note: This will likely have a larger memory footprint than the 
original mask
+    pub fn optimize(mut self) -> Self {
+        match self.iterator {
+            FilterIterator::SlicesIterator => {
+                let slices = SlicesIterator::new(&self.filter).collect();
+                self.iterator = FilterIterator::Slices(slices)
+            }
+            FilterIterator::IndexIterator => {
+                let indices = IndexIterator::new(&self.filter, 
self.count).collect();
+                self.iterator = FilterIterator::Indices(indices)
+            }
+            _ => {}
+        }
+        self
+    }
+
+    /// Construct the final `FilterPredicate`
+    pub fn build(self) -> FilterPredicate {
+        FilterPredicate {
+            filter: self.filter,
+            count: self.count,
+            iterator: self.iterator,
+        }
     }
+}
 
-    let filter_count = filter_count(predicate);
+/// The internal iterator type of [`FilterPredicate`]
+#[derive(Debug)]
+enum FilterIterator {
+    // A lazily evaluated iterator of ranges
+    SlicesIterator,
+    // A lazily evaluated iterator of indices
+    IndexIterator,
+    // A precomputed list of indices
+    Indices(Vec<usize>),
+    // A precomputed array of ranges
+    Slices(Vec<(usize, usize)>),
+}
+
+/// A filtering predicate that can be applied to an [`Array`]
+#[derive(Debug)]
+pub struct FilterPredicate {
+    filter: BooleanArray,
+    count: usize,
+    iterator: FilterIterator,
+}
 
-    match filter_count {
+impl FilterPredicate {
+    /// Selects rows from `values` based on this [`FilterPredicate`]
+    pub fn filter(&self, values: &dyn Array) -> Result<ArrayRef> {
+        filter_array(values, self)
+    }
+}
+
+fn filter_array(values: &dyn Array, predicate: &FilterPredicate) -> 
Result<ArrayRef> {
+    if predicate.filter.len() > values.len() {
+        return Err(ArrowError::InvalidArgumentError(format!(
+            "Filter predicate of length {} is larger than target array of 
length {}",
+            predicate.filter.len(),
+            values.len()
+        )));
+    }
+
+    match predicate.count {
         0 => {
             // return empty
-            Ok(new_empty_array(array.data_type()))
+            Ok(new_empty_array(values.data_type()))
         }
-        len if len == array.len() => {
+        len if len == values.len() => {
             // return all
-            let data = array.data().clone();
+            let data = values.data().clone();
             Ok(make_array(data))
         }
-        _ => {
-            // actually filter
-            let mut mutable =
-                MutableArrayData::new(vec![array.data_ref()], false, 
filter_count);
+        // actually filter
+        _ => match values.data_type() {
+            DataType::Boolean => {
+                let values = 
values.as_any().downcast_ref::<BooleanArray>().unwrap();
+                Ok(Arc::new(filter_boolean(values, predicate)))
+            }
+            DataType::Int8 => {
+                downcast_filter!(Int8Type, values, predicate)
+            }
+            DataType::Int16 => {
+                downcast_filter!(Int16Type, values, predicate)
+            }
+            DataType::Int32 => {
+                downcast_filter!(Int32Type, values, predicate)
+            }
+            DataType::Int64 => {
+                downcast_filter!(Int64Type, values, predicate)
+            }
+            DataType::UInt8 => {
+                downcast_filter!(UInt8Type, values, predicate)
+            }
+            DataType::UInt16 => {
+                downcast_filter!(UInt16Type, values, predicate)
+            }
+            DataType::UInt32 => {
+                downcast_filter!(UInt32Type, values, predicate)
+            }
+            DataType::UInt64 => {
+                downcast_filter!(UInt64Type, values, predicate)
+            }
+            DataType::Float32 => {
+                downcast_filter!(Float32Type, values, predicate)
+            }
+            DataType::Float64 => {
+                downcast_filter!(Float64Type, values, predicate)
+            }
+            DataType::Date32 => {
+                downcast_filter!(Date32Type, values, predicate)
+            }
+            DataType::Date64 => {
+                downcast_filter!(Date64Type, values, predicate)
+            }
+            DataType::Time32(Second) => {
+                downcast_filter!(Time32SecondType, values, predicate)
+            }
+            DataType::Time32(Millisecond) => {
+                downcast_filter!(Time32MillisecondType, values, predicate)
+            }
+            DataType::Time64(Microsecond) => {
+                downcast_filter!(Time64MicrosecondType, values, predicate)
+            }
+            DataType::Time64(Nanosecond) => {
+                downcast_filter!(Time64NanosecondType, values, predicate)
+            }
+            DataType::Timestamp(Second, _) => {
+                downcast_filter!(TimestampSecondType, values, predicate)
+            }
+            DataType::Timestamp(Millisecond, _) => {
+                downcast_filter!(TimestampMillisecondType, values, predicate)
+            }
+            DataType::Timestamp(Microsecond, _) => {
+                downcast_filter!(TimestampMicrosecondType, values, predicate)
+            }
+            DataType::Timestamp(Nanosecond, _) => {
+                downcast_filter!(TimestampNanosecondType, values, predicate)
+            }
+            DataType::Interval(IntervalUnit::YearMonth) => {
+                downcast_filter!(IntervalYearMonthType, values, predicate)
+            }
+            DataType::Interval(IntervalUnit::DayTime) => {
+                downcast_filter!(IntervalDayTimeType, values, predicate)
+            }
+            DataType::Interval(IntervalUnit::MonthDayNano) => {
+                downcast_filter!(IntervalMonthDayNanoType, values, predicate)
+            }
+            DataType::Duration(TimeUnit::Second) => {
+                downcast_filter!(DurationSecondType, values, predicate)
+            }
+            DataType::Duration(TimeUnit::Millisecond) => {
+                downcast_filter!(DurationMillisecondType, values, predicate)
+            }
+            DataType::Duration(TimeUnit::Microsecond) => {
+                downcast_filter!(DurationMicrosecondType, values, predicate)
+            }
+            DataType::Duration(TimeUnit::Nanosecond) => {
+                downcast_filter!(DurationNanosecondType, values, predicate)
+            }
+            DataType::Utf8 => {
+                let values = values
+                    .as_any()
+                    .downcast_ref::<GenericStringArray<i32>>()
+                    .unwrap();
+                Ok(Arc::new(filter_string::<i32>(values, predicate)))
+            }
+            DataType::LargeUtf8 => {
+                let values = values
+                    .as_any()
+                    .downcast_ref::<GenericStringArray<i64>>()
+                    .unwrap();
+                Ok(Arc::new(filter_string::<i64>(values, predicate)))
+            }
+            DataType::Dictionary(key_type, _) => match key_type.as_ref() {
+                DataType::Int8 => downcast_dict_filter!(Int8Type, values, 
predicate),
+                DataType::Int16 => downcast_dict_filter!(Int16Type, values, 
predicate),
+                DataType::Int32 => downcast_dict_filter!(Int32Type, values, 
predicate),
+                DataType::Int64 => downcast_dict_filter!(Int64Type, values, 
predicate),
+                DataType::UInt8 => downcast_dict_filter!(UInt8Type, values, 
predicate),
+                DataType::UInt16 => downcast_dict_filter!(UInt16Type, values, 
predicate),
+                DataType::UInt32 => downcast_dict_filter!(UInt32Type, values, 
predicate),
+                DataType::UInt64 => downcast_dict_filter!(UInt64Type, values, 
predicate),
+                t => unimplemented!("Take not supported for dictionary key 
type {:?}", t),
+            },
+            _ => {
+                // fallback to using MutableArrayData
+                let mut mutable = MutableArrayData::new(
+                    vec![values.data_ref()],
+                    false,
+                    predicate.count,
+                );
+
+                let iter = SlicesIterator::new(&predicate.filter);
+                iter.for_each(|(start, end)| mutable.extend(0, start, end));
+
+                let data = mutable.freeze();
+                Ok(make_array(data))
+            }
+        },
+    }
+}
+
+/// Computes a new null mask for `data` based on `predicate`
+///
+/// Returns `None` if no nulls in the result
+fn filter_null_mask(
+    data: &ArrayData,
+    predicate: &FilterPredicate,
+) -> Option<(usize, Buffer)> {
+    if data.null_count() == 0 {
+        return None;
+    }
+
+    let nulls = filter_bits(data.null_buffer()?, data.offset(), predicate);
+    let null_count = predicate.count - nulls.count_set_bits();

Review comment:
       This seems to assume that the filter is always `false` for any null 
location (e.g. because `prep_null_mask_filter` has been called). Is that 
guaranteed? Perhaps we can add a `debug_assert_eq!`-style check to verify it.

##########
File path: arrow/benches/filter_kernels.rs
##########
@@ -42,12 +42,24 @@ fn add_benchmark(c: &mut Criterion) {
     let dense_filter_array = create_boolean_array(size, 0.0, 1.0 - 1.0 / 
1024.0);
     let sparse_filter_array = create_boolean_array(size, 0.0, 1.0 / 1024.0);
 
-    let filter = build_filter(&filter_array).unwrap();
-    let dense_filter = build_filter(&dense_filter_array).unwrap();
-    let sparse_filter = build_filter(&sparse_filter_array).unwrap();
+    let filter = FilterBuilder::new(&filter_array).optimize().build();
+    let dense_filter = 
FilterBuilder::new(&dense_filter_array).optimize().build();
+    let sparse_filter = 
FilterBuilder::new(&sparse_filter_array).optimize().build();
 
     let data_array = create_primitive_array::<UInt8Type>(size, 0.0);
 
+    c.bench_function("filter optimize", |b| {
+        b.iter(|| FilterBuilder::new(&filter_array).optimize().build())
+    });
+
+    c.bench_function("filter optimize high selectivity", |b| {
+        b.iter(|| FilterBuilder::new(&dense_filter_array).optimize().build())
+    });
+
+    c.bench_function("filter optimize low selectivity", |b| {

Review comment:
       same suggestions for the other benchmarks

##########
File path: arrow/src/compute/kernels/filter.rs
##########
@@ -119,17 +147,83 @@ impl<'a> Iterator for SlicesIterator<'a> {
     }
 }
 
+/// An iterator of `usize` whose index in [`BooleanArray`] is true
+///
+/// This provides the best performance on all but the most selective 
predicates, where the

Review comment:
       I think this should be "least selective", which (confusingly) means "high selectivity"

##########
File path: arrow/src/compute/kernels/filter.rs
##########
@@ -185,79 +279,559 @@ pub fn prep_null_mask_filter(filter: &BooleanArray) -> 
BooleanArray {
 /// # Ok(())
 /// # }
 /// ```
-pub fn filter(array: &dyn Array, predicate: &BooleanArray) -> Result<ArrayRef> 
{
-    if predicate.null_count() > 0 {
-        // this greatly simplifies subsequent filtering code
-        // now we only have a boolean mask to deal with
-        let predicate = prep_null_mask_filter(predicate);
-        return filter(array, &predicate);
+pub fn filter(values: &dyn Array, predicate: &BooleanArray) -> 
Result<ArrayRef> {
+    let predicate = FilterBuilder::new(predicate).build();
+    filter_array(values, &predicate)
+}
+
+/// Returns a new [RecordBatch] with arrays containing only values matching 
the filter.
+pub fn filter_record_batch(
+    record_batch: &RecordBatch,
+    predicate: &BooleanArray,
+) -> Result<RecordBatch> {
+    let filter = FilterBuilder::new(predicate).optimize().build();
+
+    let filtered_arrays = record_batch
+        .columns()
+        .iter()
+        .map(|a| filter_array(a, &filter))
+        .collect::<Result<Vec<_>>>()?;
+
+    RecordBatch::try_new(record_batch.schema(), filtered_arrays)
+}
+
+/// A builder to construct [`FilterPredicate`]
+#[derive(Debug)]
+pub struct FilterBuilder {
+    filter: BooleanArray,
+    count: usize,
+    iterator: FilterIterator,
+}
+
+impl FilterBuilder {
+    /// Create a new [`FilterBuilder`] that can be used to construct a 
[`FilterPredicate`]
+    pub fn new(filter: &BooleanArray) -> Self {
+        let filter = match filter.null_count() {
+            0 => BooleanArray::from(filter.data().clone()),
+            _ => prep_null_mask_filter(filter),
+        };
+
+        let count = filter_count(&filter);
+        let selectivity_frac = count as f64 / filter.len() as f64;
+        let iterator = if selectivity_frac > 0.8 {
+            FilterIterator::SlicesIterator
+        } else {
+            FilterIterator::IndexIterator
+        };
+
+        Self {
+            filter,
+            count,
+            iterator,
+        }
+    }
+
+    /// Compute an optimised representation of the provided `filter` mask that 
can be
+    /// applied to an array more quickly.
+    ///
+    /// Note: There is limited benefit to calling this to then filter a single 
array
+    /// Note: This will likely have a larger memory footprint than the 
original mask
+    pub fn optimize(mut self) -> Self {
+        match self.iterator {
+            FilterIterator::SlicesIterator => {
+                let slices = SlicesIterator::new(&self.filter).collect();
+                self.iterator = FilterIterator::Slices(slices)
+            }
+            FilterIterator::IndexIterator => {
+                let indices = IndexIterator::new(&self.filter, 
self.count).collect();
+                self.iterator = FilterIterator::Indices(indices)
+            }
+            _ => {}
+        }
+        self
+    }
+
+    /// Construct the final `FilterPredicate`
+    pub fn build(self) -> FilterPredicate {
+        FilterPredicate {
+            filter: self.filter,
+            count: self.count,
+            iterator: self.iterator,
+        }
     }
+}
 
-    let filter_count = filter_count(predicate);
+/// The internal iterator type of [`FilterPredicate`]
+#[derive(Debug)]
+enum FilterIterator {
+    // A lazily evaluated iterator of ranges
+    SlicesIterator,
+    // A lazily evaluated iterator of indices
+    IndexIterator,
+    // A precomputed list of indices
+    Indices(Vec<usize>),
+    // A precomputed array of ranges
+    Slices(Vec<(usize, usize)>),
+}
+
+/// A filtering predicate that can be applied to an [`Array`]
+#[derive(Debug)]
+pub struct FilterPredicate {
+    filter: BooleanArray,
+    count: usize,
+    iterator: FilterIterator,
+}
 
-    match filter_count {
+impl FilterPredicate {
+    /// Selects rows from `values` based on this [`FilterPredicate`]
+    pub fn filter(&self, values: &dyn Array) -> Result<ArrayRef> {
+        filter_array(values, self)
+    }
+}
+
+fn filter_array(values: &dyn Array, predicate: &FilterPredicate) -> 
Result<ArrayRef> {
+    if predicate.filter.len() > values.len() {
+        return Err(ArrowError::InvalidArgumentError(format!(
+            "Filter predicate of length {} is larger than target array of 
length {}",
+            predicate.filter.len(),
+            values.len()
+        )));
+    }
+
+    match predicate.count {
         0 => {
             // return empty
-            Ok(new_empty_array(array.data_type()))
+            Ok(new_empty_array(values.data_type()))
         }
-        len if len == array.len() => {
+        len if len == values.len() => {
             // return all
-            let data = array.data().clone();
+            let data = values.data().clone();
             Ok(make_array(data))
         }
-        _ => {
-            // actually filter
-            let mut mutable =
-                MutableArrayData::new(vec![array.data_ref()], false, 
filter_count);
+        // actually filter
+        _ => match values.data_type() {
+            DataType::Boolean => {
+                let values = 
values.as_any().downcast_ref::<BooleanArray>().unwrap();
+                Ok(Arc::new(filter_boolean(values, predicate)))
+            }
+            DataType::Int8 => {
+                downcast_filter!(Int8Type, values, predicate)
+            }
+            DataType::Int16 => {
+                downcast_filter!(Int16Type, values, predicate)
+            }
+            DataType::Int32 => {
+                downcast_filter!(Int32Type, values, predicate)
+            }
+            DataType::Int64 => {
+                downcast_filter!(Int64Type, values, predicate)
+            }
+            DataType::UInt8 => {
+                downcast_filter!(UInt8Type, values, predicate)
+            }
+            DataType::UInt16 => {
+                downcast_filter!(UInt16Type, values, predicate)
+            }
+            DataType::UInt32 => {
+                downcast_filter!(UInt32Type, values, predicate)
+            }
+            DataType::UInt64 => {
+                downcast_filter!(UInt64Type, values, predicate)
+            }
+            DataType::Float32 => {
+                downcast_filter!(Float32Type, values, predicate)
+            }
+            DataType::Float64 => {
+                downcast_filter!(Float64Type, values, predicate)
+            }
+            DataType::Date32 => {
+                downcast_filter!(Date32Type, values, predicate)
+            }
+            DataType::Date64 => {
+                downcast_filter!(Date64Type, values, predicate)
+            }
+            DataType::Time32(Second) => {
+                downcast_filter!(Time32SecondType, values, predicate)
+            }
+            DataType::Time32(Millisecond) => {
+                downcast_filter!(Time32MillisecondType, values, predicate)
+            }
+            DataType::Time64(Microsecond) => {
+                downcast_filter!(Time64MicrosecondType, values, predicate)
+            }
+            DataType::Time64(Nanosecond) => {
+                downcast_filter!(Time64NanosecondType, values, predicate)
+            }
+            DataType::Timestamp(Second, _) => {
+                downcast_filter!(TimestampSecondType, values, predicate)
+            }
+            DataType::Timestamp(Millisecond, _) => {
+                downcast_filter!(TimestampMillisecondType, values, predicate)
+            }
+            DataType::Timestamp(Microsecond, _) => {
+                downcast_filter!(TimestampMicrosecondType, values, predicate)
+            }
+            DataType::Timestamp(Nanosecond, _) => {
+                downcast_filter!(TimestampNanosecondType, values, predicate)
+            }
+            DataType::Interval(IntervalUnit::YearMonth) => {
+                downcast_filter!(IntervalYearMonthType, values, predicate)
+            }
+            DataType::Interval(IntervalUnit::DayTime) => {
+                downcast_filter!(IntervalDayTimeType, values, predicate)
+            }
+            DataType::Interval(IntervalUnit::MonthDayNano) => {
+                downcast_filter!(IntervalMonthDayNanoType, values, predicate)
+            }
+            DataType::Duration(TimeUnit::Second) => {
+                downcast_filter!(DurationSecondType, values, predicate)
+            }
+            DataType::Duration(TimeUnit::Millisecond) => {
+                downcast_filter!(DurationMillisecondType, values, predicate)
+            }
+            DataType::Duration(TimeUnit::Microsecond) => {
+                downcast_filter!(DurationMicrosecondType, values, predicate)
+            }
+            DataType::Duration(TimeUnit::Nanosecond) => {
+                downcast_filter!(DurationNanosecondType, values, predicate)
+            }
+            DataType::Utf8 => {
+                let values = values
+                    .as_any()
+                    .downcast_ref::<GenericStringArray<i32>>()
+                    .unwrap();
+                Ok(Arc::new(filter_string::<i32>(values, predicate)))
+            }
+            DataType::LargeUtf8 => {
+                let values = values
+                    .as_any()
+                    .downcast_ref::<GenericStringArray<i64>>()
+                    .unwrap();
+                Ok(Arc::new(filter_string::<i64>(values, predicate)))
+            }
+            DataType::Dictionary(key_type, _) => match key_type.as_ref() {
+                DataType::Int8 => downcast_dict_filter!(Int8Type, values, 
predicate),
+                DataType::Int16 => downcast_dict_filter!(Int16Type, values, 
predicate),
+                DataType::Int32 => downcast_dict_filter!(Int32Type, values, 
predicate),
+                DataType::Int64 => downcast_dict_filter!(Int64Type, values, 
predicate),
+                DataType::UInt8 => downcast_dict_filter!(UInt8Type, values, 
predicate),
+                DataType::UInt16 => downcast_dict_filter!(UInt16Type, values, 
predicate),
+                DataType::UInt32 => downcast_dict_filter!(UInt32Type, values, 
predicate),
+                DataType::UInt64 => downcast_dict_filter!(UInt64Type, values, 
predicate),
+                t => unimplemented!("Take not supported for dictionary key 
type {:?}", t),

Review comment:
       ```suggestion
                   t => unimplemented!("Filter not supported for dictionary key 
type {:?}", t),
   ```

##########
File path: arrow/src/compute/kernels/filter.rs
##########
@@ -185,79 +279,559 @@ pub fn prep_null_mask_filter(filter: &BooleanArray) -> 
BooleanArray {
 /// # Ok(())
 /// # }
 /// ```
-pub fn filter(array: &dyn Array, predicate: &BooleanArray) -> Result<ArrayRef> 
{
-    if predicate.null_count() > 0 {
-        // this greatly simplifies subsequent filtering code
-        // now we only have a boolean mask to deal with
-        let predicate = prep_null_mask_filter(predicate);
-        return filter(array, &predicate);
+pub fn filter(values: &dyn Array, predicate: &BooleanArray) -> 
Result<ArrayRef> {
+    let predicate = FilterBuilder::new(predicate).build();
+    filter_array(values, &predicate)
+}
+
+/// Returns a new [RecordBatch] with arrays containing only values matching 
the filter.

Review comment:
       ❤️ 

##########
File path: arrow/benches/filter_kernels.rs
##########
@@ -42,12 +42,24 @@ fn add_benchmark(c: &mut Criterion) {
     let dense_filter_array = create_boolean_array(size, 0.0, 1.0 - 1.0 / 
1024.0);
     let sparse_filter_array = create_boolean_array(size, 0.0, 1.0 / 1024.0);
 
-    let filter = build_filter(&filter_array).unwrap();
-    let dense_filter = build_filter(&dense_filter_array).unwrap();
-    let sparse_filter = build_filter(&sparse_filter_array).unwrap();
+    let filter = FilterBuilder::new(&filter_array).optimize().build();
+    let dense_filter = 
FilterBuilder::new(&dense_filter_array).optimize().build();
+    let sparse_filter = 
FilterBuilder::new(&sparse_filter_array).optimize().build();
 
     let data_array = create_primitive_array::<UInt8Type>(size, 0.0);
 
+    c.bench_function("filter optimize", |b| {
+        b.iter(|| FilterBuilder::new(&filter_array).optimize().build())
+    });
+
+    c.bench_function("filter optimize high selectivity", |b| {

Review comment:
       ```suggestion
       c.bench_function("filter optimize high selectivity (1023/1024)", |b| {
   ```

##########
File path: arrow/src/compute/kernels/filter.rs
##########
@@ -185,79 +279,559 @@ pub fn prep_null_mask_filter(filter: &BooleanArray) -> 
BooleanArray {
 /// # Ok(())
 /// # }
 /// ```
-pub fn filter(array: &dyn Array, predicate: &BooleanArray) -> Result<ArrayRef> 
{
-    if predicate.null_count() > 0 {
-        // this greatly simplifies subsequent filtering code
-        // now we only have a boolean mask to deal with
-        let predicate = prep_null_mask_filter(predicate);
-        return filter(array, &predicate);
+pub fn filter(values: &dyn Array, predicate: &BooleanArray) -> 
Result<ArrayRef> {
+    let predicate = FilterBuilder::new(predicate).build();
+    filter_array(values, &predicate)
+}
+
+/// Returns a new [RecordBatch] with arrays containing only values matching 
the filter.
+pub fn filter_record_batch(
+    record_batch: &RecordBatch,
+    predicate: &BooleanArray,
+) -> Result<RecordBatch> {
+    let filter = FilterBuilder::new(predicate).optimize().build();
+
+    let filtered_arrays = record_batch
+        .columns()
+        .iter()
+        .map(|a| filter_array(a, &filter))
+        .collect::<Result<Vec<_>>>()?;
+
+    RecordBatch::try_new(record_batch.schema(), filtered_arrays)
+}
+
+/// A builder to construct [`FilterPredicate`]
+#[derive(Debug)]
+pub struct FilterBuilder {
+    filter: BooleanArray,
+    count: usize,
+    iterator: FilterIterator,
+}
+
+impl FilterBuilder {
+    /// Create a new [`FilterBuilder`] that can be used to construct a 
[`FilterPredicate`]
+    pub fn new(filter: &BooleanArray) -> Self {
+        let filter = match filter.null_count() {
+            0 => BooleanArray::from(filter.data().clone()),
+            _ => prep_null_mask_filter(filter),

Review comment:
       Is the idea here that, by setting the filter to `false` for rows that 
will be null in the output, we skip copying array values which are null?

##########
File path: arrow/src/util/bench_util.rs
##########
@@ -110,6 +110,28 @@ pub fn create_string_array<Offset: StringOffsetSizeTrait>(
         .collect()
 }
 
+/// Creates an random (but fixed-seeded) array of a given size and null density

Review comment:
       ```suggestion
   /// Creates a random (but fixed-seeded) array of a given size and null 
density
   /// consisting of random 4-character alphanumeric strings
   ```

##########
File path: arrow/src/compute/kernels/filter.rs
##########
@@ -119,17 +147,83 @@ impl<'a> Iterator for SlicesIterator<'a> {
     }
 }
 
+/// An iterator of `usize` whose index in [`BooleanArray`] is true
+///
+/// This provides the best performance on all but the most selective 
predicates, where the

Review comment:
       ```suggestion
   /// This provides the best performance on all but the least selective 
predicates (which keep most / all rows), where the
   ```

##########
File path: arrow/src/compute/kernels/filter.rs
##########
@@ -185,79 +279,559 @@ pub fn prep_null_mask_filter(filter: &BooleanArray) -> 
BooleanArray {
 /// # Ok(())
 /// # }
 /// ```
-pub fn filter(array: &dyn Array, predicate: &BooleanArray) -> Result<ArrayRef> 
{
-    if predicate.null_count() > 0 {
-        // this greatly simplifies subsequent filtering code
-        // now we only have a boolean mask to deal with
-        let predicate = prep_null_mask_filter(predicate);
-        return filter(array, &predicate);
+pub fn filter(values: &dyn Array, predicate: &BooleanArray) -> 
Result<ArrayRef> {
+    let predicate = FilterBuilder::new(predicate).build();
+    filter_array(values, &predicate)
+}
+
+/// Returns a new [RecordBatch] with arrays containing only values matching 
the filter.
+pub fn filter_record_batch(
+    record_batch: &RecordBatch,
+    predicate: &BooleanArray,
+) -> Result<RecordBatch> {
+    let filter = FilterBuilder::new(predicate).optimize().build();
+
+    let filtered_arrays = record_batch
+        .columns()
+        .iter()
+        .map(|a| filter_array(a, &filter))
+        .collect::<Result<Vec<_>>>()?;
+
+    RecordBatch::try_new(record_batch.schema(), filtered_arrays)
+}
+
+/// A builder to construct [`FilterPredicate`]
+#[derive(Debug)]
+pub struct FilterBuilder {
+    filter: BooleanArray,
+    count: usize,
+    iterator: FilterIterator,
+}
+
+impl FilterBuilder {
+    /// Create a new [`FilterBuilder`] that can be used to construct a 
[`FilterPredicate`]
+    pub fn new(filter: &BooleanArray) -> Self {
+        let filter = match filter.null_count() {
+            0 => BooleanArray::from(filter.data().clone()),
+            _ => prep_null_mask_filter(filter),
+        };
+
+        let count = filter_count(&filter);
+        let selectivity_frac = count as f64 / filter.len() as f64;
+        let iterator = if selectivity_frac > 0.8 {

Review comment:
       I think a comment giving some rationale for the `0.8` number would be 
justified here. Using a named symbolic constant like 
`FILTER_SLICES_SELECTIVITY_THRESHOLD` might also help document the rationale 
further.

##########
File path: arrow/src/compute/kernels/filter.rs
##########
@@ -185,79 +279,559 @@ pub fn prep_null_mask_filter(filter: &BooleanArray) -> 
BooleanArray {
 /// # Ok(())
 /// # }
 /// ```
-pub fn filter(array: &dyn Array, predicate: &BooleanArray) -> Result<ArrayRef> 
{
-    if predicate.null_count() > 0 {
-        // this greatly simplifies subsequent filtering code
-        // now we only have a boolean mask to deal with
-        let predicate = prep_null_mask_filter(predicate);
-        return filter(array, &predicate);
+pub fn filter(values: &dyn Array, predicate: &BooleanArray) -> 
Result<ArrayRef> {
+    let predicate = FilterBuilder::new(predicate).build();
+    filter_array(values, &predicate)
+}
+
+/// Returns a new [RecordBatch] with arrays containing only values matching 
the filter.
+pub fn filter_record_batch(
+    record_batch: &RecordBatch,
+    predicate: &BooleanArray,
+) -> Result<RecordBatch> {
+    let filter = FilterBuilder::new(predicate).optimize().build();
+
+    let filtered_arrays = record_batch
+        .columns()
+        .iter()
+        .map(|a| filter_array(a, &filter))
+        .collect::<Result<Vec<_>>>()?;
+
+    RecordBatch::try_new(record_batch.schema(), filtered_arrays)
+}
+
+/// A builder to construct [`FilterPredicate`]
+#[derive(Debug)]
+pub struct FilterBuilder {
+    filter: BooleanArray,
+    count: usize,
+    iterator: FilterIterator,
+}
+
+impl FilterBuilder {
+    /// Create a new [`FilterBuilder`] that can be used to construct a 
[`FilterPredicate`]
+    pub fn new(filter: &BooleanArray) -> Self {
+        let filter = match filter.null_count() {
+            0 => BooleanArray::from(filter.data().clone()),
+            _ => prep_null_mask_filter(filter),
+        };
+
+        let count = filter_count(&filter);
+        let selectivity_frac = count as f64 / filter.len() as f64;
+        let iterator = if selectivity_frac > 0.8 {
+            FilterIterator::SlicesIterator
+        } else {
+            FilterIterator::IndexIterator
+        };
+
+        Self {
+            filter,
+            count,
+            iterator,
+        }
+    }
+
+    /// Compute an optimised representation of the provided `filter` mask that 
can be
+    /// applied to an array more quickly.
+    ///

Review comment:
       I see -- this effectively materializes the list of `usize` as a `Vec` 👍 

##########
File path: arrow/src/compute/kernels/filter.rs
##########
@@ -696,7 +1273,8 @@ mod tests {
             .flat_map(|(idx, v)| v.then(|| idx))
             .collect();
 
-        assert_eq!(bits, expected_bits);
+        assert_eq!(slice_bits, expected_bits);
+        assert_eq!(index_bits, expected_bits);
     }
 
     #[test]

Review comment:
       Do you feel good about the level of testing in this module (specifically 
with filters of selectivity 0.8 and higher / lower)?

##########
File path: arrow/src/compute/kernels/filter.rs
##########
@@ -185,79 +279,559 @@ pub fn prep_null_mask_filter(filter: &BooleanArray) -> 
BooleanArray {
 /// # Ok(())
 /// # }
 /// ```
-pub fn filter(array: &dyn Array, predicate: &BooleanArray) -> Result<ArrayRef> 
{
-    if predicate.null_count() > 0 {
-        // this greatly simplifies subsequent filtering code
-        // now we only have a boolean mask to deal with
-        let predicate = prep_null_mask_filter(predicate);
-        return filter(array, &predicate);
+pub fn filter(values: &dyn Array, predicate: &BooleanArray) -> 
Result<ArrayRef> {
+    let predicate = FilterBuilder::new(predicate).build();
+    filter_array(values, &predicate)
+}
+
+/// Returns a new [RecordBatch] with arrays containing only values matching 
the filter.
+pub fn filter_record_batch(
+    record_batch: &RecordBatch,
+    predicate: &BooleanArray,
+) -> Result<RecordBatch> {
+    let filter = FilterBuilder::new(predicate).optimize().build();
+
+    let filtered_arrays = record_batch
+        .columns()
+        .iter()
+        .map(|a| filter_array(a, &filter))
+        .collect::<Result<Vec<_>>>()?;
+
+    RecordBatch::try_new(record_batch.schema(), filtered_arrays)
+}
+
+/// A builder to construct [`FilterPredicate`]
+#[derive(Debug)]
+pub struct FilterBuilder {
+    filter: BooleanArray,
+    count: usize,
+    iterator: FilterIterator,
+}
+
+impl FilterBuilder {
+    /// Create a new [`FilterBuilder`] that can be used to construct a 
[`FilterPredicate`]
+    pub fn new(filter: &BooleanArray) -> Self {
+        let filter = match filter.null_count() {
+            0 => BooleanArray::from(filter.data().clone()),
+            _ => prep_null_mask_filter(filter),
+        };
+
+        let count = filter_count(&filter);
+        let selectivity_frac = count as f64 / filter.len() as f64;
+        let iterator = if selectivity_frac > 0.8 {
+            FilterIterator::SlicesIterator
+        } else {
+            FilterIterator::IndexIterator
+        };
+
+        Self {
+            filter,
+            count,
+            iterator,
+        }
+    }
+
+    /// Compute an optimised representation of the provided `filter` mask that 
can be
+    /// applied to an array more quickly.
+    ///
+    /// Note: There is limited benefit to calling this to then filter a single 
array
+    /// Note: This will likely have a larger memory footprint than the 
original mask
+    pub fn optimize(mut self) -> Self {
+        match self.iterator {
+            FilterIterator::SlicesIterator => {
+                let slices = SlicesIterator::new(&self.filter).collect();
+                self.iterator = FilterIterator::Slices(slices)
+            }
+            FilterIterator::IndexIterator => {
+                let indices = IndexIterator::new(&self.filter, 
self.count).collect();
+                self.iterator = FilterIterator::Indices(indices)
+            }
+            _ => {}
+        }
+        self
+    }
+
+    /// Construct the final `FilterPredicate`
+    pub fn build(self) -> FilterPredicate {
+        FilterPredicate {
+            filter: self.filter,
+            count: self.count,
+            iterator: self.iterator,
+        }
     }
+}
 
-    let filter_count = filter_count(predicate);
+/// The internal iterator type of [`FilterPredicate`]
+#[derive(Debug)]
+enum FilterIterator {
+    // A lazily evaluated iterator of ranges
+    SlicesIterator,
+    // A lazily evaluated iterator of indices
+    IndexIterator,
+    // A precomputed list of indices
+    Indices(Vec<usize>),
+    // A precomputed array of ranges
+    Slices(Vec<(usize, usize)>),
+}
+
+/// A filtering predicate that can be applied to an [`Array`]
+#[derive(Debug)]
+pub struct FilterPredicate {
+    filter: BooleanArray,
+    count: usize,
+    iterator: FilterIterator,
+}
 
-    match filter_count {
+impl FilterPredicate {
+    /// Selects rows from `values` based on this [`FilterPredicate`]
+    pub fn filter(&self, values: &dyn Array) -> Result<ArrayRef> {
+        filter_array(values, self)
+    }
+}
+
+fn filter_array(values: &dyn Array, predicate: &FilterPredicate) -> 
Result<ArrayRef> {
+    if predicate.filter.len() > values.len() {
+        return Err(ArrowError::InvalidArgumentError(format!(
+            "Filter predicate of length {} is larger than target array of 
length {}",
+            predicate.filter.len(),
+            values.len()
+        )));
+    }
+
+    match predicate.count {
         0 => {
             // return empty
-            Ok(new_empty_array(array.data_type()))
+            Ok(new_empty_array(values.data_type()))
         }
-        len if len == array.len() => {
+        len if len == values.len() => {
             // return all
-            let data = array.data().clone();
+            let data = values.data().clone();
             Ok(make_array(data))
         }
-        _ => {
-            // actually filter
-            let mut mutable =
-                MutableArrayData::new(vec![array.data_ref()], false, 
filter_count);
+        // actually filter
+        _ => match values.data_type() {
+            DataType::Boolean => {
+                let values = 
values.as_any().downcast_ref::<BooleanArray>().unwrap();
+                Ok(Arc::new(filter_boolean(values, predicate)))
+            }
+            DataType::Int8 => {
+                downcast_filter!(Int8Type, values, predicate)
+            }
+            DataType::Int16 => {
+                downcast_filter!(Int16Type, values, predicate)
+            }
+            DataType::Int32 => {
+                downcast_filter!(Int32Type, values, predicate)
+            }
+            DataType::Int64 => {
+                downcast_filter!(Int64Type, values, predicate)
+            }
+            DataType::UInt8 => {
+                downcast_filter!(UInt8Type, values, predicate)
+            }
+            DataType::UInt16 => {
+                downcast_filter!(UInt16Type, values, predicate)
+            }
+            DataType::UInt32 => {
+                downcast_filter!(UInt32Type, values, predicate)
+            }
+            DataType::UInt64 => {
+                downcast_filter!(UInt64Type, values, predicate)
+            }
+            DataType::Float32 => {
+                downcast_filter!(Float32Type, values, predicate)
+            }
+            DataType::Float64 => {
+                downcast_filter!(Float64Type, values, predicate)
+            }
+            DataType::Date32 => {
+                downcast_filter!(Date32Type, values, predicate)
+            }
+            DataType::Date64 => {
+                downcast_filter!(Date64Type, values, predicate)
+            }
+            DataType::Time32(Second) => {
+                downcast_filter!(Time32SecondType, values, predicate)
+            }
+            DataType::Time32(Millisecond) => {
+                downcast_filter!(Time32MillisecondType, values, predicate)
+            }
+            DataType::Time64(Microsecond) => {
+                downcast_filter!(Time64MicrosecondType, values, predicate)
+            }
+            DataType::Time64(Nanosecond) => {
+                downcast_filter!(Time64NanosecondType, values, predicate)
+            }
+            DataType::Timestamp(Second, _) => {
+                downcast_filter!(TimestampSecondType, values, predicate)
+            }
+            DataType::Timestamp(Millisecond, _) => {
+                downcast_filter!(TimestampMillisecondType, values, predicate)
+            }
+            DataType::Timestamp(Microsecond, _) => {
+                downcast_filter!(TimestampMicrosecondType, values, predicate)
+            }
+            DataType::Timestamp(Nanosecond, _) => {
+                downcast_filter!(TimestampNanosecondType, values, predicate)
+            }
+            DataType::Interval(IntervalUnit::YearMonth) => {
+                downcast_filter!(IntervalYearMonthType, values, predicate)
+            }
+            DataType::Interval(IntervalUnit::DayTime) => {
+                downcast_filter!(IntervalDayTimeType, values, predicate)
+            }
+            DataType::Interval(IntervalUnit::MonthDayNano) => {
+                downcast_filter!(IntervalMonthDayNanoType, values, predicate)
+            }
+            DataType::Duration(TimeUnit::Second) => {
+                downcast_filter!(DurationSecondType, values, predicate)
+            }
+            DataType::Duration(TimeUnit::Millisecond) => {
+                downcast_filter!(DurationMillisecondType, values, predicate)
+            }
+            DataType::Duration(TimeUnit::Microsecond) => {
+                downcast_filter!(DurationMicrosecondType, values, predicate)
+            }
+            DataType::Duration(TimeUnit::Nanosecond) => {
+                downcast_filter!(DurationNanosecondType, values, predicate)
+            }
+            DataType::Utf8 => {
+                let values = values
+                    .as_any()
+                    .downcast_ref::<GenericStringArray<i32>>()
+                    .unwrap();
+                Ok(Arc::new(filter_string::<i32>(values, predicate)))
+            }
+            DataType::LargeUtf8 => {
+                let values = values
+                    .as_any()
+                    .downcast_ref::<GenericStringArray<i64>>()
+                    .unwrap();
+                Ok(Arc::new(filter_string::<i64>(values, predicate)))
+            }
+            DataType::Dictionary(key_type, _) => match key_type.as_ref() {
+                DataType::Int8 => downcast_dict_filter!(Int8Type, values, 
predicate),
+                DataType::Int16 => downcast_dict_filter!(Int16Type, values, 
predicate),
+                DataType::Int32 => downcast_dict_filter!(Int32Type, values, 
predicate),
+                DataType::Int64 => downcast_dict_filter!(Int64Type, values, 
predicate),
+                DataType::UInt8 => downcast_dict_filter!(UInt8Type, values, 
predicate),
+                DataType::UInt16 => downcast_dict_filter!(UInt16Type, values, 
predicate),
+                DataType::UInt32 => downcast_dict_filter!(UInt32Type, values, 
predicate),
+                DataType::UInt64 => downcast_dict_filter!(UInt64Type, values, 
predicate),
+                t => unimplemented!("Take not supported for dictionary key 
type {:?}", t),
+            },
+            _ => {
+                // fallback to using MutableArrayData
+                let mut mutable = MutableArrayData::new(
+                    vec![values.data_ref()],
+                    false,
+                    predicate.count,
+                );
+
+                let iter = SlicesIterator::new(&predicate.filter);
+                iter.for_each(|(start, end)| mutable.extend(0, start, end));
+
+                let data = mutable.freeze();
+                Ok(make_array(data))
+            }
+        },
+    }
+}
+
+/// Computes a new null mask for `data` based on `predicate`
+///
+/// Returns `None` if no nulls in the result
+fn filter_null_mask(
+    data: &ArrayData,
+    predicate: &FilterPredicate,
+) -> Option<(usize, Buffer)> {
+    if data.null_count() == 0 {
+        return None;
+    }
+
+    let nulls = filter_bits(data.null_buffer()?, data.offset(), predicate);
+    let null_count = predicate.count - nulls.count_set_bits();
+
+    if null_count == 0 {
+        return None;
+    }
+
+    Some((null_count, nulls))
+}
+
+/// Filter the packed bitmask `buffer`, with `predicate` starting at bit 
offset `offset`
+fn filter_bits(buffer: &Buffer, offset: usize, predicate: &FilterPredicate) -> 
Buffer {
+    let src = buffer.as_slice();
+
+    match &predicate.iterator {
+        FilterIterator::IndexIterator => {
+            let bits = IndexIterator::new(&predicate.filter, predicate.count)
+                .map(|src_idx| bit_util::get_bit(src, src_idx + offset));
+
+            // SAFETY: `IndexIterator` reports its size correctly
+            unsafe { MutableBuffer::from_trusted_len_iter_bool(bits).into() }
+        }
+        FilterIterator::Indices(indices) => {
+            let bits = indices
+                .iter()
+                .map(|src_idx| bit_util::get_bit(src, *src_idx + offset));
 
-            let iter = SlicesIterator::new(predicate);
-            iter.for_each(|(start, end)| mutable.extend(0, start, end));
+            // SAFETY: `Vec::iter()` reports its size correctly
+            unsafe { MutableBuffer::from_trusted_len_iter_bool(bits).into() }
+        }
+        FilterIterator::SlicesIterator => {
+            let mut builder =
+                BooleanBufferBuilder::new(bit_util::ceil(predicate.count, 8));
+            for (start, end) in SlicesIterator::new(&predicate.filter) {
+                builder.append_packed_range(start..end, src)
+            }
+            builder.finish()
+        }
+        FilterIterator::Slices(slices) => {
+            let mut builder =
+                BooleanBufferBuilder::new(bit_util::ceil(predicate.count, 8));
+            for (start, end) in slices {
+                builder.append_packed_range(*start..*end, src)
+            }
+            builder.finish()
+        }
+    }
+}
 
-            let data = mutable.freeze();
-            Ok(make_array(data))
+/// `filter` implementation for boolean buffers
+fn filter_boolean(values: &BooleanArray, predicate: &FilterPredicate) -> 
BooleanArray {
+    let data = values.data();
+    assert_eq!(data.buffers().len(), 1);
+    assert_eq!(data.child_data().len(), 0);
+
+    let values = filter_bits(&data.buffers()[0], data.offset(), predicate);
+
+    let mut builder = ArrayDataBuilder::new(DataType::Boolean)
+        .len(predicate.count)
+        .add_buffer(values);
+
+    if let Some((null_count, nulls)) = filter_null_mask(data, predicate) {
+        builder = builder.null_count(null_count).null_bit_buffer(nulls);
+    }
+
+    let data = unsafe { builder.build_unchecked() };
+    BooleanArray::from(data)
+}
+
+/// `filter` implementation for primitive arrays
+fn filter_primitive<T>(
+    values: &PrimitiveArray<T>,
+    predicate: &FilterPredicate,
+) -> PrimitiveArray<T>
+where
+    T: ArrowPrimitiveType,
+{
+    let data = values.data();
+    assert_eq!(data.buffers().len(), 1);
+    assert_eq!(data.child_data().len(), 0);
+
+    let values = data.buffer::<T::Native>(0);
+    assert!(values.len() >= predicate.filter.len());
+
+    let buffer = match &predicate.iterator {
+        FilterIterator::SlicesIterator => {
+            let mut buffer =
+                MutableBuffer::with_capacity(predicate.count * 
T::get_byte_width());
+            for (start, end) in SlicesIterator::new(&predicate.filter) {
+                buffer.extend_from_slice(&values[start..end]);
+            }
+            buffer
         }
+        FilterIterator::Slices(slices) => {
+            let mut buffer =
+                MutableBuffer::with_capacity(predicate.count * 
T::get_byte_width());
+            for (start, end) in slices {
+                buffer.extend_from_slice(&values[*start..*end]);
+            }
+            buffer
+        }
+        FilterIterator::IndexIterator => {
+            let iter =
+                IndexIterator::new(&predicate.filter, predicate.count).map(|x| 
values[x]);
+
+            // SAFETY: IndexIterator is trusted length
+            unsafe { MutableBuffer::from_trusted_len_iter(iter) }
+        }
+        FilterIterator::Indices(indices) => {
+            let iter = indices.iter().map(|x| values[*x]);
+
+            // SAFETY: `Vec::iter` is trusted length
+            unsafe { MutableBuffer::from_trusted_len_iter(iter) }
+        }
+    };
+
+    let mut builder = ArrayDataBuilder::new(data.data_type().clone())
+        .len(predicate.count)
+        .add_buffer(buffer.into());
+
+    if let Some((null_count, nulls)) = filter_null_mask(data, predicate) {
+        builder = builder.null_count(null_count).null_bit_buffer(nulls);
     }
+
+    let data = unsafe { builder.build_unchecked() };
+    PrimitiveArray::from(data)
 }
 
-/// Returns a new [RecordBatch] with arrays containing only values matching 
the filter.
-pub fn filter_record_batch(
-    record_batch: &RecordBatch,
-    predicate: &BooleanArray,
-) -> Result<RecordBatch> {
-    if predicate.null_count() > 0 {
-        // this greatly simplifies subsequent filtering code
-        // now we only have a boolean mask to deal with
-        let predicate = prep_null_mask_filter(predicate);
-        return filter_record_batch(record_batch, &predicate);
+/// [`FilterString`] is created from a source [`GenericStringArray`] and can be
+/// used to build a new [`GenericStringArray`] by copying values from the 
source
+///
+/// TODO(raphael): Could this be used for the take kernel as well?
+struct FilterString<'a, OffsetSize> {
+    src_offsets: &'a [OffsetSize],
+    src_values: &'a [u8],
+    dst_offsets: MutableBuffer,
+    dst_values: MutableBuffer,

Review comment:
       I wonder if using `GenericStringBuilder<>` here would be possible

##########
File path: arrow/src/compute/kernels/filter.rs
##########
@@ -119,17 +147,83 @@ impl<'a> Iterator for SlicesIterator<'a> {
     }
 }
 
+/// An iterator of `usize` whose index in [`BooleanArray`] is true
+///
+/// This provides the best performance on all but the most selective 
predicates, where the

Review comment:
       Is it also worth mentioning that it requires the filter to be entirely 
non-null?

##########
File path: arrow/src/compute/kernels/filter.rs
##########
@@ -185,79 +279,559 @@ pub fn prep_null_mask_filter(filter: &BooleanArray) -> 
BooleanArray {
 /// # Ok(())
 /// # }
 /// ```
-pub fn filter(array: &dyn Array, predicate: &BooleanArray) -> Result<ArrayRef> 
{
-    if predicate.null_count() > 0 {
-        // this greatly simplifies subsequent filtering code
-        // now we only have a boolean mask to deal with
-        let predicate = prep_null_mask_filter(predicate);
-        return filter(array, &predicate);
+pub fn filter(values: &dyn Array, predicate: &BooleanArray) -> 
Result<ArrayRef> {
+    let predicate = FilterBuilder::new(predicate).build();
+    filter_array(values, &predicate)
+}
+
+/// Returns a new [RecordBatch] with arrays containing only values matching 
the filter.
+pub fn filter_record_batch(
+    record_batch: &RecordBatch,
+    predicate: &BooleanArray,
+) -> Result<RecordBatch> {
+    let filter = FilterBuilder::new(predicate).optimize().build();
+
+    let filtered_arrays = record_batch
+        .columns()
+        .iter()
+        .map(|a| filter_array(a, &filter))
+        .collect::<Result<Vec<_>>>()?;
+
+    RecordBatch::try_new(record_batch.schema(), filtered_arrays)
+}
+
+/// A builder to construct [`FilterPredicate`]
+#[derive(Debug)]
+pub struct FilterBuilder {
+    filter: BooleanArray,
+    count: usize,
+    iterator: FilterIterator,
+}
+
+impl FilterBuilder {
+    /// Create a new [`FilterBuilder`] that can be used to construct a 
[`FilterPredicate`]
+    pub fn new(filter: &BooleanArray) -> Self {
+        let filter = match filter.null_count() {
+            0 => BooleanArray::from(filter.data().clone()),
+            _ => prep_null_mask_filter(filter),
+        };
+
+        let count = filter_count(&filter);
+        let selectivity_frac = count as f64 / filter.len() as f64;
+        let iterator = if selectivity_frac > 0.8 {
+            FilterIterator::SlicesIterator
+        } else {
+            FilterIterator::IndexIterator
+        };
+
+        Self {
+            filter,
+            count,
+            iterator,
+        }
+    }
+
+    /// Compute an optimised representation of the provided `filter` mask that 
can be
+    /// applied to an array more quickly.
+    ///
+    /// Note: There is limited benefit to calling this to then filter a single 
array
+    /// Note: This will likely have a larger memory footprint than the 
original mask
+    pub fn optimize(mut self) -> Self {
+        match self.iterator {
+            FilterIterator::SlicesIterator => {
+                let slices = SlicesIterator::new(&self.filter).collect();
+                self.iterator = FilterIterator::Slices(slices)
+            }
+            FilterIterator::IndexIterator => {
+                let indices = IndexIterator::new(&self.filter, 
self.count).collect();
+                self.iterator = FilterIterator::Indices(indices)
+            }
+            _ => {}
+        }
+        self
+    }
+
+    /// Construct the final `FilterPredicate`
+    pub fn build(self) -> FilterPredicate {
+        FilterPredicate {
+            filter: self.filter,
+            count: self.count,
+            iterator: self.iterator,
+        }
     }
+}
 
-    let filter_count = filter_count(predicate);
+/// The internal iterator type of [`FilterPredicate`]
+#[derive(Debug)]
+enum FilterIterator {
+    // A lazily evaluated iterator of ranges
+    SlicesIterator,
+    // A lazily evaluated iterator of indices
+    IndexIterator,
+    // A precomputed list of indices
+    Indices(Vec<usize>),
+    // A precomputed array of ranges
+    Slices(Vec<(usize, usize)>),
+}
+
+/// A filtering predicate that can be applied to an [`Array`]
+#[derive(Debug)]
+pub struct FilterPredicate {
+    filter: BooleanArray,
+    count: usize,
+    iterator: FilterIterator,
+}
 
-    match filter_count {
+impl FilterPredicate {
+    /// Selects rows from `values` based on this [`FilterPredicate`]
+    pub fn filter(&self, values: &dyn Array) -> Result<ArrayRef> {
+        filter_array(values, self)
+    }
+}
+
+fn filter_array(values: &dyn Array, predicate: &FilterPredicate) -> 
Result<ArrayRef> {
+    if predicate.filter.len() > values.len() {
+        return Err(ArrowError::InvalidArgumentError(format!(
+            "Filter predicate of length {} is larger than target array of 
length {}",
+            predicate.filter.len(),
+            values.len()
+        )));
+    }
+
+    match predicate.count {
         0 => {
             // return empty
-            Ok(new_empty_array(array.data_type()))
+            Ok(new_empty_array(values.data_type()))
         }
-        len if len == array.len() => {
+        len if len == values.len() => {
             // return all
-            let data = array.data().clone();
+            let data = values.data().clone();
             Ok(make_array(data))
         }
-        _ => {
-            // actually filter
-            let mut mutable =
-                MutableArrayData::new(vec![array.data_ref()], false, 
filter_count);
+        // actually filter
+        _ => match values.data_type() {
+            DataType::Boolean => {
+                let values = 
values.as_any().downcast_ref::<BooleanArray>().unwrap();
+                Ok(Arc::new(filter_boolean(values, predicate)))
+            }
+            DataType::Int8 => {
+                downcast_filter!(Int8Type, values, predicate)
+            }
+            DataType::Int16 => {
+                downcast_filter!(Int16Type, values, predicate)
+            }
+            DataType::Int32 => {
+                downcast_filter!(Int32Type, values, predicate)
+            }
+            DataType::Int64 => {
+                downcast_filter!(Int64Type, values, predicate)
+            }
+            DataType::UInt8 => {
+                downcast_filter!(UInt8Type, values, predicate)
+            }
+            DataType::UInt16 => {
+                downcast_filter!(UInt16Type, values, predicate)
+            }
+            DataType::UInt32 => {
+                downcast_filter!(UInt32Type, values, predicate)
+            }
+            DataType::UInt64 => {
+                downcast_filter!(UInt64Type, values, predicate)
+            }
+            DataType::Float32 => {
+                downcast_filter!(Float32Type, values, predicate)
+            }
+            DataType::Float64 => {
+                downcast_filter!(Float64Type, values, predicate)
+            }
+            DataType::Date32 => {
+                downcast_filter!(Date32Type, values, predicate)
+            }
+            DataType::Date64 => {
+                downcast_filter!(Date64Type, values, predicate)
+            }
+            DataType::Time32(Second) => {
+                downcast_filter!(Time32SecondType, values, predicate)
+            }
+            DataType::Time32(Millisecond) => {
+                downcast_filter!(Time32MillisecondType, values, predicate)
+            }
+            DataType::Time64(Microsecond) => {
+                downcast_filter!(Time64MicrosecondType, values, predicate)
+            }
+            DataType::Time64(Nanosecond) => {
+                downcast_filter!(Time64NanosecondType, values, predicate)
+            }
+            DataType::Timestamp(Second, _) => {
+                downcast_filter!(TimestampSecondType, values, predicate)
+            }
+            DataType::Timestamp(Millisecond, _) => {
+                downcast_filter!(TimestampMillisecondType, values, predicate)
+            }
+            DataType::Timestamp(Microsecond, _) => {
+                downcast_filter!(TimestampMicrosecondType, values, predicate)
+            }
+            DataType::Timestamp(Nanosecond, _) => {
+                downcast_filter!(TimestampNanosecondType, values, predicate)
+            }
+            DataType::Interval(IntervalUnit::YearMonth) => {
+                downcast_filter!(IntervalYearMonthType, values, predicate)
+            }
+            DataType::Interval(IntervalUnit::DayTime) => {
+                downcast_filter!(IntervalDayTimeType, values, predicate)
+            }
+            DataType::Interval(IntervalUnit::MonthDayNano) => {
+                downcast_filter!(IntervalMonthDayNanoType, values, predicate)
+            }
+            DataType::Duration(TimeUnit::Second) => {
+                downcast_filter!(DurationSecondType, values, predicate)
+            }
+            DataType::Duration(TimeUnit::Millisecond) => {
+                downcast_filter!(DurationMillisecondType, values, predicate)
+            }
+            DataType::Duration(TimeUnit::Microsecond) => {
+                downcast_filter!(DurationMicrosecondType, values, predicate)
+            }
+            DataType::Duration(TimeUnit::Nanosecond) => {
+                downcast_filter!(DurationNanosecondType, values, predicate)
+            }
+            DataType::Utf8 => {
+                let values = values
+                    .as_any()
+                    .downcast_ref::<GenericStringArray<i32>>()
+                    .unwrap();
+                Ok(Arc::new(filter_string::<i32>(values, predicate)))
+            }
+            DataType::LargeUtf8 => {
+                let values = values
+                    .as_any()
+                    .downcast_ref::<GenericStringArray<i64>>()
+                    .unwrap();
+                Ok(Arc::new(filter_string::<i64>(values, predicate)))
+            }
+            DataType::Dictionary(key_type, _) => match key_type.as_ref() {
+                DataType::Int8 => downcast_dict_filter!(Int8Type, values, 
predicate),
+                DataType::Int16 => downcast_dict_filter!(Int16Type, values, 
predicate),
+                DataType::Int32 => downcast_dict_filter!(Int32Type, values, 
predicate),
+                DataType::Int64 => downcast_dict_filter!(Int64Type, values, 
predicate),
+                DataType::UInt8 => downcast_dict_filter!(UInt8Type, values, 
predicate),
+                DataType::UInt16 => downcast_dict_filter!(UInt16Type, values, 
predicate),
+                DataType::UInt32 => downcast_dict_filter!(UInt32Type, values, 
predicate),
+                DataType::UInt64 => downcast_dict_filter!(UInt64Type, values, 
predicate),
+                t => unimplemented!("Take not supported for dictionary key 
type {:?}", t),
+            },
+            _ => {
+                // fallback to using MutableArrayData
+                let mut mutable = MutableArrayData::new(
+                    vec![values.data_ref()],
+                    false,
+                    predicate.count,
+                );
+
+                let iter = SlicesIterator::new(&predicate.filter);
+                iter.for_each(|(start, end)| mutable.extend(0, start, end));
+
+                let data = mutable.freeze();
+                Ok(make_array(data))
+            }
+        },
+    }
+}
+
+/// Derives the null mask of the filtered output from the null mask of `data`
+///
+/// Yields `(null_count, mask)` for the rows selected by `predicate`, or `None` when
+/// `data` reports no nulls, has no null buffer, or no selected row is null
+fn filter_null_mask(
+    data: &ArrayData,
+    predicate: &FilterPredicate,
+) -> Option<(usize, Buffer)> {
+    if data.null_count() == 0 {
+        return None;
+    }
+
+    let validity = filter_bits(data.null_buffer()?, data.offset(), predicate);
+
+    // Every selected row whose bit is not set in the filtered mask is a null
+    match predicate.count - validity.count_set_bits() {
+        0 => None,
+        null_count => Some((null_count, validity)),
+    }
+}
+
+/// Filters the packed bitmask `buffer` with `predicate`
+///
+/// `offset` is the bit position within `buffer` that corresponds to position 0 of
+/// the filter, i.e. filter position `i` reads bit `i + offset`. The returned buffer
+/// holds the selected bits packed contiguously.
+fn filter_bits(buffer: &Buffer, offset: usize, predicate: &FilterPredicate) -> Buffer {
+    let src = buffer.as_slice();
+
+    match &predicate.iterator {
+        FilterIterator::IndexIterator => {
+            let bits = IndexIterator::new(&predicate.filter, predicate.count)
+                .map(|src_idx| bit_util::get_bit(src, src_idx + offset));
+
+            // SAFETY: `IndexIterator` reports its size correctly
+            unsafe { MutableBuffer::from_trusted_len_iter_bool(bits).into() }
+        }
+        FilterIterator::Indices(indices) => {
+            let bits = indices
+                .iter()
+                .map(|src_idx| bit_util::get_bit(src, *src_idx + offset));
 
-            let iter = SlicesIterator::new(predicate);
-            iter.for_each(|(start, end)| mutable.extend(0, start, end));
+            // SAFETY: `Vec::iter()` reports its size correctly
+            unsafe { MutableBuffer::from_trusted_len_iter_bool(bits).into() }
+        }
+        FilterIterator::SlicesIterator => {
+            let mut builder =
+                BooleanBufferBuilder::new(bit_util::ceil(predicate.count, 8));
+            for (start, end) in SlicesIterator::new(&predicate.filter) {
+                // Ranges are relative to the filter, so they must be shifted by
+                // `offset` exactly like the index-based branches
+                builder.append_packed_range(start + offset..end + offset, src)
+            }
+            builder.finish()
+        }
+        FilterIterator::Slices(slices) => {
+            let mut builder =
+                BooleanBufferBuilder::new(bit_util::ceil(predicate.count, 8));
+            for (start, end) in slices {
+                // See above: shift the filter-relative range by `offset`
+                builder.append_packed_range(*start + offset..*end + offset, src)
+            }
+            builder.finish()
+        }
+    }
+}
 
-            let data = mutable.freeze();
-            Ok(make_array(data))
+/// Applies `predicate` to a [`BooleanArray`]
+///
+/// The packed value bits are filtered with `filter_bits` and the null mask with
+/// `filter_null_mask`; the two are then assembled into a new array.
+///
+/// # Panics
+///
+/// Panics if `values` does not have exactly one buffer and no child data
+fn filter_boolean(values: &BooleanArray, predicate: &FilterPredicate) -> BooleanArray {
+    let data = values.data();
+    assert_eq!(data.buffers().len(), 1);
+    assert_eq!(data.child_data().len(), 0);
+
+    let selected_bits = filter_bits(&data.buffers()[0], data.offset(), predicate);
+    let builder = ArrayDataBuilder::new(DataType::Boolean)
+        .len(predicate.count)
+        .add_buffer(selected_bits);
+
+    let builder = match filter_null_mask(data, predicate) {
+        Some((null_count, nulls)) => {
+            builder.null_count(null_count).null_bit_buffer(nulls)
+        }
+        None => builder,
+    };
+
+    // SAFETY(review): both buffers come from `filter_bits` for this `predicate`;
+    // assumes each holds `predicate.count` bits - confirm
+    let filtered = unsafe { builder.build_unchecked() };
+    BooleanArray::from(filtered)
+}
+
+/// `filter` implementation for primitive arrays
+///
+/// Copies the values selected by `predicate` into a new buffer, using whichever
+/// iteration strategy `predicate.iterator` holds, and filters the null mask
+/// separately through `filter_null_mask`.
+///
+/// # Panics
+///
+/// Panics if `values` does not have exactly one buffer and no child data, or if its
+/// value buffer is shorter than `predicate.filter`.
+fn filter_primitive<T>(
+    values: &PrimitiveArray<T>,
+    predicate: &FilterPredicate,
+) -> PrimitiveArray<T>
+where
+    T: ArrowPrimitiveType,
+{
+    let data = values.data();
+    assert_eq!(data.buffers().len(), 1);
+    assert_eq!(data.child_data().len(), 0);
+
+    let values = data.buffer::<T::Native>(0);
+    // NOTE(review): presumably this guarantees that every position yielded by the
+    // filter is in bounds for the indexing below - confirm
+    assert!(values.len() >= predicate.filter.len());
+
+    let buffer = match &predicate.iterator {
+        // Range strategies: copy each contiguous run of selected values in one go
+        FilterIterator::SlicesIterator => {
+            let mut buffer =
+                MutableBuffer::with_capacity(predicate.count * 
T::get_byte_width());
+            for (start, end) in SlicesIterator::new(&predicate.filter) {
+                buffer.extend_from_slice(&values[start..end]);
+            }
+            buffer
         }
+        FilterIterator::Slices(slices) => {
+            let mut buffer =
+                MutableBuffer::with_capacity(predicate.count * 
T::get_byte_width());
+            for (start, end) in slices {
+                buffer.extend_from_slice(&values[*start..*end]);
+            }
+            buffer
+        }
+        // Index strategies: gather the selected values one at a time
+        FilterIterator::IndexIterator => {
+            let iter =
+                IndexIterator::new(&predicate.filter, predicate.count).map(|x| 
values[x]);
+
+            // SAFETY: IndexIterator is trusted length
+            unsafe { MutableBuffer::from_trusted_len_iter(iter) }
+        }
+        FilterIterator::Indices(indices) => {
+            let iter = indices.iter().map(|x| values[*x]);
+
+            // SAFETY: `Vec::iter` is trusted length
+            unsafe { MutableBuffer::from_trusted_len_iter(iter) }
+        }
+    };
+
+    let mut builder = ArrayDataBuilder::new(data.data_type().clone())
+        .len(predicate.count)
+        .add_buffer(buffer.into());
+
+    // Only attach a null mask when at least one selected value is null
+    if let Some((null_count, nulls)) = filter_null_mask(data, predicate) {
+        builder = builder.null_count(null_count).null_bit_buffer(nulls);
     }
+
+    let data = unsafe { builder.build_unchecked() };
+    PrimitiveArray::from(data)
 }
 
-/// Returns a new [RecordBatch] with arrays containing only values matching 
the filter.
-pub fn filter_record_batch(
-    record_batch: &RecordBatch,
-    predicate: &BooleanArray,
-) -> Result<RecordBatch> {
-    if predicate.null_count() > 0 {
-        // this greatly simplifies subsequent filtering code
-        // now we only have a boolean mask to deal with
-        let predicate = prep_null_mask_filter(predicate);
-        return filter_record_batch(record_batch, &predicate);
+/// [`FilterString`] is created from a source [`GenericStringArray`] and can be
+/// used to build a new [`GenericStringArray`] by copying values from the source
+///
+/// TODO(raphael): Could this be used for the take kernel as well?
+struct FilterString<'a, OffsetSize> {
+    /// Value offsets of the source array
+    src_offsets: &'a [OffsetSize],
+    /// Value bytes of the source array
+    src_values: &'a [u8],
+    /// Offsets written so far for the output array
+    dst_offsets: MutableBuffer,
+    /// Value bytes copied so far for the output array
+    dst_values: MutableBuffer,
+    /// Running end offset of the output, i.e. the last offset pushed to `dst_offsets`
+    cur_offset: OffsetSize,
+}
+
+impl<'a, OffsetSize> FilterString<'a, OffsetSize>
+where
+    OffsetSize: Zero + AddAssign + StringOffsetSizeTrait,
+{
+    /// Creates a [`FilterString`] that copies from `array`
+    ///
+    /// Space for `capacity + 1` offsets is requested up front and the initial zero
+    /// offset is written; the values buffer starts out empty.
+    fn new(capacity: usize, array: &'a GenericStringArray<OffsetSize>) -> Self 
{
+        let bytes_offset = (capacity + 1) * std::mem::size_of::<OffsetSize>();
+        let mut offsets = MutableBuffer::new(bytes_offset);
+        let values = MutableBuffer::new(0);
+        let cur_offset = OffsetSize::zero();
+        offsets.push(cur_offset);
+
+        Self {
+            src_offsets: array.value_offsets(),
+            src_values: &array.data().buffers()[1],
+            dst_offsets: offsets,
+            dst_values: values,
+            cur_offset,
+        }
+    }
+
+    /// Returns the byte offset at `idx`
+    ///
+    /// Panics if `idx` is out of bounds or the stored offset cannot be converted to
+    /// `usize`
+    #[inline]
+    fn get_value_offset(&self, idx: usize) -> usize {
+        self.src_offsets[idx].to_usize().expect("illegal offset")
     }
 
-    let num_columns = record_batch.columns().len();
+    /// Returns the start and end of the value at index `idx` along with its length
+    #[inline]
+    fn get_value_range(&self, idx: usize) -> (usize, usize, OffsetSize) {
+        // These can only fail if `array` contains invalid data
+        let start = self.get_value_offset(idx);
+        let end = self.get_value_offset(idx + 1);
+        let len = OffsetSize::from_usize(end - start).expect("illegal offset 
range");
+        (start, end, len)
+    }
 
-    let filtered_arrays = match num_columns {
-        1 => {
-            vec![filter(record_batch.columns()[0].as_ref(), predicate)?]
+    /// Extends the in-progress array by the indexes in the provided iterator
+    ///
+    /// For every index one offset is appended and the bytes of that value are copied
+    fn extend_idx(&mut self, iter: impl Iterator<Item = usize>) {
+        for idx in iter {
+            let (start, end, len) = self.get_value_range(idx);
+            self.cur_offset += len;
+            self.dst_offsets.push(self.cur_offset);
+            self.dst_values
+                .extend_from_slice(&self.src_values[start..end]);
         }
-        _ => {
-            let filter = build_filter(predicate)?;
-            record_batch
-                .columns()
-                .iter()
-                .map(|a| make_array(filter(a.data())))
-                .collect()
+    }
+
+    /// Extends the in-progress array by the ranges in the provided iterator
+    ///
+    /// Each item is a `(start, end)` pair of value indexes with `end` exclusive
+    fn extend_slices(&mut self, iter: impl Iterator<Item = (usize, usize)>) {
+        for slice in iter {
+            // Offsets still have to be written one value at a time
+            // These can only fail if `array` contains invalid data
+            for idx in slice.0..slice.1 {
+                let (_, _, len) = self.get_value_range(idx);
+                self.cur_offset += len;
+                self.dst_offsets.push(self.cur_offset); // push_unchecked?
+            }
+
+            // The value bytes of the whole range are copied with a single call
+            let start = self.get_value_offset(slice.0);
+            let end = self.get_value_offset(slice.1);
+            self.dst_values
+                .extend_from_slice(&self.src_values[start..end]);
         }
+    }
+}
+
+/// `filter` implementation for string arrays
+///
+/// Note: NULLs with a non-zero slot length in `array` will have the 
corresponding
+/// data copied across. This allows handling the null mask separately from the 
data

Review comment:
       I thought this was already handled by applying `prep_null_mask_filter` to 
the filter first?

##########
File path: arrow/src/compute/kernels/filter.rs
##########
@@ -185,79 +279,559 @@ pub fn prep_null_mask_filter(filter: &BooleanArray) -> 
BooleanArray {
 /// # Ok(())
 /// # }
 /// ```
-pub fn filter(array: &dyn Array, predicate: &BooleanArray) -> Result<ArrayRef> 
{
-    if predicate.null_count() > 0 {
-        // this greatly simplifies subsequent filtering code
-        // now we only have a boolean mask to deal with
-        let predicate = prep_null_mask_filter(predicate);
-        return filter(array, &predicate);
+pub fn filter(values: &dyn Array, predicate: &BooleanArray) -> Result<ArrayRef> {
+    // Only one array is filtered here, so the predicate is built without `optimize`
+    FilterBuilder::new(predicate).build().filter(values)
+}
+
+/// Filters every column of `record_batch` with `predicate`, returning a new
+/// [RecordBatch] that shares the schema of the input.
+///
+/// The predicate is optimized once up front and reused for each column. The first
+/// column that fails to filter aborts the operation with its error.
+pub fn filter_record_batch(
+    record_batch: &RecordBatch,
+    predicate: &BooleanArray,
+) -> Result<RecordBatch> {
+    let filter = FilterBuilder::new(predicate).optimize().build();
+
+    let columns = record_batch.columns();
+    let mut filtered_arrays = Vec::with_capacity(columns.len());
+    for column in columns {
+        filtered_arrays.push(filter_array(column, &filter)?);
+    }
+
+    RecordBatch::try_new(record_batch.schema(), filtered_arrays)
+}
+
+/// Builds a [`FilterPredicate`] from a boolean selection mask
+#[derive(Debug)]
+pub struct FilterBuilder {
+    filter: BooleanArray,
+    count: usize,
+    iterator: FilterIterator,
+}
+
+impl FilterBuilder {
+    /// Create a new [`FilterBuilder`] that can be used to construct a [`FilterPredicate`]
+    ///
+    /// A mask that contains nulls is first passed through `prep_null_mask_filter`.
+    /// The initial strategy depends on `filter_count(mask) / mask.len()`: ranges are
+    /// used when that fraction exceeds 0.8, individual indices otherwise.
+    pub fn new(filter: &BooleanArray) -> Self {
+        let filter = if filter.null_count() == 0 {
+            BooleanArray::from(filter.data().clone())
+        } else {
+            prep_null_mask_filter(filter)
+        };
+
+        let count = filter_count(&filter);
+        let selected_fraction = count as f64 / filter.len() as f64;
+        let iterator = match selected_fraction > 0.8 {
+            true => FilterIterator::SlicesIterator,
+            false => FilterIterator::IndexIterator,
+        };
+
+        Self {
+            filter,
+            count,
+            iterator,
+        }
+    }
+
+    /// Precompute the positions selected by the mask so that the resulting predicate
+    /// can be applied to an array more quickly.
+    ///
+    /// Note: There is limited benefit to calling this to then filter a single array
+    /// Note: This will likely have a larger memory footprint than the original mask
+    pub fn optimize(mut self) -> Self {
+        let precomputed = match self.iterator {
+            FilterIterator::SlicesIterator => {
+                Some(FilterIterator::Slices(SlicesIterator::new(&self.filter).collect()))
+            }
+            FilterIterator::IndexIterator => Some(FilterIterator::Indices(
+                IndexIterator::new(&self.filter, self.count).collect(),
+            )),
+            // Already precomputed, nothing left to do
+            FilterIterator::Slices(_) | FilterIterator::Indices(_) => None,
+        };
+
+        if let Some(iterator) = precomputed {
+            self.iterator = iterator;
+        }
+        self
+    }
+
+    /// Construct the final `FilterPredicate`
+    pub fn build(self) -> FilterPredicate {
+        let Self {
+            filter,
+            count,
+            iterator,
+        } = self;
+
+        FilterPredicate {
+            filter,
+            count,
+            iterator,
+        }
     }
+}
 
-    let filter_count = filter_count(predicate);
+/// The internal iterator type of [`FilterPredicate`]
+///
+/// The first two variants carry no data and are evaluated from the filter mask each
+/// time the predicate is applied; the last two hold positions that were collected
+/// ahead of time by [`FilterBuilder::optimize`].
+#[derive(Debug)]
+enum FilterIterator {
+    // A lazily evaluated iterator of ranges
+    SlicesIterator,
+    // A lazily evaluated iterator of indices
+    IndexIterator,
+    // A precomputed list of indices
+    Indices(Vec<usize>),
+    // A precomputed array of ranges, each a `(start, end)` pair
+    Slices(Vec<(usize, usize)>),
+}
+
+/// A filtering predicate that can be applied to an [`Array`]
+///
+/// Constructed with [`FilterBuilder`]
+#[derive(Debug)]
+pub struct FilterPredicate {
+    // The boolean selection mask
+    filter: BooleanArray,
+    // The result of `filter_count` for `filter`, computed once by the builder
+    count: usize,
+    // How the positions selected by `filter` are iterated
+    iterator: FilterIterator,
+}
 
-    match filter_count {
+impl FilterPredicate {
+    /// Selects rows from `values` based on this [`FilterPredicate`]
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if this predicate is longer than `values`
+    pub fn filter(&self, values: &dyn Array) -> Result<ArrayRef> {
+        filter_array(values, self)
+    }
+}
+
+/// Filters `values` with `predicate`, dispatching on the data type of `values`
+///
+/// Returns an empty array when `predicate.count` is zero and a clone of `values`
+/// when it equals `values.len()`. Otherwise boolean, primitive, string and dictionary
+/// arrays are handled by type-specific code, and every other type falls back to
+/// `MutableArrayData`.
+///
+/// # Errors
+///
+/// Returns [`ArrowError::InvalidArgumentError`] if `predicate` is longer than `values`
+///
+/// # Panics
+///
+/// Panics for dictionary arrays whose key type is not one of the integer types
+fn filter_array(values: &dyn Array, predicate: &FilterPredicate) -> Result<ArrayRef> {
+    if predicate.filter.len() > values.len() {
+        return Err(ArrowError::InvalidArgumentError(format!(
+            "Filter predicate of length {} is larger than target array of length {}",
+            predicate.filter.len(),
+            values.len()
+        )));
+    }
+
+    match predicate.count {
         0 => {
             // return empty
-            Ok(new_empty_array(array.data_type()))
+            Ok(new_empty_array(values.data_type()))
         }
-        len if len == array.len() => {
+        len if len == values.len() => {
             // return all
-            let data = array.data().clone();
+            let data = values.data().clone();
             Ok(make_array(data))
         }
-        _ => {
-            // actually filter
-            let mut mutable =
-                MutableArrayData::new(vec![array.data_ref()], false, 
filter_count);
+        // actually filter
+        _ => match values.data_type() {
+            DataType::Boolean => {
+                let values = values.as_any().downcast_ref::<BooleanArray>().unwrap();
+                Ok(Arc::new(filter_boolean(values, predicate)))
+            }
+            DataType::Int8 => {
+                downcast_filter!(Int8Type, values, predicate)
+            }
+            DataType::Int16 => {
+                downcast_filter!(Int16Type, values, predicate)
+            }
+            DataType::Int32 => {
+                downcast_filter!(Int32Type, values, predicate)
+            }
+            DataType::Int64 => {
+                downcast_filter!(Int64Type, values, predicate)
+            }
+            DataType::UInt8 => {
+                downcast_filter!(UInt8Type, values, predicate)
+            }
+            DataType::UInt16 => {
+                downcast_filter!(UInt16Type, values, predicate)
+            }
+            DataType::UInt32 => {
+                downcast_filter!(UInt32Type, values, predicate)
+            }
+            DataType::UInt64 => {
+                downcast_filter!(UInt64Type, values, predicate)
+            }
+            DataType::Float32 => {
+                downcast_filter!(Float32Type, values, predicate)
+            }
+            DataType::Float64 => {
+                downcast_filter!(Float64Type, values, predicate)
+            }
+            DataType::Date32 => {
+                downcast_filter!(Date32Type, values, predicate)
+            }
+            DataType::Date64 => {
+                downcast_filter!(Date64Type, values, predicate)
+            }
+            DataType::Time32(Second) => {
+                downcast_filter!(Time32SecondType, values, predicate)
+            }
+            DataType::Time32(Millisecond) => {
+                downcast_filter!(Time32MillisecondType, values, predicate)
+            }
+            DataType::Time64(Microsecond) => {
+                downcast_filter!(Time64MicrosecondType, values, predicate)
+            }
+            DataType::Time64(Nanosecond) => {
+                downcast_filter!(Time64NanosecondType, values, predicate)
+            }
+            DataType::Timestamp(Second, _) => {
+                downcast_filter!(TimestampSecondType, values, predicate)
+            }
+            DataType::Timestamp(Millisecond, _) => {
+                downcast_filter!(TimestampMillisecondType, values, predicate)
+            }
+            DataType::Timestamp(Microsecond, _) => {
+                downcast_filter!(TimestampMicrosecondType, values, predicate)
+            }
+            DataType::Timestamp(Nanosecond, _) => {
+                downcast_filter!(TimestampNanosecondType, values, predicate)
+            }
+            DataType::Interval(IntervalUnit::YearMonth) => {
+                downcast_filter!(IntervalYearMonthType, values, predicate)
+            }
+            DataType::Interval(IntervalUnit::DayTime) => {
+                downcast_filter!(IntervalDayTimeType, values, predicate)
+            }
+            DataType::Interval(IntervalUnit::MonthDayNano) => {
+                downcast_filter!(IntervalMonthDayNanoType, values, predicate)
+            }
+            DataType::Duration(TimeUnit::Second) => {
+                downcast_filter!(DurationSecondType, values, predicate)
+            }
+            DataType::Duration(TimeUnit::Millisecond) => {
+                downcast_filter!(DurationMillisecondType, values, predicate)
+            }
+            DataType::Duration(TimeUnit::Microsecond) => {
+                downcast_filter!(DurationMicrosecondType, values, predicate)
+            }
+            DataType::Duration(TimeUnit::Nanosecond) => {
+                downcast_filter!(DurationNanosecondType, values, predicate)
+            }
+            DataType::Utf8 => {
+                let values = values
+                    .as_any()
+                    .downcast_ref::<GenericStringArray<i32>>()
+                    .unwrap();
+                Ok(Arc::new(filter_string::<i32>(values, predicate)))
+            }
+            DataType::LargeUtf8 => {
+                let values = values
+                    .as_any()
+                    .downcast_ref::<GenericStringArray<i64>>()
+                    .unwrap();
+                Ok(Arc::new(filter_string::<i64>(values, predicate)))
+            }
+            DataType::Dictionary(key_type, _) => match key_type.as_ref() {
+                DataType::Int8 => downcast_dict_filter!(Int8Type, values, predicate),
+                DataType::Int16 => downcast_dict_filter!(Int16Type, values, predicate),
+                DataType::Int32 => downcast_dict_filter!(Int32Type, values, predicate),
+                DataType::Int64 => downcast_dict_filter!(Int64Type, values, predicate),
+                DataType::UInt8 => downcast_dict_filter!(UInt8Type, values, predicate),
+                DataType::UInt16 => downcast_dict_filter!(UInt16Type, values, predicate),
+                DataType::UInt32 => downcast_dict_filter!(UInt32Type, values, predicate),
+                DataType::UInt64 => downcast_dict_filter!(UInt64Type, values, predicate),
+                // This is the filter kernel, so the message must not mention `take`
+                t => unimplemented!(
+                    "Filter not supported for dictionary key type {:?}",
+                    t
+                ),
+            },
+            _ => {
+                // fallback to using MutableArrayData
+                let mut mutable = MutableArrayData::new(
+                    vec![values.data_ref()],
+                    false,
+                    predicate.count,
+                );
+
+                let iter = SlicesIterator::new(&predicate.filter);
+                iter.for_each(|(start, end)| mutable.extend(0, start, end));
+
+                let data = mutable.freeze();
+                Ok(make_array(data))
+            }
+        },
+    }
+}
+
+/// Derives the null mask of the filtered output from the null mask of `data`
+///
+/// Yields `(null_count, mask)` for the rows selected by `predicate`, or `None` when
+/// `data` reports no nulls, has no null buffer, or no selected row is null
+fn filter_null_mask(
+    data: &ArrayData,
+    predicate: &FilterPredicate,
+) -> Option<(usize, Buffer)> {
+    if data.null_count() == 0 {
+        return None;
+    }
+
+    let validity = filter_bits(data.null_buffer()?, data.offset(), predicate);
+
+    // Every selected row whose bit is not set in the filtered mask is a null
+    match predicate.count - validity.count_set_bits() {
+        0 => None,
+        null_count => Some((null_count, validity)),
+    }
+}
+
+/// Filters the packed bitmask `buffer` with `predicate`
+///
+/// `offset` is the bit position within `buffer` that corresponds to position 0 of
+/// the filter, i.e. filter position `i` reads bit `i + offset`. The returned buffer
+/// holds the selected bits packed contiguously.
+fn filter_bits(buffer: &Buffer, offset: usize, predicate: &FilterPredicate) -> Buffer {
+    let src = buffer.as_slice();
+
+    match &predicate.iterator {
+        FilterIterator::IndexIterator => {
+            let bits = IndexIterator::new(&predicate.filter, predicate.count)
+                .map(|src_idx| bit_util::get_bit(src, src_idx + offset));
+
+            // SAFETY: `IndexIterator` reports its size correctly
+            unsafe { MutableBuffer::from_trusted_len_iter_bool(bits).into() }
+        }
+        FilterIterator::Indices(indices) => {
+            let bits = indices
+                .iter()
+                .map(|src_idx| bit_util::get_bit(src, *src_idx + offset));
 
-            let iter = SlicesIterator::new(predicate);
-            iter.for_each(|(start, end)| mutable.extend(0, start, end));
+            // SAFETY: `Vec::iter()` reports its size correctly
+            unsafe { MutableBuffer::from_trusted_len_iter_bool(bits).into() }
+        }
+        FilterIterator::SlicesIterator => {
+            let mut builder =
+                BooleanBufferBuilder::new(bit_util::ceil(predicate.count, 8));
+            for (start, end) in SlicesIterator::new(&predicate.filter) {
+                // Ranges are relative to the filter, so they must be shifted by
+                // `offset` exactly like the index-based branches
+                builder.append_packed_range(start + offset..end + offset, src)
+            }
+            builder.finish()
+        }
+        FilterIterator::Slices(slices) => {
+            let mut builder =
+                BooleanBufferBuilder::new(bit_util::ceil(predicate.count, 8));
+            for (start, end) in slices {
+                // See above: shift the filter-relative range by `offset`
+                builder.append_packed_range(*start + offset..*end + offset, src)
+            }
+            builder.finish()
+        }
+    }
+}
 
-            let data = mutable.freeze();
-            Ok(make_array(data))
+/// Applies `predicate` to a [`BooleanArray`]
+///
+/// The packed value bits are filtered with `filter_bits` and the null mask with
+/// `filter_null_mask`; the two are then assembled into a new array.
+///
+/// # Panics
+///
+/// Panics if `values` does not have exactly one buffer and no child data
+fn filter_boolean(values: &BooleanArray, predicate: &FilterPredicate) -> BooleanArray {
+    let data = values.data();
+    assert_eq!(data.buffers().len(), 1);
+    assert_eq!(data.child_data().len(), 0);
+
+    let selected_bits = filter_bits(&data.buffers()[0], data.offset(), predicate);
+    let builder = ArrayDataBuilder::new(DataType::Boolean)
+        .len(predicate.count)
+        .add_buffer(selected_bits);
+
+    let builder = match filter_null_mask(data, predicate) {
+        Some((null_count, nulls)) => {
+            builder.null_count(null_count).null_bit_buffer(nulls)
+        }
+        None => builder,
+    };
+
+    // SAFETY(review): both buffers come from `filter_bits` for this `predicate`;
+    // assumes each holds `predicate.count` bits - confirm
+    let filtered = unsafe { builder.build_unchecked() };
+    BooleanArray::from(filtered)
+}
+
+/// `filter` implementation for primitive arrays
+///
+/// Copies the values selected by `predicate` into a new buffer, using whichever
+/// iteration strategy `predicate.iterator` holds, and filters the null mask
+/// separately through `filter_null_mask`.
+///
+/// # Panics
+///
+/// Panics if `values` does not have exactly one buffer and no child data, or if its
+/// value buffer is shorter than `predicate.filter`.
+fn filter_primitive<T>(
+    values: &PrimitiveArray<T>,
+    predicate: &FilterPredicate,
+) -> PrimitiveArray<T>
+where
+    T: ArrowPrimitiveType,
+{
+    let data = values.data();
+    assert_eq!(data.buffers().len(), 1);
+    assert_eq!(data.child_data().len(), 0);
+
+    let values = data.buffer::<T::Native>(0);
+    // NOTE(review): presumably this guarantees that every position yielded by the
+    // filter is in bounds for the indexing below - confirm
+    assert!(values.len() >= predicate.filter.len());
+
+    let buffer = match &predicate.iterator {
+        // Range strategies: copy each contiguous run of selected values in one go
+        FilterIterator::SlicesIterator => {
+            let mut buffer =
+                MutableBuffer::with_capacity(predicate.count * 
T::get_byte_width());
+            for (start, end) in SlicesIterator::new(&predicate.filter) {
+                buffer.extend_from_slice(&values[start..end]);
+            }
+            buffer
         }
+        FilterIterator::Slices(slices) => {
+            let mut buffer =
+                MutableBuffer::with_capacity(predicate.count * 
T::get_byte_width());
+            for (start, end) in slices {
+                buffer.extend_from_slice(&values[*start..*end]);
+            }
+            buffer
+        }
+        // Index strategies: gather the selected values one at a time
+        FilterIterator::IndexIterator => {
+            let iter =
+                IndexIterator::new(&predicate.filter, predicate.count).map(|x| 
values[x]);
+
+            // SAFETY: IndexIterator is trusted length
+            unsafe { MutableBuffer::from_trusted_len_iter(iter) }
+        }
+        FilterIterator::Indices(indices) => {
+            let iter = indices.iter().map(|x| values[*x]);
+
+            // SAFETY: `Vec::iter` is trusted length
+            unsafe { MutableBuffer::from_trusted_len_iter(iter) }
+        }
+    };
+
+    let mut builder = ArrayDataBuilder::new(data.data_type().clone())
+        .len(predicate.count)
+        .add_buffer(buffer.into());
+
+    // Only attach a null mask when at least one selected value is null
+    if let Some((null_count, nulls)) = filter_null_mask(data, predicate) {
+        builder = builder.null_count(null_count).null_bit_buffer(nulls);
     }
+
+    let data = unsafe { builder.build_unchecked() };
+    PrimitiveArray::from(data)
 }
 
-/// Returns a new [RecordBatch] with arrays containing only values matching 
the filter.
-pub fn filter_record_batch(
-    record_batch: &RecordBatch,
-    predicate: &BooleanArray,
-) -> Result<RecordBatch> {
-    if predicate.null_count() > 0 {
-        // this greatly simplifies subsequent filtering code
-        // now we only have a boolean mask to deal with
-        let predicate = prep_null_mask_filter(predicate);
-        return filter_record_batch(record_batch, &predicate);
+/// [`FilterString`] is created from a source [`GenericStringArray`] and can be
+/// used to build a new [`GenericStringArray`] by copying values from the source
+///
+/// TODO(raphael): Could this be used for the take kernel as well?
+struct FilterString<'a, OffsetSize> {
+    /// Value offsets of the source array
+    src_offsets: &'a [OffsetSize],
+    /// Value bytes of the source array
+    src_values: &'a [u8],
+    /// Offsets written so far for the output array
+    dst_offsets: MutableBuffer,
+    /// Value bytes copied so far for the output array
+    dst_values: MutableBuffer,
+    /// Running end offset of the output, i.e. the last offset pushed to `dst_offsets`
+    cur_offset: OffsetSize,
+}
+
+impl<'a, OffsetSize> FilterString<'a, OffsetSize>
+where
+    OffsetSize: Zero + AddAssign + StringOffsetSizeTrait,
+{
+    fn new(capacity: usize, array: &'a GenericStringArray<OffsetSize>) -> Self 
{
+        let bytes_offset = (capacity + 1) * std::mem::size_of::<OffsetSize>();
+        let mut offsets = MutableBuffer::new(bytes_offset);

Review comment:
       ```suggestion
           let mut dst_offsets = MutableBuffer::new(bytes_offset);
   ```

##########
File path: arrow/src/compute/kernels/filter.rs
##########
@@ -185,79 +279,559 @@ pub fn prep_null_mask_filter(filter: &BooleanArray) -> 
BooleanArray {
 /// # Ok(())
 /// # }
 /// ```
-pub fn filter(array: &dyn Array, predicate: &BooleanArray) -> Result<ArrayRef> 
{
-    if predicate.null_count() > 0 {
-        // this greatly simplifies subsequent filtering code
-        // now we only have a boolean mask to deal with
-        let predicate = prep_null_mask_filter(predicate);
-        return filter(array, &predicate);
+pub fn filter(values: &dyn Array, predicate: &BooleanArray) -> Result<ArrayRef> {
+    // Only one array is filtered here, so the predicate is built without `optimize`
+    FilterBuilder::new(predicate).build().filter(values)
+}
+
+/// Filters every column of `record_batch` with `predicate`, returning a new
+/// [RecordBatch] that shares the schema of the input.
+///
+/// The predicate is optimized once up front and reused for each column. The first
+/// column that fails to filter aborts the operation with its error.
+pub fn filter_record_batch(
+    record_batch: &RecordBatch,
+    predicate: &BooleanArray,
+) -> Result<RecordBatch> {
+    let filter = FilterBuilder::new(predicate).optimize().build();
+
+    let columns = record_batch.columns();
+    let mut filtered_arrays = Vec::with_capacity(columns.len());
+    for column in columns {
+        filtered_arrays.push(filter_array(column, &filter)?);
+    }
+
+    RecordBatch::try_new(record_batch.schema(), filtered_arrays)
+}
+
+/// Builds a [`FilterPredicate`] from a boolean selection mask
+#[derive(Debug)]
+pub struct FilterBuilder {
+    filter: BooleanArray,
+    count: usize,
+    iterator: FilterIterator,
+}
+
+impl FilterBuilder {
+    /// Create a new [`FilterBuilder`] that can be used to construct a [`FilterPredicate`]
+    ///
+    /// A mask that contains nulls is first passed through `prep_null_mask_filter`.
+    /// The initial strategy depends on `filter_count(mask) / mask.len()`: ranges are
+    /// used when that fraction exceeds 0.8, individual indices otherwise.
+    pub fn new(filter: &BooleanArray) -> Self {
+        let filter = if filter.null_count() == 0 {
+            BooleanArray::from(filter.data().clone())
+        } else {
+            prep_null_mask_filter(filter)
+        };
+
+        let count = filter_count(&filter);
+        let selected_fraction = count as f64 / filter.len() as f64;
+        let iterator = match selected_fraction > 0.8 {
+            true => FilterIterator::SlicesIterator,
+            false => FilterIterator::IndexIterator,
+        };
+
+        Self {
+            filter,
+            count,
+            iterator,
+        }
+    }
+
+    /// Precompute the positions selected by the mask so that the resulting predicate
+    /// can be applied to an array more quickly.
+    ///
+    /// Note: There is limited benefit to calling this to then filter a single array
+    /// Note: This will likely have a larger memory footprint than the original mask
+    pub fn optimize(mut self) -> Self {
+        let precomputed = match self.iterator {
+            FilterIterator::SlicesIterator => {
+                Some(FilterIterator::Slices(SlicesIterator::new(&self.filter).collect()))
+            }
+            FilterIterator::IndexIterator => Some(FilterIterator::Indices(
+                IndexIterator::new(&self.filter, self.count).collect(),
+            )),
+            // Already precomputed, nothing left to do
+            FilterIterator::Slices(_) | FilterIterator::Indices(_) => None,
+        };
+
+        if let Some(iterator) = precomputed {
+            self.iterator = iterator;
+        }
+        self
+    }
+
+    /// Construct the final `FilterPredicate`
+    pub fn build(self) -> FilterPredicate {
+        let Self {
+            filter,
+            count,
+            iterator,
+        } = self;
+
+        FilterPredicate {
+            filter,
+            count,
+            iterator,
+        }
     }
+}
 
-    let filter_count = filter_count(predicate);
+/// The internal iterator type of [`FilterPredicate`]
+///
+/// The first two variants carry no data and are evaluated from the filter mask each
+/// time the predicate is applied; the last two hold positions that were collected
+/// ahead of time by [`FilterBuilder::optimize`].
+#[derive(Debug)]
+enum FilterIterator {
+    // A lazily evaluated iterator of ranges
+    SlicesIterator,
+    // A lazily evaluated iterator of indices
+    IndexIterator,
+    // A precomputed list of indices
+    Indices(Vec<usize>),
+    // A precomputed array of ranges, each a `(start, end)` pair
+    Slices(Vec<(usize, usize)>),
+}
+
+/// A filtering predicate that can be applied to an [`Array`]
+///
+/// Constructed with [`FilterBuilder`]
+#[derive(Debug)]
+pub struct FilterPredicate {
+    // The boolean selection mask
+    filter: BooleanArray,
+    // The result of `filter_count` for `filter`, computed once by the builder
+    count: usize,
+    // How the positions selected by `filter` are iterated
+    iterator: FilterIterator,
+}
 
-    match filter_count {
+impl FilterPredicate {
+    /// Selects rows from `values` based on this [`FilterPredicate`]
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if this predicate is longer than `values`
+    pub fn filter(&self, values: &dyn Array) -> Result<ArrayRef> {
+        filter_array(values, self)
+    }
+}
+
+fn filter_array(values: &dyn Array, predicate: &FilterPredicate) -> 
Result<ArrayRef> {
+    if predicate.filter.len() > values.len() {
+        return Err(ArrowError::InvalidArgumentError(format!(
+            "Filter predicate of length {} is larger than target array of 
length {}",
+            predicate.filter.len(),
+            values.len()
+        )));
+    }
+
+    match predicate.count {
         0 => {
             // return empty
-            Ok(new_empty_array(array.data_type()))
+            Ok(new_empty_array(values.data_type()))
         }
-        len if len == array.len() => {
+        len if len == values.len() => {
             // return all
-            let data = array.data().clone();
+            let data = values.data().clone();
             Ok(make_array(data))
         }
-        _ => {
-            // actually filter
-            let mut mutable =
-                MutableArrayData::new(vec![array.data_ref()], false, 
filter_count);
+        // actually filter
+        _ => match values.data_type() {
+            DataType::Boolean => {
+                let values = 
values.as_any().downcast_ref::<BooleanArray>().unwrap();
+                Ok(Arc::new(filter_boolean(values, predicate)))
+            }
+            DataType::Int8 => {
+                downcast_filter!(Int8Type, values, predicate)
+            }
+            DataType::Int16 => {
+                downcast_filter!(Int16Type, values, predicate)
+            }
+            DataType::Int32 => {
+                downcast_filter!(Int32Type, values, predicate)
+            }
+            DataType::Int64 => {
+                downcast_filter!(Int64Type, values, predicate)
+            }
+            DataType::UInt8 => {
+                downcast_filter!(UInt8Type, values, predicate)
+            }
+            DataType::UInt16 => {
+                downcast_filter!(UInt16Type, values, predicate)
+            }
+            DataType::UInt32 => {
+                downcast_filter!(UInt32Type, values, predicate)
+            }
+            DataType::UInt64 => {
+                downcast_filter!(UInt64Type, values, predicate)
+            }
+            DataType::Float32 => {
+                downcast_filter!(Float32Type, values, predicate)
+            }
+            DataType::Float64 => {
+                downcast_filter!(Float64Type, values, predicate)
+            }
+            DataType::Date32 => {
+                downcast_filter!(Date32Type, values, predicate)
+            }
+            DataType::Date64 => {
+                downcast_filter!(Date64Type, values, predicate)
+            }
+            DataType::Time32(Second) => {
+                downcast_filter!(Time32SecondType, values, predicate)
+            }
+            DataType::Time32(Millisecond) => {
+                downcast_filter!(Time32MillisecondType, values, predicate)
+            }
+            DataType::Time64(Microsecond) => {
+                downcast_filter!(Time64MicrosecondType, values, predicate)
+            }
+            DataType::Time64(Nanosecond) => {
+                downcast_filter!(Time64NanosecondType, values, predicate)
+            }
+            DataType::Timestamp(Second, _) => {
+                downcast_filter!(TimestampSecondType, values, predicate)
+            }
+            DataType::Timestamp(Millisecond, _) => {
+                downcast_filter!(TimestampMillisecondType, values, predicate)
+            }
+            DataType::Timestamp(Microsecond, _) => {
+                downcast_filter!(TimestampMicrosecondType, values, predicate)
+            }
+            DataType::Timestamp(Nanosecond, _) => {
+                downcast_filter!(TimestampNanosecondType, values, predicate)
+            }
+            DataType::Interval(IntervalUnit::YearMonth) => {
+                downcast_filter!(IntervalYearMonthType, values, predicate)
+            }
+            DataType::Interval(IntervalUnit::DayTime) => {
+                downcast_filter!(IntervalDayTimeType, values, predicate)
+            }
+            DataType::Interval(IntervalUnit::MonthDayNano) => {
+                downcast_filter!(IntervalMonthDayNanoType, values, predicate)
+            }
+            DataType::Duration(TimeUnit::Second) => {
+                downcast_filter!(DurationSecondType, values, predicate)
+            }
+            DataType::Duration(TimeUnit::Millisecond) => {
+                downcast_filter!(DurationMillisecondType, values, predicate)
+            }
+            DataType::Duration(TimeUnit::Microsecond) => {
+                downcast_filter!(DurationMicrosecondType, values, predicate)
+            }
+            DataType::Duration(TimeUnit::Nanosecond) => {
+                downcast_filter!(DurationNanosecondType, values, predicate)
+            }
+            DataType::Utf8 => {
+                let values = values
+                    .as_any()
+                    .downcast_ref::<GenericStringArray<i32>>()
+                    .unwrap();
+                Ok(Arc::new(filter_string::<i32>(values, predicate)))
+            }
+            DataType::LargeUtf8 => {
+                let values = values
+                    .as_any()
+                    .downcast_ref::<GenericStringArray<i64>>()
+                    .unwrap();
+                Ok(Arc::new(filter_string::<i64>(values, predicate)))
+            }
+            DataType::Dictionary(key_type, _) => match key_type.as_ref() {
+                DataType::Int8 => downcast_dict_filter!(Int8Type, values, 
predicate),
+                DataType::Int16 => downcast_dict_filter!(Int16Type, values, 
predicate),
+                DataType::Int32 => downcast_dict_filter!(Int32Type, values, 
predicate),
+                DataType::Int64 => downcast_dict_filter!(Int64Type, values, 
predicate),
+                DataType::UInt8 => downcast_dict_filter!(UInt8Type, values, 
predicate),
+                DataType::UInt16 => downcast_dict_filter!(UInt16Type, values, 
predicate),
+                DataType::UInt32 => downcast_dict_filter!(UInt32Type, values, 
predicate),
+                DataType::UInt64 => downcast_dict_filter!(UInt64Type, values, 
predicate),
+                t => unimplemented!("Take not supported for dictionary key type {:?}", t),
+            },
+            _ => {
+                // fallback to using MutableArrayData
+                let mut mutable = MutableArrayData::new(
+                    vec![values.data_ref()],
+                    false,
+                    predicate.count,
+                );
+
+                let iter = SlicesIterator::new(&predicate.filter);
+                iter.for_each(|(start, end)| mutable.extend(0, start, end));
+
+                let data = mutable.freeze();
+                Ok(make_array(data))
+            }
+        },
+    }
+}
+
+/// Computes a new null mask for `data` based on `predicate`
+///
+/// Returns `None` if no nulls in the result
+fn filter_null_mask(
+    data: &ArrayData,
+    predicate: &FilterPredicate,
+) -> Option<(usize, Buffer)> {
+    if data.null_count() == 0 {
+        return None;
+    }
+
+    let nulls = filter_bits(data.null_buffer()?, data.offset(), predicate);
+    let null_count = predicate.count - nulls.count_set_bits();
+
+    if null_count == 0 {
+        return None;
+    }
+
+    Some((null_count, nulls))
+}
+
+/// Filter the packed bitmask `buffer`, with `predicate` starting at bit offset `offset`
+fn filter_bits(buffer: &Buffer, offset: usize, predicate: &FilterPredicate) -> 
Buffer {
+    let src = buffer.as_slice();
+
+    match &predicate.iterator {
+        FilterIterator::IndexIterator => {
+            let bits = IndexIterator::new(&predicate.filter, predicate.count)
+                .map(|src_idx| bit_util::get_bit(src, src_idx + offset));
+
+            // SAFETY: `IndexIterator` reports its size correctly
+            unsafe { MutableBuffer::from_trusted_len_iter_bool(bits).into() }
+        }
+        FilterIterator::Indices(indices) => {
+            let bits = indices
+                .iter()
+                .map(|src_idx| bit_util::get_bit(src, *src_idx + offset));
 
-            let iter = SlicesIterator::new(predicate);
-            iter.for_each(|(start, end)| mutable.extend(0, start, end));
+            // SAFETY: `Vec::iter()` reports its size correctly
+            unsafe { MutableBuffer::from_trusted_len_iter_bool(bits).into() }
+        }
+        FilterIterator::SlicesIterator => {
+            let mut builder =
+                BooleanBufferBuilder::new(bit_util::ceil(predicate.count, 8));
+            for (start, end) in SlicesIterator::new(&predicate.filter) {
+                builder.append_packed_range(start..end, src)
+            }
+            builder.finish()
+        }
+        FilterIterator::Slices(slices) => {
+            let mut builder =
+                BooleanBufferBuilder::new(bit_util::ceil(predicate.count, 8));
+            for (start, end) in slices {
+                builder.append_packed_range(*start..*end, src)
+            }
+            builder.finish()
+        }
+    }
+}
 
-            let data = mutable.freeze();
-            Ok(make_array(data))
+/// `filter` implementation for boolean buffers
+fn filter_boolean(values: &BooleanArray, predicate: &FilterPredicate) -> 
BooleanArray {
+    let data = values.data();
+    assert_eq!(data.buffers().len(), 1);
+    assert_eq!(data.child_data().len(), 0);
+
+    let values = filter_bits(&data.buffers()[0], data.offset(), predicate);
+
+    let mut builder = ArrayDataBuilder::new(DataType::Boolean)
+        .len(predicate.count)
+        .add_buffer(values);
+
+    if let Some((null_count, nulls)) = filter_null_mask(data, predicate) {
+        builder = builder.null_count(null_count).null_bit_buffer(nulls);
+    }
+
+    let data = unsafe { builder.build_unchecked() };
+    BooleanArray::from(data)
+}
+
+/// `filter` implementation for primitive arrays
+fn filter_primitive<T>(
+    values: &PrimitiveArray<T>,
+    predicate: &FilterPredicate,
+) -> PrimitiveArray<T>
+where
+    T: ArrowPrimitiveType,
+{
+    let data = values.data();
+    assert_eq!(data.buffers().len(), 1);
+    assert_eq!(data.child_data().len(), 0);
+
+    let values = data.buffer::<T::Native>(0);
+    assert!(values.len() >= predicate.filter.len());
+
+    let buffer = match &predicate.iterator {
+        FilterIterator::SlicesIterator => {
+            let mut buffer =
+                MutableBuffer::with_capacity(predicate.count * 
T::get_byte_width());
+            for (start, end) in SlicesIterator::new(&predicate.filter) {
+                buffer.extend_from_slice(&values[start..end]);
+            }
+            buffer
         }
+        FilterIterator::Slices(slices) => {
+            let mut buffer =
+                MutableBuffer::with_capacity(predicate.count * 
T::get_byte_width());
+            for (start, end) in slices {
+                buffer.extend_from_slice(&values[*start..*end]);
+            }
+            buffer
+        }
+        FilterIterator::IndexIterator => {
+            let iter =
+                IndexIterator::new(&predicate.filter, predicate.count).map(|x| 
values[x]);
+
+            // SAFETY: IndexIterator is trusted length
+            unsafe { MutableBuffer::from_trusted_len_iter(iter) }
+        }
+        FilterIterator::Indices(indices) => {
+            let iter = indices.iter().map(|x| values[*x]);
+
+            // SAFETY: `Vec::iter` is trusted length
+            unsafe { MutableBuffer::from_trusted_len_iter(iter) }
+        }
+    };
+
+    let mut builder = ArrayDataBuilder::new(data.data_type().clone())
+        .len(predicate.count)
+        .add_buffer(buffer.into());
+
+    if let Some((null_count, nulls)) = filter_null_mask(data, predicate) {
+        builder = builder.null_count(null_count).null_bit_buffer(nulls);
     }
+
+    let data = unsafe { builder.build_unchecked() };
+    PrimitiveArray::from(data)
 }
 
-/// Returns a new [RecordBatch] with arrays containing only values matching the filter.
-pub fn filter_record_batch(
-    record_batch: &RecordBatch,
-    predicate: &BooleanArray,
-) -> Result<RecordBatch> {
-    if predicate.null_count() > 0 {
-        // this greatly simplifies subsequent filtering code
-        // now we only have a boolean mask to deal with
-        let predicate = prep_null_mask_filter(predicate);
-        return filter_record_batch(record_batch, &predicate);
+/// [`FilterString`] is created from a source [`GenericStringArray`] and can be
+/// used to build a new [`GenericStringArray`] by copying values from the source
+///
+/// TODO(raphael): Could this be used for the take kernel as well?
+struct FilterString<'a, OffsetSize> {
+    src_offsets: &'a [OffsetSize],
+    src_values: &'a [u8],
+    dst_offsets: MutableBuffer,
+    dst_values: MutableBuffer,
+    cur_offset: OffsetSize,
+}
+
+impl<'a, OffsetSize> FilterString<'a, OffsetSize>
+where
+    OffsetSize: Zero + AddAssign + StringOffsetSizeTrait,
+{
+    fn new(capacity: usize, array: &'a GenericStringArray<OffsetSize>) -> Self 
{
+        let bytes_offset = (capacity + 1) * std::mem::size_of::<OffsetSize>();

Review comment:
       ```suggestion
           let num_offsets = (capacity + 1) * std::mem::size_of::<OffsetSize>();
   ```

##########
File path: arrow/src/compute/kernels/filter.rs
##########
@@ -185,79 +279,559 @@ pub fn prep_null_mask_filter(filter: &BooleanArray) -> BooleanArray {
 /// # Ok(())
 /// # }
 /// ```
-pub fn filter(array: &dyn Array, predicate: &BooleanArray) -> Result<ArrayRef> 
{
-    if predicate.null_count() > 0 {
-        // this greatly simplifies subsequent filtering code
-        // now we only have a boolean mask to deal with
-        let predicate = prep_null_mask_filter(predicate);
-        return filter(array, &predicate);
+pub fn filter(values: &dyn Array, predicate: &BooleanArray) -> 
Result<ArrayRef> {
+    let predicate = FilterBuilder::new(predicate).build();
+    filter_array(values, &predicate)
+}
+
+/// Returns a new [RecordBatch] with arrays containing only values matching the filter.
+pub fn filter_record_batch(
+    record_batch: &RecordBatch,
+    predicate: &BooleanArray,
+) -> Result<RecordBatch> {
+    let filter = FilterBuilder::new(predicate).optimize().build();
+
+    let filtered_arrays = record_batch
+        .columns()
+        .iter()
+        .map(|a| filter_array(a, &filter))
+        .collect::<Result<Vec<_>>>()?;
+
+    RecordBatch::try_new(record_batch.schema(), filtered_arrays)
+}
+
+/// A builder to construct [`FilterPredicate`]
+#[derive(Debug)]
+pub struct FilterBuilder {
+    filter: BooleanArray,
+    count: usize,
+    iterator: FilterIterator,
+}
+
+impl FilterBuilder {
+    /// Create a new [`FilterBuilder`] that can be used to construct a [`FilterPredicate`]
+    pub fn new(filter: &BooleanArray) -> Self {
+        let filter = match filter.null_count() {
+            0 => BooleanArray::from(filter.data().clone()),
+            _ => prep_null_mask_filter(filter),
+        };
+
+        let count = filter_count(&filter);
+        let selectivity_frac = count as f64 / filter.len() as f64;
+        let iterator = if selectivity_frac > 0.8 {
+            FilterIterator::SlicesIterator
+        } else {
+            FilterIterator::IndexIterator
+        };
+
+        Self {
+            filter,
+            count,
+            iterator,
+        }
+    }
+
+    /// Compute an optimised representation of the provided `filter` mask that can be
+    /// applied to an array more quickly.
+    ///
+    /// Note: There is limited benefit to calling this to then filter a single array
+    /// Note: This will likely have a larger memory footprint than the original mask
+    pub fn optimize(mut self) -> Self {
+        match self.iterator {
+            FilterIterator::SlicesIterator => {
+                let slices = SlicesIterator::new(&self.filter).collect();
+                self.iterator = FilterIterator::Slices(slices)
+            }
+            FilterIterator::IndexIterator => {
+                let indices = IndexIterator::new(&self.filter, 
self.count).collect();
+                self.iterator = FilterIterator::Indices(indices)
+            }
+            _ => {}
+        }
+        self
+    }
+
+    /// Construct the final `FilterPredicate`
+    pub fn build(self) -> FilterPredicate {
+        FilterPredicate {
+            filter: self.filter,
+            count: self.count,
+            iterator: self.iterator,
+        }
     }
+}
 
-    let filter_count = filter_count(predicate);
+/// The internal iterator type of [`FilterPredicate`]
+#[derive(Debug)]
+enum FilterIterator {
+    // A lazily evaluated iterator of ranges
+    SlicesIterator,
+    // A lazily evaluated iterator of indices
+    IndexIterator,
+    // A precomputed list of indices
+    Indices(Vec<usize>),
+    // A precomputed array of ranges
+    Slices(Vec<(usize, usize)>),
+}
+
+/// A filtering predicate that can be applied to an [`Array`]
+#[derive(Debug)]
+pub struct FilterPredicate {
+    filter: BooleanArray,
+    count: usize,
+    iterator: FilterIterator,
+}
 
-    match filter_count {
+impl FilterPredicate {
+    /// Selects rows from `values` based on this [`FilterPredicate`]
+    pub fn filter(&self, values: &dyn Array) -> Result<ArrayRef> {
+        filter_array(values, self)
+    }
+}
+
+fn filter_array(values: &dyn Array, predicate: &FilterPredicate) -> 
Result<ArrayRef> {
+    if predicate.filter.len() > values.len() {
+        return Err(ArrowError::InvalidArgumentError(format!(
+            "Filter predicate of length {} is larger than target array of length {}",
+            predicate.filter.len(),
+            values.len()
+        )));
+    }
+
+    match predicate.count {
         0 => {
             // return empty
-            Ok(new_empty_array(array.data_type()))
+            Ok(new_empty_array(values.data_type()))
         }
-        len if len == array.len() => {
+        len if len == values.len() => {
             // return all
-            let data = array.data().clone();
+            let data = values.data().clone();
             Ok(make_array(data))
         }
-        _ => {
-            // actually filter
-            let mut mutable =
-                MutableArrayData::new(vec![array.data_ref()], false, 
filter_count);
+        // actually filter
+        _ => match values.data_type() {
+            DataType::Boolean => {
+                let values = 
values.as_any().downcast_ref::<BooleanArray>().unwrap();
+                Ok(Arc::new(filter_boolean(values, predicate)))
+            }
+            DataType::Int8 => {
+                downcast_filter!(Int8Type, values, predicate)
+            }
+            DataType::Int16 => {
+                downcast_filter!(Int16Type, values, predicate)
+            }
+            DataType::Int32 => {
+                downcast_filter!(Int32Type, values, predicate)
+            }
+            DataType::Int64 => {
+                downcast_filter!(Int64Type, values, predicate)
+            }
+            DataType::UInt8 => {
+                downcast_filter!(UInt8Type, values, predicate)
+            }
+            DataType::UInt16 => {
+                downcast_filter!(UInt16Type, values, predicate)
+            }
+            DataType::UInt32 => {
+                downcast_filter!(UInt32Type, values, predicate)
+            }
+            DataType::UInt64 => {
+                downcast_filter!(UInt64Type, values, predicate)
+            }
+            DataType::Float32 => {
+                downcast_filter!(Float32Type, values, predicate)
+            }
+            DataType::Float64 => {
+                downcast_filter!(Float64Type, values, predicate)
+            }
+            DataType::Date32 => {
+                downcast_filter!(Date32Type, values, predicate)
+            }
+            DataType::Date64 => {
+                downcast_filter!(Date64Type, values, predicate)
+            }
+            DataType::Time32(Second) => {
+                downcast_filter!(Time32SecondType, values, predicate)
+            }
+            DataType::Time32(Millisecond) => {
+                downcast_filter!(Time32MillisecondType, values, predicate)
+            }
+            DataType::Time64(Microsecond) => {
+                downcast_filter!(Time64MicrosecondType, values, predicate)
+            }
+            DataType::Time64(Nanosecond) => {
+                downcast_filter!(Time64NanosecondType, values, predicate)
+            }
+            DataType::Timestamp(Second, _) => {
+                downcast_filter!(TimestampSecondType, values, predicate)
+            }
+            DataType::Timestamp(Millisecond, _) => {
+                downcast_filter!(TimestampMillisecondType, values, predicate)
+            }
+            DataType::Timestamp(Microsecond, _) => {
+                downcast_filter!(TimestampMicrosecondType, values, predicate)
+            }
+            DataType::Timestamp(Nanosecond, _) => {
+                downcast_filter!(TimestampNanosecondType, values, predicate)
+            }
+            DataType::Interval(IntervalUnit::YearMonth) => {
+                downcast_filter!(IntervalYearMonthType, values, predicate)
+            }
+            DataType::Interval(IntervalUnit::DayTime) => {
+                downcast_filter!(IntervalDayTimeType, values, predicate)
+            }
+            DataType::Interval(IntervalUnit::MonthDayNano) => {
+                downcast_filter!(IntervalMonthDayNanoType, values, predicate)
+            }
+            DataType::Duration(TimeUnit::Second) => {
+                downcast_filter!(DurationSecondType, values, predicate)
+            }
+            DataType::Duration(TimeUnit::Millisecond) => {
+                downcast_filter!(DurationMillisecondType, values, predicate)
+            }
+            DataType::Duration(TimeUnit::Microsecond) => {
+                downcast_filter!(DurationMicrosecondType, values, predicate)
+            }
+            DataType::Duration(TimeUnit::Nanosecond) => {
+                downcast_filter!(DurationNanosecondType, values, predicate)
+            }
+            DataType::Utf8 => {
+                let values = values
+                    .as_any()
+                    .downcast_ref::<GenericStringArray<i32>>()
+                    .unwrap();
+                Ok(Arc::new(filter_string::<i32>(values, predicate)))
+            }
+            DataType::LargeUtf8 => {
+                let values = values
+                    .as_any()
+                    .downcast_ref::<GenericStringArray<i64>>()
+                    .unwrap();
+                Ok(Arc::new(filter_string::<i64>(values, predicate)))
+            }
+            DataType::Dictionary(key_type, _) => match key_type.as_ref() {
+                DataType::Int8 => downcast_dict_filter!(Int8Type, values, 
predicate),
+                DataType::Int16 => downcast_dict_filter!(Int16Type, values, 
predicate),
+                DataType::Int32 => downcast_dict_filter!(Int32Type, values, 
predicate),
+                DataType::Int64 => downcast_dict_filter!(Int64Type, values, 
predicate),
+                DataType::UInt8 => downcast_dict_filter!(UInt8Type, values, 
predicate),
+                DataType::UInt16 => downcast_dict_filter!(UInt16Type, values, 
predicate),
+                DataType::UInt32 => downcast_dict_filter!(UInt32Type, values, 
predicate),
+                DataType::UInt64 => downcast_dict_filter!(UInt64Type, values, 
predicate),
+                t => unimplemented!("Take not supported for dictionary key type {:?}", t),
+            },
+            _ => {
+                // fallback to using MutableArrayData
+                let mut mutable = MutableArrayData::new(
+                    vec![values.data_ref()],
+                    false,
+                    predicate.count,
+                );
+
+                let iter = SlicesIterator::new(&predicate.filter);
+                iter.for_each(|(start, end)| mutable.extend(0, start, end));
+
+                let data = mutable.freeze();
+                Ok(make_array(data))
+            }
+        },
+    }
+}
+
+/// Computes a new null mask for `data` based on `predicate`
+///
+/// Returns `None` if no nulls in the result
+fn filter_null_mask(
+    data: &ArrayData,
+    predicate: &FilterPredicate,
+) -> Option<(usize, Buffer)> {
+    if data.null_count() == 0 {
+        return None;
+    }
+
+    let nulls = filter_bits(data.null_buffer()?, data.offset(), predicate);
+    let null_count = predicate.count - nulls.count_set_bits();
+
+    if null_count == 0 {
+        return None;
+    }
+
+    Some((null_count, nulls))
+}
+
+/// Filter the packed bitmask `buffer`, with `predicate` starting at bit offset `offset`
+fn filter_bits(buffer: &Buffer, offset: usize, predicate: &FilterPredicate) -> 
Buffer {
+    let src = buffer.as_slice();
+
+    match &predicate.iterator {
+        FilterIterator::IndexIterator => {
+            let bits = IndexIterator::new(&predicate.filter, predicate.count)
+                .map(|src_idx| bit_util::get_bit(src, src_idx + offset));
+
+            // SAFETY: `IndexIterator` reports its size correctly
+            unsafe { MutableBuffer::from_trusted_len_iter_bool(bits).into() }
+        }
+        FilterIterator::Indices(indices) => {
+            let bits = indices
+                .iter()
+                .map(|src_idx| bit_util::get_bit(src, *src_idx + offset));
 
-            let iter = SlicesIterator::new(predicate);
-            iter.for_each(|(start, end)| mutable.extend(0, start, end));
+            // SAFETY: `Vec::iter()` reports its size correctly
+            unsafe { MutableBuffer::from_trusted_len_iter_bool(bits).into() }
+        }
+        FilterIterator::SlicesIterator => {
+            let mut builder =
+                BooleanBufferBuilder::new(bit_util::ceil(predicate.count, 8));
+            for (start, end) in SlicesIterator::new(&predicate.filter) {
+                builder.append_packed_range(start..end, src)
+            }
+            builder.finish()
+        }
+        FilterIterator::Slices(slices) => {
+            let mut builder =
+                BooleanBufferBuilder::new(bit_util::ceil(predicate.count, 8));
+            for (start, end) in slices {
+                builder.append_packed_range(*start..*end, src)
+            }
+            builder.finish()
+        }
+    }
+}
 
-            let data = mutable.freeze();
-            Ok(make_array(data))
+/// `filter` implementation for boolean buffers
+fn filter_boolean(values: &BooleanArray, predicate: &FilterPredicate) -> 
BooleanArray {
+    let data = values.data();
+    assert_eq!(data.buffers().len(), 1);
+    assert_eq!(data.child_data().len(), 0);
+
+    let values = filter_bits(&data.buffers()[0], data.offset(), predicate);
+
+    let mut builder = ArrayDataBuilder::new(DataType::Boolean)
+        .len(predicate.count)
+        .add_buffer(values);
+
+    if let Some((null_count, nulls)) = filter_null_mask(data, predicate) {
+        builder = builder.null_count(null_count).null_bit_buffer(nulls);
+    }
+
+    let data = unsafe { builder.build_unchecked() };
+    BooleanArray::from(data)
+}
+
+/// `filter` implementation for primitive arrays
+fn filter_primitive<T>(
+    values: &PrimitiveArray<T>,
+    predicate: &FilterPredicate,
+) -> PrimitiveArray<T>
+where
+    T: ArrowPrimitiveType,
+{
+    let data = values.data();
+    assert_eq!(data.buffers().len(), 1);
+    assert_eq!(data.child_data().len(), 0);
+
+    let values = data.buffer::<T::Native>(0);
+    assert!(values.len() >= predicate.filter.len());
+
+    let buffer = match &predicate.iterator {
+        FilterIterator::SlicesIterator => {
+            let mut buffer =
+                MutableBuffer::with_capacity(predicate.count * 
T::get_byte_width());
+            for (start, end) in SlicesIterator::new(&predicate.filter) {
+                buffer.extend_from_slice(&values[start..end]);
+            }
+            buffer
         }
+        FilterIterator::Slices(slices) => {
+            let mut buffer =
+                MutableBuffer::with_capacity(predicate.count * 
T::get_byte_width());
+            for (start, end) in slices {
+                buffer.extend_from_slice(&values[*start..*end]);
+            }
+            buffer
+        }
+        FilterIterator::IndexIterator => {
+            let iter =
+                IndexIterator::new(&predicate.filter, predicate.count).map(|x| 
values[x]);
+
+            // SAFETY: IndexIterator is trusted length
+            unsafe { MutableBuffer::from_trusted_len_iter(iter) }
+        }
+        FilterIterator::Indices(indices) => {
+            let iter = indices.iter().map(|x| values[*x]);
+
+            // SAFETY: `Vec::iter` is trusted length
+            unsafe { MutableBuffer::from_trusted_len_iter(iter) }
+        }
+    };
+
+    let mut builder = ArrayDataBuilder::new(data.data_type().clone())
+        .len(predicate.count)
+        .add_buffer(buffer.into());
+
+    if let Some((null_count, nulls)) = filter_null_mask(data, predicate) {
+        builder = builder.null_count(null_count).null_bit_buffer(nulls);
     }
+
+    let data = unsafe { builder.build_unchecked() };
+    PrimitiveArray::from(data)
 }
 
-/// Returns a new [RecordBatch] with arrays containing only values matching the filter.
-pub fn filter_record_batch(
-    record_batch: &RecordBatch,
-    predicate: &BooleanArray,
-) -> Result<RecordBatch> {
-    if predicate.null_count() > 0 {
-        // this greatly simplifies subsequent filtering code
-        // now we only have a boolean mask to deal with
-        let predicate = prep_null_mask_filter(predicate);
-        return filter_record_batch(record_batch, &predicate);
+/// [`FilterString`] is created from a source [`GenericStringArray`] and can be
+/// used to build a new [`GenericStringArray`] by copying values from the source
+///
+/// TODO(raphael): Could this be used for the take kernel as well?
+struct FilterString<'a, OffsetSize> {
+    src_offsets: &'a [OffsetSize],
+    src_values: &'a [u8],
+    dst_offsets: MutableBuffer,
+    dst_values: MutableBuffer,
+    cur_offset: OffsetSize,
+}
+
+impl<'a, OffsetSize> FilterString<'a, OffsetSize>
+where
+    OffsetSize: Zero + AddAssign + StringOffsetSizeTrait,
+{
+    fn new(capacity: usize, array: &'a GenericStringArray<OffsetSize>) -> Self 
{
+        let bytes_offset = (capacity + 1) * std::mem::size_of::<OffsetSize>();
+        let mut offsets = MutableBuffer::new(bytes_offset);
+        let values = MutableBuffer::new(0);
+        let cur_offset = OffsetSize::zero();
+        offsets.push(cur_offset);
+
+        Self {
+            src_offsets: array.value_offsets(),
+            src_values: &array.data().buffers()[1],
+            dst_offsets: offsets,
+            dst_values: values,
+            cur_offset,
+        }
+    }
+
+    /// Returns the byte offset at `idx`
+    #[inline]
+    fn get_value_offset(&self, idx: usize) -> usize {
+        self.src_offsets[idx].to_usize().expect("illegal offset")
     }
 
-    let num_columns = record_batch.columns().len();
+    /// Returns the start and end of the value at index `idx` along with its length
+    #[inline]
+    fn get_value_range(&self, idx: usize) -> (usize, usize, OffsetSize) {
+        // These can only fail if `array` contains invalid data
+        let start = self.get_value_offset(idx);
+        let end = self.get_value_offset(idx + 1);
+        let len = OffsetSize::from_usize(end - start).expect("illegal offset range");
+        (start, end, len)
+    }
 
-    let filtered_arrays = match num_columns {
-        1 => {
-            vec![filter(record_batch.columns()[0].as_ref(), predicate)?]
+    /// Extends the in-progress array by the indexes in the provided iterator
+    fn extend_idx(&mut self, iter: impl Iterator<Item = usize>) {
+        for idx in iter {
+            let (start, end, len) = self.get_value_range(idx);
+            self.cur_offset += len;
+            self.dst_offsets.push(self.cur_offset);
+            self.dst_values
+                .extend_from_slice(&self.src_values[start..end]);
         }
-        _ => {
-            let filter = build_filter(predicate)?;
-            record_batch
-                .columns()
-                .iter()
-                .map(|a| make_array(filter(a.data())))
-                .collect()
+    }
+
+    /// Extends the in-progress array by the ranges in the provided iterator
+    fn extend_slices(&mut self, iter: impl Iterator<Item = (usize, usize)>) {
+        for slice in iter {
+            // These can only fail if `array` contains invalid data
+            for idx in slice.0..slice.1 {
+                let (_, _, len) = self.get_value_range(idx);
+                self.cur_offset += len;
+                self.dst_offsets.push(self.cur_offset); // push_unchecked?
+            }
+
+            let start = self.get_value_offset(slice.0);
+            let end = self.get_value_offset(slice.1);
+            self.dst_values
+                .extend_from_slice(&self.src_values[start..end]);
         }
+    }
+}
+
+/// `filter` implementation for string arrays
+///
+/// Note: NULLs with a non-zero slot length in `array` will have the corresponding
+/// data copied across. This allows handling the null mask separately from the data
+fn filter_string<OffsetSize>(
+    array: &GenericStringArray<OffsetSize>,
+    predicate: &FilterPredicate,
+) -> GenericStringArray<OffsetSize>
+where
+    OffsetSize: Zero + AddAssign + StringOffsetSizeTrait,
+{
+    let data = array.data();
+    assert_eq!(data.buffers().len(), 2);
+    assert_eq!(data.child_data().len(), 0);
+    let mut filter = FilterString::new(predicate.count, array);
+
+    match &predicate.iterator {
+        FilterIterator::SlicesIterator => {
+            filter.extend_slices(SlicesIterator::new(&predicate.filter))
+        }
+        FilterIterator::Slices(slices) => 
filter.extend_slices(slices.iter().cloned()),
+        FilterIterator::IndexIterator => {
+            filter.extend_idx(IndexIterator::new(&predicate.filter, 
predicate.count))
+        }
+        FilterIterator::Indices(indices) => 
filter.extend_idx(indices.iter().cloned()),
+    }
+
+    let mut builder = ArrayDataBuilder::new(data.data_type().clone())
+        .len(predicate.count)
+        .add_buffer(filter.dst_offsets.into())
+        .add_buffer(filter.dst_values.into());
+
+    if let Some((null_count, nulls)) = filter_null_mask(data, predicate) {
+        builder = builder.null_count(null_count).null_bit_buffer(nulls);
+    }
+
+    let data = unsafe { builder.build_unchecked() };
+    GenericStringArray::from(data)
+}
+
+/// `filter` implementation for dictionaries
+fn filter_dict<T>(
+    array: &DictionaryArray<T>,
+    predicate: &FilterPredicate,
+) -> DictionaryArray<T>
+where
+    T: ArrowPrimitiveType,
+    T::Native: num::Num,
+{
+    let filtered_keys = filter_primitive::<T>(array.keys(), predicate);
+    let filtered_data = filtered_keys.data_ref();
+
+    let data = unsafe {
+        ArrayData::new_unchecked(
+            array.data_type().clone(),
+            filtered_keys.len(),

Review comment:
       Why not use `filtered_data` (the `ArrayData`) consistently, rather than 
intermixing the array and its array data?
   
   ```suggestion
               filtered_data.len(),
   ```
   
   -- not that this is wrong; I am just curious




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to