pepijnve commented on code in PR #18152:
URL: https://github.com/apache/datafusion/pull/18152#discussion_r2463854008


##########
datafusion/physical-expr/src/expressions/case.rs:
##########
@@ -122,6 +124,327 @@ fn is_cheap_and_infallible(expr: &Arc<dyn PhysicalExpr>) -> bool {
     expr.as_any().is::<Column>()
 }
 
+/// Creates a [FilterPredicate] from a boolean array.
+fn create_filter(predicate: &BooleanArray) -> FilterPredicate {
+    let mut filter_builder = FilterBuilder::new(predicate);
+    // Always optimize the filter since we use them multiple times.
+    filter_builder = filter_builder.optimize();
+    filter_builder.build()
+}
+
+// This should be removed when https://github.com/apache/arrow-rs/pull/8693
+// is merged and becomes available.
+fn filter_record_batch(
+    record_batch: &RecordBatch,
+    filter: &FilterPredicate,
+) -> std::result::Result<RecordBatch, ArrowError> {
+    let filtered_columns = record_batch
+        .columns()
+        .iter()
+        .map(|a| filter_array(a, filter))
+        .collect::<std::result::Result<Vec<_>, _>>()?;
+    // SAFETY: since we start from a valid RecordBatch, there's no need to revalidate the schema
+    // since the set of columns has not changed.
+    // The input column arrays all had the same length (since they're coming from a valid RecordBatch)
+    // and filtering them with the same filter will produce a new set of arrays with identical
+    // lengths.
+    unsafe {
+        Ok(RecordBatch::new_unchecked(
+            record_batch.schema(),
+            filtered_columns,
+            filter.count(),
+        ))
+    }
+}
+
+// This function exists purely to be able to use the same call style
+// for `filter_record_batch` and `filter_array` at the point of use.
+// When https://github.com/apache/arrow-rs/pull/8693 is available, replace
+// both with method calls on `FilterPredicate`.
+#[inline(always)]
+fn filter_array(
+    array: &dyn Array,
+    filter: &FilterPredicate,
+) -> std::result::Result<ArrayRef, ArrowError> {
+    filter.filter(array)
+}
+
+const MERGE_NULL_MARKER: usize = usize::MAX;
+
+/// Merges elements by index from a list of [`ArrayData`], creating a new 
[`ColumnarValue`] from
+/// those values.
+///
+/// Each element in `indices` is the index of an array in `values`. The `indices` array is processed
+/// sequentially. The first occurrence of index value `n` will be mapped to the first
+/// value of the array at index `n`. The second occurrence to the second value, and so on.
+/// An index value of `usize::MAX` is used to indicate null values.
+///
+/// # Implementation notes
+///
+/// This algorithm is similar in nature to both `zip` and `interleave`, but 
there are some important
+/// differences.
+///
+/// In contrast to `zip`, this function supports multiple input arrays. Instead of a boolean
+/// selection vector, an index array is used to take values from the input arrays, and a special marker
+/// value is used to indicate null values.
+///
+/// In contrast to `interleave`, this function does not use pairs of indices. 
The values in
+/// `indices` serve the same purpose as the first value in the pairs passed to 
`interleave`.
+/// The index in the array is implicit and is derived from the number of times 
a particular array
+/// index occurs.
+/// The more constrained indexing mechanism used by this algorithm makes it 
easier to copy values
+/// in contiguous slices. In the example below, the two subsequent elements 
from array `2` can be
+/// copied in a single operation from the source array instead of copying them 
one by one.
+/// Long spans of null values are also especially cheap because they do not 
need to be represented
+/// in an input array.
+///
+/// # Safety
+///
+/// This function does not check that the number of occurrences of any 
particular array index matches
+/// the length of the corresponding input array. If an array contains more 
values than required, the
+/// spurious values will be ignored. If an array contains fewer values than 
necessary, this function
+/// will panic.
+///
+/// # Example
+///
+/// ```text
+/// ┌───────────┐  ┌─────────┐                             ┌─────────┐
+/// │┌─────────┐│  │   MAX   │                             │   NULL  │
+/// ││    A    ││  ├─────────┤                             ├─────────┤
+/// │└─────────┘│  │    1    │                             │    B    │
+/// │┌─────────┐│  ├─────────┤                             ├─────────┤
+/// ││    B    ││  │    0    │    merge(values, indices)   │    A    │
+/// │└─────────┘│  ├─────────┤  ─────────────────────────▶ ├─────────┤
+/// │┌─────────┐│  │   MAX   │                             │   NULL  │
+/// ││    C    ││  ├─────────┤                             ├─────────┤
+/// │├─────────┤│  │    2    │                             │    C    │
+/// ││    D    ││  ├─────────┤                             ├─────────┤
+/// │└─────────┘│  │    2    │                             │    D    │
+/// └───────────┘  └─────────┘                             └─────────┘
+///    values        indices                                   result
+///
+/// ```
+fn merge(values: &[ArrayData], indices: &[usize]) -> Result<ArrayRef> {
+    let data_refs = values.iter().collect();
+    let mut mutable = MutableArrayData::new(data_refs, true, indices.len());
+
+    // This loop extends the mutable array by taking slices from the partial results.
+    //
+    // take_offsets keeps track of how many values have been taken from each array.
+    let mut take_offsets = vec![0; values.len() + 1];
+    let mut start_row_ix = 0;
+    loop {
+        let array_ix = indices[start_row_ix];
+
+        // Determine the length of the slice to take.
+        let mut end_row_ix = start_row_ix + 1;
+        while end_row_ix < indices.len() && indices[end_row_ix] == array_ix {
+            end_row_ix += 1;
+        }
+        let slice_length = end_row_ix - start_row_ix;
+
+        // Extend mutable with either nulls or with values from the array.
+        if array_ix == MERGE_NULL_MARKER {

Review Comment:
   > other than using 16 bytes instead of 8
   
   That's the reason. Creating the index arrays ended up showing up as a 
hotspot during profiling. For the typical batch size of 8192, that's an 
allocation of either 64k or 128k per invocation. Writing that now, I'm 
wondering if it should be even smaller than `usize`. `u32` for case branches 
should be more than enough.
   
   I'll see about adding an enum to plug the small error risk. Still thinking 
in C...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to