UBarney commented on code in PR #16443:
URL: https://github.com/apache/datafusion/pull/16443#discussion_r2194337073


##########
datafusion/physical-plan/src/joins/utils.rs:
##########
@@ -843,24 +844,56 @@ pub(crate) fn apply_join_filter_to_indices(
     probe_indices: UInt32Array,
     filter: &JoinFilter,
     build_side: JoinSide,
+    max_intermediate_size: Option<usize>,
 ) -> Result<(UInt64Array, UInt32Array)> {
     if build_indices.is_empty() && probe_indices.is_empty() {
         return Ok((build_indices, probe_indices));
     };
 
-    let intermediate_batch = build_batch_from_indices(
-        filter.schema(),
-        build_input_buffer,
-        probe_batch,
-        &build_indices,
-        &probe_indices,
-        filter.column_indices(),
-        build_side,
-    )?;
-    let filter_result = filter
-        .expression()
-        .evaluate(&intermediate_batch)?
-        .into_array(intermediate_batch.num_rows())?;
+    let filter_result = if let Some(max_size) = max_intermediate_size {
+        let mut filter_results =
+            Vec::with_capacity(build_indices.len().div_ceil(max_size));
+
+        for i in (0..build_indices.len()).step_by(max_size) {
+            let end = min(build_indices.len(), i + max_size);
+            let len = end - i;
+            let intermediate_batch = build_batch_from_indices(
+                filter.schema(),
+                build_input_buffer,
+                probe_batch,
+                &build_indices.slice(i, len),
+                &probe_indices.slice(i, len),
+                filter.column_indices(),
+                build_side,
+            )?;
+            let filter_result = filter
+                .expression()
+                .evaluate(&intermediate_batch)?
+                .into_array(intermediate_batch.num_rows())?;
+            filter_results.push(filter_result);

Review Comment:
    I'm not sure I fully understand your suggestions. Could you please 
elaborate?
   
   > What about executing the filter directly on build/probe indices slice here 
and concatenating the indices later?
   
   I'm a bit confused by this. Do you mean we can avoid constructing the 
`intermediate_batch`? It seems this approach would require rewriting the filter 
logic to work directly on index slices.
   
   > and using coalescing kernel
   
   I'm also not sure why the `coalesce` kernel should be used here. The current 
function takes `build_indices` and `probe_indices`, builds an 
`intermediate_batch`, executes the filter, and returns the `(build_indices, 
probe_indices)` that passed the filter.
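   
   To make sure we are talking about the same thing, here is a rough sketch of 
how I read the chunked version. It is a plain-`std` model, not the real 
DataFusion code: `filter_index_pairs` and the `predicate` closure are 
hypothetical stand-ins for `apply_join_filter_to_indices` and for building the 
`intermediate_batch` and evaluating the filter expression on it.

```rust
use std::cmp::min;

/// Hypothetical stand-in for `apply_join_filter_to_indices`: evaluates
/// `predicate` over (build, probe) index pairs in chunks of at most
/// `max_size` pairs and returns only the pairs that passed.
fn filter_index_pairs<F>(
    build_indices: &[u64],
    probe_indices: &[u32],
    max_size: usize,
    predicate: F,
) -> (Vec<u64>, Vec<u32>)
where
    F: Fn(u64, u32) -> bool,
{
    assert_eq!(build_indices.len(), probe_indices.len());
    assert!(max_size > 0, "step_by panics on a zero step");

    // One boolean per index pair, filled chunk by chunk.
    let mut mask = Vec::with_capacity(build_indices.len());
    for start in (0..build_indices.len()).step_by(max_size) {
        let end = min(build_indices.len(), start + max_size);
        // Assumption: this stands in for building the intermediate batch
        // for the chunk and evaluating the filter expression on it.
        let chunk = build_indices[start..end]
            .iter()
            .zip(&probe_indices[start..end]);
        mask.extend(chunk.map(|(&b, &p)| predicate(b, p)));
    }

    // Keep only the index pairs whose mask entry is true.
    let mut kept_build = Vec::new();
    let mut kept_probe = Vec::new();
    for (i, keep) in mask.into_iter().enumerate() {
        if keep {
            kept_build.push(build_indices[i]);
            kept_probe.push(probe_indices[i]);
        }
    }
    (kept_build, kept_probe)
}
```

   In this model the per-chunk masks are joined first and the indices are 
filtered once at the end. Filtering each index slice per chunk and joining the 
surviving indices afterwards would return the same pairs.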
   
   My understanding is that `coalesce` is used to merge multiple 
`RecordBatch`es whose row counts are below a target into new `RecordBatch`es 
whose row counts reach the target. How would that apply in this 
situation?
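   
   For reference, this is the mental model of coalescing I have in mind, 
written as a plain-`std` sketch. `coalesce_batches` is a hypothetical helper 
and a batch is modelled as a `Vec<i32>` of rows; the real kernel may behave 
differently, for example by emitting batches of exactly the target size.

```rust
/// Hypothetical model of coalescing: buffer small batches and emit a merged
/// batch as soon as at least `target` rows have been buffered.
fn coalesce_batches(batches: Vec<Vec<i32>>, target: usize) -> Vec<Vec<i32>> {
    let mut output = Vec::new();
    let mut buffer: Vec<i32> = Vec::new();

    for batch in batches {
        buffer.extend(batch);
        if buffer.len() >= target {
            // Emit the merged rows and start over with an empty buffer.
            output.push(std::mem::take(&mut buffer));
        }
    }

    // Flush whatever is left, even if it is below the target.
    if !buffer.is_empty() {
        output.push(buffer);
    }
    output
}
```

   Under this model the input and output are both row batches, which is why I 
don't see how it maps onto a function that returns filtered index arrays.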



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

