stuhood commented on code in PR #20806:
URL: https://github.com/apache/datafusion/pull/20806#discussion_r2908102325


##########
datafusion/physical-plan/src/joins/semi_anti_sort_merge_join/stream.rs:
##########
@@ -0,0 +1,1164 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Stream implementation for semi/anti sort-merge joins.
+
+use std::cmp::Ordering;
+use std::fs::File;
+use std::io::BufReader;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use crate::RecordBatchStream;
+use crate::joins::utils::{JoinFilter, compare_join_arrays};
+use crate::metrics::{
+    BaselineMetrics, Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder,
+};
+use crate::spill::spill_manager::SpillManager;
+use arrow::array::{Array, ArrayRef, BooleanArray, BooleanBufferBuilder, RecordBatch};
+use arrow::compute::{BatchCoalescer, SortOptions, filter_record_batch, not};
+use arrow::datatypes::SchemaRef;
+use arrow::ipc::reader::StreamReader;
+use arrow::util::bit_chunk_iterator::UnalignedBitChunk;
+use arrow::util::bit_util::apply_bitwise_binary_op;
+use datafusion_common::{
+    JoinSide, JoinType, NullEquality, Result, ScalarValue, internal_err,
+};
+use datafusion_execution::SendableRecordBatchStream;
+use datafusion_execution::disk_manager::RefCountedTempFile;
+use datafusion_execution::memory_pool::MemoryReservation;
+use datafusion_physical_expr_common::physical_expr::PhysicalExprRef;
+
+use futures::{Stream, StreamExt, ready};
+
+/// Evaluates join key expressions against a batch, returning one array per key.
+fn evaluate_join_keys(
+    batch: &RecordBatch,
+    on: &[PhysicalExprRef],
+) -> Result<Vec<ArrayRef>> {
+    on.iter()
+        .map(|expr| {
+            let num_rows = batch.num_rows();
+            let val = expr.evaluate(batch)?;
+            val.into_array(num_rows)
+        })
+        .collect()
+}
+
+/// Find the first index in `key_arrays` starting from `from` where the key
+/// differs from the key at `from`. Uses `compare_join_arrays` for zero-alloc
+/// ordinal comparison.
+///
+/// Optimized for join workloads: checks adjacent and boundary keys before
+/// falling back to binary search, since most key groups are small (often 1).
+fn find_key_group_end(
+    key_arrays: &[ArrayRef],
+    from: usize,
+    len: usize,
+    sort_options: &[SortOptions],
+    null_equality: NullEquality,
+) -> Result<usize> {
+    let next = from + 1;
+    if next >= len {
+        return Ok(len);
+    }
+
+    // Fast path: single-row group (common with unique keys).
+    if compare_join_arrays(
+        key_arrays,
+        from,
+        key_arrays,
+        next,
+        sort_options,
+        null_equality,
+    )? != Ordering::Equal
+    {
+        return Ok(next);
+    }
+
+    // Check if the entire remaining batch shares this key.
+    let last = len - 1;
+    if compare_join_arrays(
+        key_arrays,
+        from,
+        key_arrays,
+        last,
+        sort_options,
+        null_equality,
+    )? == Ordering::Equal
+    {
+        return Ok(len);
+    }

Review Comment:
   I believe these cases are equivalent, so could you keep just the
   "Check if the entire remaining batch shares this key." case?



##########
datafusion/physical-plan/src/joins/semi_anti_sort_merge_join/stream.rs:
##########
@@ -0,0 +1,1164 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Stream implementation for semi/anti sort-merge joins.
+
+use std::cmp::Ordering;
+use std::fs::File;
+use std::io::BufReader;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use crate::RecordBatchStream;
+use crate::joins::utils::{JoinFilter, compare_join_arrays};
+use crate::metrics::{
+    BaselineMetrics, Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder,
+};
+use crate::spill::spill_manager::SpillManager;
+use arrow::array::{Array, ArrayRef, BooleanArray, BooleanBufferBuilder, RecordBatch};
+use arrow::compute::{BatchCoalescer, SortOptions, filter_record_batch, not};
+use arrow::datatypes::SchemaRef;
+use arrow::ipc::reader::StreamReader;
+use arrow::util::bit_chunk_iterator::UnalignedBitChunk;
+use arrow::util::bit_util::apply_bitwise_binary_op;
+use datafusion_common::{
+    JoinSide, JoinType, NullEquality, Result, ScalarValue, internal_err,
+};
+use datafusion_execution::SendableRecordBatchStream;
+use datafusion_execution::disk_manager::RefCountedTempFile;
+use datafusion_execution::memory_pool::MemoryReservation;
+use datafusion_physical_expr_common::physical_expr::PhysicalExprRef;
+
+use futures::{Stream, StreamExt, ready};
+
+/// Evaluates join key expressions against a batch, returning one array per key.
+fn evaluate_join_keys(
+    batch: &RecordBatch,
+    on: &[PhysicalExprRef],
+) -> Result<Vec<ArrayRef>> {
+    on.iter()
+        .map(|expr| {
+            let num_rows = batch.num_rows();
+            let val = expr.evaluate(batch)?;
+            val.into_array(num_rows)
+        })
+        .collect()
+}
+
+/// Find the first index in `key_arrays` starting from `from` where the key
+/// differs from the key at `from`. Uses `compare_join_arrays` for zero-alloc
+/// ordinal comparison.
+///
+/// Optimized for join workloads: checks adjacent and boundary keys before
+/// falling back to binary search, since most key groups are small (often 1).
+fn find_key_group_end(
+    key_arrays: &[ArrayRef],
+    from: usize,
+    len: usize,
+    sort_options: &[SortOptions],
+    null_equality: NullEquality,
+) -> Result<usize> {
+    let next = from + 1;
+    if next >= len {
+        return Ok(len);
+    }
+
+    // Fast path: single-row group (common with unique keys).
+    if compare_join_arrays(
+        key_arrays,
+        from,
+        key_arrays,
+        next,
+        sort_options,
+        null_equality,
+    )? != Ordering::Equal
+    {
+        return Ok(next);
+    }
+
+    // Check if the entire remaining batch shares this key.
+    let last = len - 1;
+    if compare_join_arrays(
+        key_arrays,
+        from,
+        key_arrays,
+        last,
+        sort_options,
+        null_equality,
+    )? == Ordering::Equal
+    {
+        return Ok(len);
+    }
+
+    // Binary search the interior: key at `next` matches, key at `last` doesn't.
+    let mut lo = next + 1;
+    let mut hi = last;
+    while lo < hi {
+        let mid = lo + (hi - lo) / 2;
+        if compare_join_arrays(
+            key_arrays,
+            from,
+            key_arrays,
+            mid,
+            sort_options,
+            null_equality,
+        )? == Ordering::Equal
+        {
+            lo = mid + 1;
+        } else {
+            hi = mid;
+        }
+    }
+    Ok(lo)
+}
+
+/// When an outer key group spans a batch boundary, the boundary loop emits
+/// the current batch, then polls for the next. If that poll returns Pending,
+/// `ready!` exits `poll_join` and we re-enter from the top on the next call.
+/// Without this state, the new batch would be processed fresh by the
+/// merge-scan — but inner already advanced past this key, so the matching
+/// outer rows would be skipped via `Ordering::Less` and never marked.
+///
+/// This enum carries the last key (as single-row sliced arrays) from the
+/// previous batch so we can check whether the next batch continues the same
+/// key group. Stored as `Option<PendingBoundary>`: `None` means normal
+/// processing.
+#[derive(Debug)]
+enum PendingBoundary {
+    /// Resuming a no-filter boundary loop.
+    NoFilter { saved_keys: Vec<ArrayRef> },
+    /// Resuming a filtered boundary loop. Inner key data remains in the
+    /// buffer (or spill file) for the resumed loop.
+    Filtered { saved_keys: Vec<ArrayRef> },
+}
+
+pub(super) struct SemiAntiSortMergeJoinStream {
+    /// true for semi (emit matched), false for anti (emit unmatched)
+    is_semi: bool,
+
+    // Input streams — "outer" is the streamed side whose rows we output,
+    // "inner" is the buffered side we match against.
+    outer: SendableRecordBatchStream,
+    inner: SendableRecordBatchStream,

Review Comment:
   Nit: would it be worth renaming `outer`/`inner` to `streamed`/`buffered`
   to reduce the cognitive load for readers?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to