This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 33b86fe02e perf: Cache num_output_rows in sort merge join to avoid O(n) recount (#20478)
33b86fe02e is described below

commit 33b86fe02e7bbe63135995c2dbb47bf83c08143c
Author: Andy Grove <[email protected]>
AuthorDate: Wed Feb 25 16:21:48 2026 -0700

    perf: Cache num_output_rows in sort merge join to avoid O(n) recount (#20478)
    
    ## Which issue does this PR close?
    
    N/A - performance optimization
    
    ## Rationale for this change
    
    In the SMJ tight loop (`join_partial`), `num_unfrozen_pairs()` was
    called **twice per iteration**: once in the loop guard and once inside
    `append_output_pair`. This method iterates all chunks in
    `output_indices` and sums their lengths — O(num_chunks). Over a full
    batch of `batch_size` iterations, this makes the inner loop O(batch_size
    * num_chunks) instead of O(batch_size).
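    The recount cost can be sketched outside DataFusion with simplified
    stand-in types (`Chunk` and `Batch` here are illustrative, not the
    actual `StreamedJoinedChunk`/`StreamedBatch` definitions):
    
    ```rust
    // Simplified model of the pre-change counting cost: every call walks
    // all chunks and sums their lengths, so calling it once or twice per
    // loop iteration makes the loop O(batch_size * num_chunks).
    struct Chunk {
        streamed_indices: Vec<u64>,
    }
    
    struct Batch {
        output_indices: Vec<Chunk>,
    }
    
    impl Batch {
        // O(num_chunks) on every call.
        fn num_output_rows_recount(&self) -> usize {
            self.output_indices
                .iter()
                .map(|c| c.streamed_indices.len())
                .sum()
        }
    }
    
    fn main() {
        let b = Batch {
            output_indices: vec![
                Chunk { streamed_indices: vec![0, 1] },
                Chunk { streamed_indices: vec![2] },
            ],
        };
        assert_eq!(b.num_output_rows_recount(), 3);
    }
    ```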
    
    ## What changes are included in this PR?
    
    Add a `num_output_rows` field to `StreamedBatch` that is incremented on
    each append and reset on freeze, replacing the O(n) summation with an
    O(1) field read.
    
    - Added `num_output_rows: usize` field to `StreamedBatch`, initialized
    to `0`
    - Increment `num_output_rows` in `append_output_pair()` after each
    append
    - `num_output_rows()` now returns the cached field directly
    - Reset to `0` in `freeze_streamed()` when `output_indices` is cleared
    - Removed the `num_unfrozen_pairs` parameter from `append_output_pair()`
    since it can now read `self.num_output_rows` directly
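    As a rough sketch of the cached-counter pattern described above (names
    and types simplified; the real implementation is in the diff below):
    
    ```rust
    // Minimal sketch: bump the cached counter on each append, read it in
    // O(1), and reset it when the chunks are frozen/cleared.
    struct StreamedBatchSketch {
        output_indices: Vec<Vec<u64>>, // simplified chunks
        num_output_rows: usize,        // cached total across all chunks
    }
    
    impl StreamedBatchSketch {
        fn new() -> Self {
            Self { output_indices: vec![], num_output_rows: 0 }
        }
    
        // Mirrors append_output_pair(): append, then increment the cache.
        fn append_output_pair(&mut self, streamed_idx: u64) {
            if self.output_indices.is_empty() {
                self.output_indices.push(Vec::new());
            }
            self.output_indices.last_mut().unwrap().push(streamed_idx);
            self.num_output_rows += 1;
        }
    
        // O(1) read instead of summing chunk lengths.
        fn num_output_rows(&self) -> usize {
            self.num_output_rows
        }
    
        // Mirrors freeze_streamed(): clear chunks and reset the cache.
        fn freeze(&mut self) {
            self.output_indices.clear();
            self.num_output_rows = 0;
        }
    }
    
    fn main() {
        let mut b = StreamedBatchSketch::new();
        for i in 0..5 {
            b.append_output_pair(i);
        }
        assert_eq!(b.num_output_rows(), 5);
        b.freeze();
        assert_eq!(b.num_output_rows(), 0);
    }
    ```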
    
    ## Are these changes tested?
    
    Yes — all 48 existing `sort_merge_join` tests pass. This is a pure
    refactor of an internal counter with no behavioral change.
    
    ## Performance
    
    Very minor improvement.
    
    ### Before
    
    ```
    sort_merge_join/inner_1to1/100000
                            time:   [3.8146 ms 3.8229 ms 3.8314 ms]
    sort_merge_join/inner_1to10/100000
                            time:   [16.094 ms 16.125 ms 16.161 ms]
    Found 7 outliers among 100 measurements (7.00%)
      6 (6.00%) high mild
      1 (1.00%) high severe
    sort_merge_join/left_1to1_unmatched/100000
                            time:   [3.7823 ms 3.7861 ms 3.7902 ms]
    Found 4 outliers among 100 measurements (4.00%)
      4 (4.00%) high mild
    sort_merge_join/left_semi_1to10/100000
                            time:   [3.0523 ms 3.0755 ms 3.1023 ms]
    Found 14 outliers among 100 measurements (14.00%)
      3 (3.00%) high mild
      11 (11.00%) high severe
    sort_merge_join/left_anti_partial/100000
                            time:   [3.3458 ms 3.3498 ms 3.3542 ms]
    Found 12 outliers among 100 measurements (12.00%)
      8 (8.00%) high mild
      4 (4.00%) high severe
    ```
    
    ### After
    
    ```
    sort_merge_join/inner_1to1/100000
                            time:   [3.7162 ms 3.7207 ms 3.7254 ms]
                            change: [−4.2320% −3.9309% −3.6431%] (p = 0.00 < 0.05)
                            Performance has improved.
    Found 4 outliers among 100 measurements (4.00%)
      4 (4.00%) high mild
    sort_merge_join/inner_1to10/100000
                            time:   [15.556 ms 15.589 ms 15.626 ms]
                            change: [−5.2786% −4.8329% −4.4351%] (p = 0.00 < 0.05)
                            Performance has improved.
    Found 4 outliers among 100 measurements (4.00%)
      1 (1.00%) high mild
      3 (3.00%) high severe
    sort_merge_join/left_1to1_unmatched/100000
                            time:   [3.7059 ms 3.7101 ms 3.7146 ms]
                            change: [−4.4526% −4.1565% −3.8660%] (p = 0.00 < 0.05)
                            Performance has improved.
    Found 2 outliers among 100 measurements (2.00%)
      2 (2.00%) high mild
    sort_merge_join/left_semi_1to10/100000
                            time:   [3.0832 ms 3.0899 ms 3.0981 ms]
                            change: [−4.0965% −3.4158% −2.7657%] (p = 0.00 < 0.05)
                            Performance has improved.
    Found 3 outliers among 100 measurements (3.00%)
      1 (1.00%) high mild
      2 (2.00%) high severe
    sort_merge_join/left_anti_partial/100000
                            time:   [3.2963 ms 3.3048 ms 3.3153 ms]
                            change: [−3.9413% −3.5316% −3.0884%] (p = 0.00 < 0.05)
                            Performance has improved.
    Found 8 outliers among 100 measurements (8.00%)
      3 (3.00%) high mild
      5 (5.00%) high severe
    ```
    
    
    ## Are there any user-facing changes?
    
    No.
    
    🤖 Generated with [Claude Code](https://claude.com/claude-code)
    
    ---------
    
    Co-authored-by: Claude Opus 4.6 <[email protected]>
---
 .../src/joins/sort_merge_join/stream.rs            | 29 ++++++++++------------
 1 file changed, 13 insertions(+), 16 deletions(-)

diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs b/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs
index 11e4a903ac..4dcbe1f647 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs
@@ -128,6 +128,8 @@ pub(super) struct StreamedBatch {
     pub join_arrays: Vec<ArrayRef>,
     /// Chunks of indices from buffered side (may be nulls) joined to streamed
     pub output_indices: Vec<StreamedJoinedChunk>,
+    /// Total number of output rows across all chunks in `output_indices`
+    pub num_output_rows: usize,
     /// Index of currently scanned batch from buffered data
     pub buffered_batch_idx: Option<usize>,
     /// Indices that found a match for the given join filter
@@ -144,6 +146,7 @@ impl StreamedBatch {
             idx: 0,
             join_arrays,
             output_indices: vec![],
+            num_output_rows: 0,
             buffered_batch_idx: None,
             join_filter_matched_idxs: HashSet::new(),
         }
@@ -155,6 +158,7 @@ impl StreamedBatch {
             idx: 0,
             join_arrays: vec![],
             output_indices: vec![],
+            num_output_rows: 0,
             buffered_batch_idx: None,
             join_filter_matched_idxs: HashSet::new(),
         }
@@ -162,10 +166,7 @@ impl StreamedBatch {
 
     /// Number of unfrozen output pairs in this streamed batch
     fn num_output_rows(&self) -> usize {
-        self.output_indices
-            .iter()
-            .map(|chunk| chunk.streamed_indices.len())
-            .sum()
+        self.num_output_rows
     }
 
     /// Appends new pair consisting of current streamed index and `buffered_idx`
@@ -175,7 +176,6 @@ impl StreamedBatch {
         buffered_batch_idx: Option<usize>,
         buffered_idx: Option<usize>,
         batch_size: usize,
-        num_unfrozen_pairs: usize,
     ) {
         // If no current chunk exists or current chunk is not for current buffered batch,
         // create a new chunk
@@ -183,12 +183,13 @@ impl StreamedBatch {
         {
             // Compute capacity only when creating a new chunk (infrequent operation).
             // The capacity is the remaining space to reach batch_size.
-            // This should always be >= 1 since we only call this when num_unfrozen_pairs < batch_size.
+            // This should always be >= 1 since we only call this when num_output_rows < batch_size.
             debug_assert!(
-                batch_size > num_unfrozen_pairs,
-                "batch_size ({batch_size}) must be > num_unfrozen_pairs ({num_unfrozen_pairs})"
+                batch_size > self.num_output_rows,
+                "batch_size ({batch_size}) must be > num_output_rows ({})",
+                self.num_output_rows
             );
-            let capacity = batch_size - num_unfrozen_pairs;
+            let capacity = batch_size - self.num_output_rows;
             self.output_indices.push(StreamedJoinedChunk {
                 buffered_batch_idx,
                 streamed_indices: UInt64Builder::with_capacity(capacity),
@@ -205,6 +206,7 @@ impl StreamedBatch {
         } else {
             current_chunk.buffered_indices.append_null();
         }
+        self.num_output_rows += 1;
     }
 }
 
@@ -1134,13 +1136,10 @@ impl SortMergeJoinStream {
                 let scanning_idx = self.buffered_data.scanning_idx();
                 if join_streamed {
                     // Join streamed row and buffered row
-                    // Pass batch_size and num_unfrozen_pairs to compute capacity only when
-                    // creating a new chunk (when buffered_batch_idx changes), not on every iteration.
                     self.streamed_batch.append_output_pair(
                         Some(self.buffered_data.scanning_batch_idx),
                         Some(scanning_idx),
                         self.batch_size,
-                        self.num_unfrozen_pairs(),
                     );
                 } else {
                     // Join nulls and buffered row for FULL join
@@ -1166,13 +1165,10 @@ impl SortMergeJoinStream {
             // For Mark join we store a dummy id to indicate the row has a match
             let scanning_idx = mark_row_as_match.then_some(0);
 
-            // Pass batch_size=1 and num_unfrozen_pairs=0 to get capacity of 1,
-            // since we only append a single null-joined pair here (not in a loop).
             self.streamed_batch.append_output_pair(
                 scanning_batch_idx,
                 scanning_idx,
-                1,
-                0,
+                self.batch_size,
             );
             self.buffered_data.scanning_finish();
             self.streamed_joined = true;
@@ -1471,6 +1467,7 @@ impl SortMergeJoinStream {
         }
 
         self.streamed_batch.output_indices.clear();
+        self.streamed_batch.num_output_rows = 0;
 
         Ok(())
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
