This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 7d078d8c11 Fix SortMergeJoin antijoin flaky condition (#11604)
7d078d8c11 is described below

commit 7d078d8c11155fd098595126b1ed60cad9afce5a
Author: Oleks V <[email protected]>
AuthorDate: Mon Jul 22 11:10:53 2024 -0700

    Fix SortMergeJoin antijoin flaky condition (#11604)
---
 datafusion/physical-plan/src/joins/sort_merge_join.rs | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs
index 5fde028c7f..96d5ba728a 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs
@@ -1681,22 +1681,25 @@ fn get_filtered_join_mask(
         JoinType::LeftAnti => {
             // have we seen a filter match for a streaming index before
             for i in 0..streamed_indices_length {
-                if mask.value(i) && !seen_as_true {
+                let streamed_idx = streamed_indices.value(i);
+                if mask.value(i)
+                    && !seen_as_true
+                    && !matched_indices.contains(&streamed_idx)
+                {
                     seen_as_true = true;
-                    filter_matched_indices.push(streamed_indices.value(i));
+                    filter_matched_indices.push(streamed_idx);
                 }
 
                 // Reset `seen_as_true` flag and calculate mask for the current streaming index
                 // - if within the batch it switched to next streaming index(e.g. from 0 to 1, or from 1 to 2)
                 // - if it is at the end of the all buffered batches for the given streaming index, 0 index comes last
                 if (i < streamed_indices_length - 1
-                    && streamed_indices.value(i) != streamed_indices.value(i + 
1))
+                    && streamed_idx != streamed_indices.value(i + 1))
                     || (i == streamed_indices_length - 1
                         && *scanning_buffered_offset == 0)
                 {
                     corrected_mask.append_value(
-                        !matched_indices.contains(&streamed_indices.value(i))
-                            && !seen_as_true,
+                        !matched_indices.contains(&streamed_idx) && 
!seen_as_true,
                     );
                     seen_as_true = false;
                 } else {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to