alamb commented on code in PR #8951:
URL: https://github.com/apache/arrow-rs/pull/8951#discussion_r2701054448


##########
arrow-select/src/coalesce/primitive.rs:
##########
@@ -95,6 +96,145 @@ impl<T: ArrowPrimitiveType + Debug> InProgressArray for 
InProgressPrimitiveArray
         Ok(())
     }
 
+    /// Copy rows using a predicate
+    fn copy_rows_by_filter(&mut self, filter: &FilterPredicate) -> Result<(), 
ArrowError> {
+        self.ensure_capacity();
+
+        let s = self
+            .source
+            .as_ref()
+            .ok_or_else(|| {
+                ArrowError::InvalidArgumentError(
+                    "Internal Error: InProgressPrimitiveArray: source not 
set".to_string(),
+                )
+            })?
+            .as_primitive::<T>();
+
+        let values = s.values();
+        let count = filter.count();
+
+        // Use the predicate's strategy for optimal iteration
+        match filter.strategy() {
+            IterationStrategy::SlicesIterator => {
+                // Copy values, nulls using slices
+                if let Some(nulls) = s.nulls().filter(|n| n.null_count() > 0) {
+                    for (start, end) in 
SlicesIterator::new(filter.filter_array()) {
+                        // SAFETY: slices are derived from filter predicate
+                        self.current
+                            .extend_from_slice(unsafe { 
values.get_unchecked(start..end) });
+                        let slice = nulls.slice(start, end - start);

Review Comment:
   One thing we could also check is adding a `nulls.append_buffer_sliced` or 
something to avoid having to call "slice" on the buffer (and doing an atomic 
increment each time)



##########
arrow-select/src/coalesce.rs:
##########
@@ -605,6 +714,15 @@ trait InProgressArray: std::fmt::Debug + Send + Sync {
     /// Return an error if the source array is not set
     fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), 
ArrowError>;
 
+    /// Copy rows at the given indices from the current source array into the 
in-progress array
+    fn copy_rows_by_filter(&mut self, filter: &FilterPredicate) -> Result<(), 
ArrowError> {
+        // Default implementation: iterate over indices from the filter
+        for idx in IndexIterator::new(filter.filter_array(), filter.count()) {

Review Comment:
   I found it strange that the default implementation copied with a different 
iteration strategy -- Upon review, it seems like this code should never be 
called
   
   Therefore I would recommend making the default implementation `panic`
   
   
   Another potential way to make this clearer would be add a method like 
`supports_copy_rows_by_filter` to avoid having to special case "is primitive" 



##########
arrow-select/src/filter.rs:
##########
@@ -37,9 +37,9 @@ use arrow_schema::*;
 /// [`SlicesIterator`] to copy ranges of values. Otherwise iterate
 /// over individual rows using [`IndexIterator`]
 ///
-/// Threshold of 0.8 chosen based on 
<https://dl.acm.org/doi/abs/10.1145/3465998.3466009>
+/// Threshold of 0.9 chosen based on benchmarking results
 ///
-const FILTER_SLICES_SELECTIVITY_THRESHOLD: f64 = 0.8;
+const FILTER_SLICES_SELECTIVITY_THRESHOLD: f64 = 0.9;

Review Comment:
   This change to the filter heuristic seems like we should pull it out into 
its own PR so we can test / make it clear what changed. 



##########
arrow-select/src/coalesce.rs:
##########
@@ -571,6 +788,15 @@ trait InProgressArray: std::fmt::Debug + Send + Sync {
     /// Return an error if the source array is not set
     fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), 
ArrowError>;
 
+    /// Copy rows at the given indices from the current source array into the 
in-progress array
+    fn copy_rows_by_filter(&mut self, filter: &FilterPredicate) -> Result<(), 
ArrowError> {
+        // Default implementation: iterate over indices from the filter

Review Comment:
   - https://github.com/apache/arrow-rs/issues/9143



##########
arrow-select/src/filter.rs:
##########
@@ -321,7 +341,13 @@ enum IterationStrategy {
 impl IterationStrategy {
     /// The default [`IterationStrategy`] for a filter of length 
`filter_length`
     /// and selecting `filter_count` rows
-    fn default_strategy(filter_length: usize, filter_count: usize) -> Self {
+    ///
+    /// Returns:
+    /// - [`IterationStrategy::None`] if `filter_count` is 0
+    /// - [`IterationStrategy::All`] if `filter_count == filter_length`
+    /// - [`IterationStrategy::SlicesIterator`] if selectivity > 80%

Review Comment:
   I think you changed the threshold to 90%



##########
arrow-select/src/coalesce.rs:
##########
@@ -238,10 +245,112 @@ impl BatchCoalescer {
         batch: RecordBatch,
         filter: &BooleanArray,
     ) -> Result<(), ArrowError> {
-        // TODO: optimize this to avoid materializing (copying the results
-        // of filter to a new batch)
-        let filtered_batch = filter_record_batch(&batch, filter)?;
-        self.push_batch(filtered_batch)
+        // We only support primitve now, fallback to filter_record_batch for 
other types
+        // Also, skip optimization when filter is not very selective
+        if batch
+            .schema()
+            .fields()
+            .iter()
+            .any(|field| !field.data_type().is_primitive())

Review Comment:
   Yeah I am thinking about that as well



##########
arrow-select/src/coalesce/primitive.rs:
##########
@@ -95,6 +96,145 @@ impl<T: ArrowPrimitiveType + Debug> InProgressArray for 
InProgressPrimitiveArray
         Ok(())
     }
 
+    /// Copy rows using a predicate
+    fn copy_rows_by_filter(&mut self, filter: &FilterPredicate) -> Result<(), 
ArrowError> {
+        self.ensure_capacity();
+
+        let s = self
+            .source
+            .as_ref()
+            .ok_or_else(|| {
+                ArrowError::InvalidArgumentError(
+                    "Internal Error: InProgressPrimitiveArray: source not 
set".to_string(),
+                )
+            })?
+            .as_primitive::<T>();
+
+        let values = s.values();
+        let count = filter.count();
+
+        // Use the predicate's strategy for optimal iteration
+        match filter.strategy() {
+            IterationStrategy::SlicesIterator => {
+                // Copy values, nulls using slices
+                if let Some(nulls) = s.nulls().filter(|n| n.null_count() > 0) {

Review Comment:
   I don't know if it would matter, but one difference with the filter kernel 
is that the filter kernel handles values in one loop and then nulls in a second 
(`filter_bits`).  (as in a single loop that copies the values and then a second 
loop/iterator that copies the nulls, if any)
   
   Doing so would keep the inner loop smaller and make it easier to reuse the 
null filtering code between kernels 
   
   However, this is also something we can do as a follow on PR / refactor when 
we add a second array type
   
   



##########
arrow-select/src/coalesce.rs:
##########
@@ -238,10 +245,112 @@ impl BatchCoalescer {
         batch: RecordBatch,
         filter: &BooleanArray,
     ) -> Result<(), ArrowError> {
-        // TODO: optimize this to avoid materializing (copying the results
-        // of filter to a new batch)
-        let filtered_batch = filter_record_batch(&batch, filter)?;
-        self.push_batch(filtered_batch)
+        // We only support primitve now, fallback to filter_record_batch for 
other types
+        // Also, skip optimization when filter is not very selective
+        if batch
+            .schema()
+            .fields()
+            .iter()
+            .any(|field| !field.data_type().is_primitive())
+            || self
+                .biggest_coalesce_batch_size
+                .map(|biggest_size| filter.true_count() > biggest_size)
+                .unwrap_or(false)
+        {
+            let batch = filter_record_batch(&batch, filter)?;
+
+            self.push_batch(batch)?;
+            return Ok(());
+        }
+
+        // Build an optimized filter predicate that chooses the best iteration 
strategy
+        let is_optimize_beneficial = 
is_optimize_beneficial_record_batch(&batch);
+        let selected_count = filter.true_count();
+
+        // Fast path: skip if no rows selected
+        if selected_count == 0 {
+            return Ok(());
+        }
+
+        // Fast path: if all rows selected, just push the batch
+        if selected_count == batch.num_rows() {
+            return self.push_batch(batch);
+        }
+
+        let (_schema, arrays, _num_rows) = batch.into_parts();
+
+        // Setup input arrays as sources
+        assert_eq!(arrays.len(), self.in_progress_arrays.len());
+        self.in_progress_arrays
+            .iter_mut()
+            .zip(&arrays)
+            .for_each(|(in_progress, array)| {
+                in_progress.set_source(Some(Arc::clone(array)));
+            });

Review Comment:
   I think you can avoid this Arc::clone (probably not a big deal but every 
little bit helps)
   
   ```suggestion
          self.in_progress_arrays
               .iter_mut()
               .zip(arrays)
               .for_each(|(in_progress, array)| {
                   in_progress.set_source(Some(array));
               });
   ```



##########
arrow-select/src/coalesce.rs:
##########
@@ -237,10 +243,101 @@ impl BatchCoalescer {
         batch: RecordBatch,
         filter: &BooleanArray,
     ) -> Result<(), ArrowError> {
-        // TODO: optimize this to avoid materializing (copying the results
-        // of filter to a new batch)
-        let filtered_batch = filter_record_batch(&batch, filter)?;
-        self.push_batch(filtered_batch)
+        // We only support primitve now, fallback to filter_record_batch for 
other types
+        // Also, skip optimization when filter is not very selective

Review Comment:
   Yeah, I do think it would be good to take into account 
`biggest_coalesce_batch_size` but maybe we can do so as a follow on PR



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to