alihan-synnada commented on code in PR #12531:
URL: https://github.com/apache/datafusion/pull/12531#discussion_r1768251737
##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -456,21 +458,72 @@ struct NestedLoopJoinStream {
     // null_equals_null: bool
     /// Join execution metrics
     join_metrics: BuildProbeJoinMetrics,
+    /// Cache for join indices calculations
+    indices_cache: (UInt64Array, UInt32Array),
 }
 
+/// Creates a Cartesian product of two input batches, preserving the order of the right batch,
+/// and applying a join filter if provided.
+///
+/// # Example
+/// Input:
+/// left = [0, 1], right = [0, 1, 2]
+///
+/// Output:
+/// left_indices = [0, 1, 0, 1, 0, 1], right_indices = [0, 0, 1, 1, 2, 2]
+///
+/// Input:
+/// left = [0, 1, 2], right = [0, 1, 2, 3], filter = left.a != right.a
+///
+/// Output:
+/// left_indices = [1, 2, 0, 2, 0, 1, 0, 1, 2], right_indices = [0, 0, 1, 1, 2, 2, 3, 3, 3]
 fn build_join_indices(
-    right_row_index: usize,
     left_batch: &RecordBatch,
     right_batch: &RecordBatch,
     filter: Option<&JoinFilter>,
+    indices_cache: &mut (UInt64Array, UInt32Array),
 ) -> Result<(UInt64Array, UInt32Array)> {
-    // left indices: [0, 1, 2, 3, 4, ..., left_row_count]
-    // right indices: [right_index, right_index, ..., right_index]
     let left_row_count = left_batch.num_rows();
-    let left_indices = UInt64Array::from_iter_values(0..(left_row_count as u64));
-    let right_indices = UInt32Array::from(vec![right_row_index as u32; left_row_count]);
-    // in the nested loop join, the filter can contain non-equal and equal condition.
+    let right_row_count = right_batch.num_rows();
+    let output_row_count = left_row_count * right_row_count;
+
+    // We always use the same indices before applying the filter, so we can cache them
+    let (left_indices_cache, right_indices_cache) = indices_cache;
+    let cached_output_row_count = left_indices_cache.len();

Review Comment:
   The chunked approach didn't change the performance, but it helped reduce the sizes of the intermediate batches. The 10% performance hit without a cache comes from the way the index arrays are constructed, and I couldn't find a faster way to build them. I suggest we go with the cached approach for now. When the issue that enables NLJ to emit massive batches is implemented, we can choose between the cached and chunked approaches depending on NLJ's output size. I'll open an issue about it.
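   For reference, here is a minimal standalone sketch of the cached Cartesian-index construction described above, not the PR's actual implementation. It assumes the `arrow` crate; the helper name `cartesian_indices` and the `main` demo are hypothetical. The key idea is that because the left (build-side) row count is fixed in NLJ, the index arrays for a smaller right batch are a prefix of those for a larger one, so a cache hit is a zero-copy slice:

   ```rust
   use arrow::array::{Array, UInt32Array, UInt64Array};

   /// Index arrays for the cross product of an L-row left batch and an R-row
   /// right batch, preserving right-batch order:
   ///   left_indices  = [0, 1, .., L-1, 0, 1, .., L-1, ...] (pattern repeats R times)
   ///   right_indices = [0, 0, .., 0,   1, 1, .., 1,   ...] (each value repeats L times)
   fn cartesian_indices(
       left_row_count: usize,
       right_row_count: usize,
       cache: &mut (UInt64Array, UInt32Array),
   ) -> (UInt64Array, UInt32Array) {
       let output_row_count = left_row_count * right_row_count;
       if cache.0.len() < output_row_count {
           // Cache miss: materialize the full index arrays once. This is the
           // construction cost the cache amortizes; later calls with fewer
           // right rows (same left row count) reuse a prefix of these arrays.
           cache.0 = UInt64Array::from_iter_values(
               (0..right_row_count).flat_map(|_| 0..left_row_count as u64),
           );
           cache.1 = UInt32Array::from_iter_values(
               (0..right_row_count as u32)
                   .flat_map(|r| std::iter::repeat(r).take(left_row_count)),
           );
       }
       // Zero-copy slices of the cached buffers.
       (
           cache.0.slice(0, output_row_count),
           cache.1.slice(0, output_row_count),
       )
   }

   fn main() {
       let mut cache = (
           UInt64Array::from(Vec::<u64>::new()),
           UInt32Array::from(Vec::<u32>::new()),
       );
       // First example from the doc comment above: left = [0, 1], right = [0, 1, 2].
       let (l, r) = cartesian_indices(2, 3, &mut cache);
       assert_eq!(&l.values()[..], &[0u64, 1, 0, 1, 0, 1]);
       assert_eq!(&r.values()[..], &[0u32, 0, 1, 1, 2, 2]);
       // A smaller right batch reuses the cache via slicing instead of reallocating.
       let (l2, _r2) = cartesian_indices(2, 2, &mut cache);
       assert_eq!(l2.len(), 4);
   }
   ```

   The prefix-slicing trick is only valid while the left row count stays constant across calls, which holds in NLJ since the build side is a single concatenated batch.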