2010YOUY01 commented on code in PR #16996:
URL: https://github.com/apache/datafusion/pull/16996#discussion_r2253750647


##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -660,529 +684,1048 @@ async fn collect_left_input(
     ))
 }
 
-/// This enumeration represents various states of the nested loop join 
algorithm.
-#[derive(Debug, Clone)]
-enum NestedLoopJoinStreamState {
-    /// The initial state, indicating that build-side data not collected yet
-    WaitBuildSide,
-    /// Indicates that build-side has been collected, and stream is ready for
-    /// fetching probe-side
-    FetchProbeBatch,
-    /// Indicates that a non-empty batch has been fetched from probe-side, and
-    /// is ready to be processed
-    ProcessProbeBatch(RecordBatch),
-    /// Preparation phase: Gathers the indices of unmatched rows from the 
build-side.
-    /// This state is entered for join types that emit unmatched build-side 
rows
-    /// (e.g., LEFT and FULL joins) after the entire probe-side input has been 
consumed.
-    PrepareUnmatchedBuildRows,
-    /// Output unmatched build-side rows.
-    /// The indices for rows to output has already been calculated in the 
previous
-    /// `PrepareUnmatchedBuildRows` state. In this state the final batch will 
be materialized incrementally.
-    // The inner `RecordBatch` is an empty dummy batch used to get right 
schema.
-    OutputUnmatchedBuildRows(RecordBatch),
-    /// Indicates that NestedLoopJoinStream execution is completed
-    Completed,
-}
-
-impl NestedLoopJoinStreamState {
-    /// Tries to extract a `ProcessProbeBatchState` from the
-    /// `NestedLoopJoinStreamState` enum. Returns an error if state is not
-    /// `ProcessProbeBatchState`.
-    fn try_as_process_probe_batch(&mut self) -> Result<&RecordBatch> {
-        match self {
-            NestedLoopJoinStreamState::ProcessProbeBatch(state) => Ok(state),
-            _ => internal_err!("Expected join stream in ProcessProbeBatch 
state"),
-        }
-    }
-}
-
-/// Tracks incremental output of join result batches.
-///
-/// Initialized with all matching pairs that satisfy the join predicate.
-/// Pairs are stored as indices in `build_indices` and `probe_indices`
-/// Each poll outputs a batch within the configured size limit and updates
-/// processed_count until all pairs are consumed.
-///
-/// Example: 5000 matches, batch size limit is 100
-/// - Poll 1: output batch[0..100], processed_count = 100  
-/// - Poll 2: output batch[100..200], processed_count = 200
-/// - ...continues until processed_count = 5000
-struct JoinResultProgress {
-    /// Row indices from build-side table (left table).
-    build_indices: PrimitiveArray<UInt64Type>,
-    /// Row indices from probe-side table (right table).
-    probe_indices: PrimitiveArray<UInt32Type>,
-    /// Number of index pairs already processed into output batches.
-    /// We have completed join result for indices [0..processed_count).
-    processed_count: usize,
+/// States for join processing. See `poll_next()` comment for more details
+/// about state transitions.
+#[derive(Debug, Clone, Copy)]
+enum NLJState {
+    BufferingLeft,
+    FetchingRight,
+    ProbeRight,
+    EmitRightUnmatched,
+    EmitLeftUnmatched,
+    Done,
 }
-
-/// A stream that issues [RecordBatch]es as they arrive from the right  of the 
join.
-struct NestedLoopJoinStream {
-    /// Input schema
-    schema: Arc<Schema>,
+pub(crate) struct NLJStream {
+    // ========================================================================
+    // PROPERTIES:
+    // Operator's properties that remain constant
+    //
+    // Note: The implementation uses the terms left/inner/build-side table and
+    // right/outer/probe-side table interchangeably.
+    // ========================================================================
+    /// Output schema
+    pub(crate) output_schema: Arc<Schema>,
     /// join filter
-    filter: Option<JoinFilter>,
+    pub(crate) join_filter: Option<JoinFilter>,
     /// type of the join
-    join_type: JoinType,
-    /// the outer table data of the nested loop join
-    outer_table: SendableRecordBatchStream,
-    /// the inner table data of the nested loop join
-    inner_table: OnceFut<JoinLeftData>,
-    /// Information of index and left / right placement of columns
-    column_indices: Vec<ColumnIndex>,
-    // TODO: support null aware equal
-    // null_equality: NullEquality,
+    pub(crate) join_type: JoinType,
+    /// the outer(right) table data of the nested loop join
+    pub(crate) outer_table: SendableRecordBatchStream,
+    /// the inner(left) table data of the nested loop join
+    pub(crate) inner_table: OnceFut<JoinLeftData>,
+    /// Projection to construct the output schema from the left and right 
tables.
+    /// Example:
+    /// - output_schema: ['a', 'c']
+    /// - left_schema: ['a', 'b']
+    /// - right_schema: ['c']
+    ///
+    /// The column indices would be [(left, 0), (right, 0)] -- taking the left
+    /// 0th column and right 0th column can construct the output schema.
+    ///
+    /// Note there are other columns ('b' in the example) still kept after
+    /// projection pushdown; this is because they might be used to evaluate
+    /// the join filter (e.g., `JOIN ON (b+c)>0`).
+    pub(crate) column_indices: Vec<ColumnIndex>,
     /// Join execution metrics
-    join_metrics: BuildProbeJoinMetrics,
-    /// Cache for join indices calculations
-    indices_cache: (UInt64Array, UInt32Array),
-    /// Whether the right side is ordered
-    right_side_ordered: bool,
-    /// Current state of the stream
-    state: NestedLoopJoinStreamState,
-    /// Result of the left data future
-    left_data: Option<Arc<JoinLeftData>>,
-
-    /// Tracks progress when building join result batches incrementally.
-    join_result_status: Option<JoinResultProgress>,
-
-    intermediate_batch_size: usize,
+    pub(crate) join_metrics: BuildProbeJoinMetrics,
+
+    /// `batch_size` from configuration
+    cfg_batch_size: usize,
+
+    /// Whether to use a bitmap to track the 'joined' status of each row in
+    /// each incoming right batch.
+    /// For example, in right joins we have to use a bitmap to track matched
+    /// right side rows, and later enter the `EmitRightUnmatched` state to emit
+    /// unmatched right rows.
+    should_track_unmatched_right: bool,
+
+    // ========================================================================
+    // STATE FLAGS/BUFFERS:
+    // Fields that hold intermediate data/flags during execution
+    // ========================================================================
+    /// State Tracking
+    state: NLJState,
+    /// Output buffer holds the join result to output. It will emit eagerly
+    /// when the threshold is reached.
+    output_buffer: Box<BatchCoalescer>,
+    /// See comments in [`NLJState::Done`] for its purpose
+    handled_empty_output: bool,
+
+    // Buffer(left) side
+    // -----------------
+    /// The current buffered left data to join
+    buffered_left_data: Option<Arc<JoinLeftData>>,
+    /// Index into the left buffered batch. Used in `ProbeRight` state
+    l_probe_idx: usize,
+    /// Index into the left buffered batch. Used in `EmitLeftUnmatched` state
+    l_emit_idx: usize,
+    /// Should we go back to `BufferingLeft` state again after 
`EmitLeftUnmatched`
+    /// state is over.
+    left_exhausted: bool,
+    /// If we can buffer all left data in one pass
+    /// TODO(now): this is for the (unimplemented) memory-limited execution
+    #[allow(dead_code)]
+    left_buffered_in_one_pass: bool,
+
+    // Probe(right) side
+    // -----------------
+    /// The current probe batch to process
+    current_right_batch: Option<RecordBatch>,
+    // For right join, keep track of matched rows in `current_right_batch`
+    // Constructed when fetching each new incoming right batch in 
`FetchingRight` state.
+    current_right_batch_matched: Option<BooleanArray>,
 }
 
-/// Creates a Cartesian product of two input batches, preserving the order of 
the right batch,
-/// and applying a join filter if provided.
-///
-/// # Example
-/// Input:
-/// left = [0, 1], right = [0, 1, 2]
-///
-/// Output:
-/// left_indices = [0, 1, 0, 1, 0, 1], right_indices = [0, 0, 1, 1, 2, 2]
-///
-/// Input:
-/// left = [0, 1, 2], right = [0, 1, 2, 3], filter = left.a != right.a
-///
-/// Output:
-/// left_indices = [1, 2, 0, 2, 0, 1, 0, 1, 2], right_indices = [0, 0, 1, 1, 
2, 2, 3, 3, 3]
-fn build_join_indices(
-    left_batch: &RecordBatch,
-    right_batch: &RecordBatch,
-    filter: Option<&JoinFilter>,
-    indices_cache: &mut (UInt64Array, UInt32Array),
-    max_intermediate_batch_size: usize,
-) -> Result<(UInt64Array, UInt32Array)> {
-    let left_row_count = left_batch.num_rows();
-    let right_row_count = right_batch.num_rows();
-    let output_row_count = left_row_count * right_row_count;
-
-    // We always use the same indices before applying the filter, so we can 
cache them
-    let (left_indices_cache, right_indices_cache) = indices_cache;
-    let cached_output_row_count = left_indices_cache.len();
-
-    let (left_indices, right_indices) =
-        match output_row_count.cmp(&cached_output_row_count) {
-            std::cmp::Ordering::Equal => {
-                // Reuse the cached indices
-                (left_indices_cache.clone(), right_indices_cache.clone())
-            }
-            std::cmp::Ordering::Less => {
-                // Left_row_count never changes because it's the build side. 
The changes to the
-                // right_row_count can be handled trivially by taking the 
first output_row_count
-                // elements of the cache because of how the indices are 
generated.
-                // (See the Ordering::Greater match arm)
-                (
-                    left_indices_cache.slice(0, output_row_count),
-                    right_indices_cache.slice(0, output_row_count),
-                )
-            }
-            std::cmp::Ordering::Greater => {
-                // Rebuild the indices cache
-
-                // Produces 0, 1, 2, 0, 1, 2, 0, 1, 2, ...
-                *left_indices_cache = UInt64Array::from_iter_values(
-                    (0..output_row_count as u64).map(|i| i % left_row_count as 
u64),
-                );
-
-                // Produces 0, 0, 0, 1, 1, 1, 2, 2, 2, ...
-                *right_indices_cache = UInt32Array::from_iter_values(
-                    (0..output_row_count as u32).map(|i| i / left_row_count as 
u32),
-                );
-
-                (left_indices_cache.clone(), right_indices_cache.clone())
-            }
-        };
-
-    if let Some(filter) = filter {
-        apply_join_filter_to_indices(
-            left_batch,
-            right_batch,
-            left_indices,
-            right_indices,
-            filter,
-            JoinSide::Left,
-            Some(max_intermediate_batch_size),
-        )
-    } else {
-        Ok((left_indices, right_indices))
-    }
-}
+impl Stream for NLJStream {
+    type Item = Result<RecordBatch>;
 
-impl NestedLoopJoinStream {
-    fn poll_next_impl(
-        &mut self,
+    /// See the comments [`NestedLoopJoinExec`] for high-level design ideas.
+    ///
+    /// # Implementation
+    ///
+    /// This function is the entry point of NLJ operator's state machine
+    /// transitions. The rough state transition graph is as follows; for more
+    /// details see the comment in each state's matching arm.
+    ///
+    /// ============================
+    /// State transition graph:
+    /// ============================
+    ///
+    /// (start) --> BufferingLeft
+    /// ----------------------------
+    /// BufferingLeft → FetchingRight
+    ///
+    /// FetchingRight → ProbeRight (if right batch available)
+    /// FetchingRight → EmitLeftUnmatched (if right exhausted)
+    ///
+    /// ProbeRight → ProbeRight (next left row or after yielding output)
+    /// ProbeRight → EmitRightUnmatched (for special join types like right 
join)
+    /// ProbeRight → FetchingRight (done with the current right batch)
+    ///
+    /// EmitRightUnmatched → FetchingRight
+    ///
+    /// EmitLeftUnmatched → EmitLeftUnmatched (only process 1 chunk for each
+    /// iteration)
+    /// EmitLeftUnmatched → Done (if finished)
+    /// ----------------------------
+    /// Done → (end)
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
         cx: &mut std::task::Context<'_>,
-    ) -> Poll<Option<Result<RecordBatch>>> {
+    ) -> Poll<Option<Self::Item>> {
         loop {
-            return match self.state {
-                NestedLoopJoinStreamState::WaitBuildSide => {
-                    handle_state!(ready!(self.collect_build_side(cx)))
+            match self.state {
+                // # NLJState transitions
+                // --> FetchingRight
+                // This state will prepare the left side batches, next state
+                // `FetchingRight` is responsible for preparing a single probe
+                // side batch, before start joining.
+                NLJState::BufferingLeft => {
+                    debug!("[NLJState] Entering: {:?}", self.state);
+                    match ready!(self.inner_table.get_shared(cx)) {
+                        Ok(left_data) => {
+                            self.buffered_left_data = Some(left_data);
+                            // TODO: implement memory-limited case
+                            self.left_exhausted = true;
+                            self.state = NLJState::FetchingRight;
+                            continue;
+                        }
+                        Err(e) => return Poll::Ready(Some(Err(e))),
+                    }
                 }
-                NestedLoopJoinStreamState::FetchProbeBatch => {
-                    handle_state!(ready!(self.fetch_probe_batch(cx)))
+
+                // # NLJState transitions:
+                // 1. --> ProbeRight
+                //    Start processing the join for the newly fetched right
+                //    batch.
+                // 2. --> EmitLeftUnmatched
+                //    When the right side input is exhausted, (maybe) emit
+                //    unmatched left side rows.
+                //
+                // After fetching a new batch from the right side, it will
+                // process all rows from the buffered left data:
+                // ```text
+                // for batch in right_side:
+                //     for row in left_buffer:
+                //         join(batch, row)
+                // ```
+                // Note: the implementation does this step incrementally,
+                // instead of materializing all intermediate Cartesian products
+                // at once in memory.
+                //
+                // So after the right side input is exhausted, the join phase
+                // for the current buffered left data is finished. We can go to
+                // the next `EmitLeftUnmatched` phase to check if there is any
+                // special handling (e.g., in cases like left join).
+                NLJState::FetchingRight => {
+                    debug!("[NLJState] Entering: {:?}", self.state);
+                    match ready!(self.outer_table.poll_next_unpin(cx)) {
+                        Some(Ok(right_batch)) => {
+                            let right_batch_size = right_batch.num_rows();
+
+                            // Skip the empty batch
+                            if right_batch_size == 0 {
+                                continue;
+                            }
+
+                            self.current_right_batch = Some(right_batch);
+
+                            // Prepare right bitmap
+                            if self.should_track_unmatched_right {
+                                let zeroed_buf =
+                                    BooleanBuffer::new_unset(right_batch_size);
+                                self.current_right_batch_matched =
+                                    Some(BooleanArray::new(zeroed_buf, None));
+                            }
+
+                            self.l_probe_idx = 0;
+                            self.state = NLJState::ProbeRight;
+                            continue;
+                        }
+                        Some(Err(e)) => return Poll::Ready(Some(Err(e))),
+                        None => {
+                            // Right stream exhausted
+                            self.state = NLJState::EmitLeftUnmatched;
+                            continue;
+                        }
+                    }
                 }
-                NestedLoopJoinStreamState::ProcessProbeBatch(_) => {
-                    let poll = handle_state!(self.process_probe_batch());
-                    self.join_metrics.baseline.record_poll(poll)
+
+                // NLJState transitions:
+                // 1. --> ProbeRight(1)
+                //    If we have already buffered enough output to yield, it
+                //    will first give back control to the parent state machine,
+                //    then resume at the same place.
+                // 2. --> ProbeRight(2)
+                //    After probing one right batch, and evaluating the
+                //    join filter on (left-row x right-batch), it will advance
+                //    to the next left row, then re-enter the current state and
+                //    continue joining.
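+                // 3. --> FetchingRight (or EmitRightUnmatched)
+                //    After it is done with the current right batch (joined
+                //    with all rows in the left buffer), it will go to the
+                //    FetchingRight state to fetch the next right batch, or
+                //    first to EmitRightUnmatched when unmatched right rows
+                //    are being tracked.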
+                NLJState::ProbeRight => {
+                    debug!("[NLJState] Entering: {:?}", self.state);
+                    // Return any completed batches first
+                    if let Some(poll) = self.maybe_flush_ready_batch() {
+                        return poll;
+                    }
+
+                    // Process current probe state
+                    match self.process_probe_batch() {
+                        // State unchanged (ProbeRight)
+                        // Continue probing until we have done joining the
+                        // current right batch with all buffered left rows.
+                        Ok(true) => continue,
+                        // To the next state (FetchingRight or EmitRightUnmatched)
+                        // We have finished joining
+                        // (cur_right_batch x buffered_left_batches)
+                        Ok(false) => {
+                            // All left rows probed: reset the cursor, pick next state
+                            self.l_probe_idx = 0;
+                            if self.should_track_unmatched_right {
+                                
debug_assert!(self.current_right_batch_matched.is_some());
+                                self.state = NLJState::EmitRightUnmatched;
+                            } else {
+                                self.current_right_batch = None;
+                                self.state = NLJState::FetchingRight;
+                            }
+                            continue;
+                        }
+                        Err(e) => return Poll::Ready(Some(Err(e))),
+                    }
                 }
-                NestedLoopJoinStreamState::PrepareUnmatchedBuildRows => {
-                    handle_state!(self.prepare_unmatched_output_indices())
+
+                // In the `current_right_batch_matched` bitmap, all trues mean
+                // it has been output by the join. In this state we have to
+                // output unmatched rows for current right batch (with null
+                // padding for left relation)
+                // Precondition: we have checked the join type so that it's
+                // possible to output right unmatched (e.g. it's right join)
+                NLJState::EmitRightUnmatched => {
+                    debug!("[NLJState] Entering: {:?}", self.state);
+                    debug_assert!(self.current_right_batch.is_some());
+                    debug_assert!(self.current_right_batch_matched.is_some());
+
+                    // Construct the result batch for unmatched right rows 
using a utility function
+                    if let Some(batch) = self.process_right_unmatched()? {
+                        self.output_buffer.push_batch(batch)?;
+                    }
+
+                    // Processed all in one pass
+                    // cleared inside `process_right_unmatched`
+                    debug_assert!(self.current_right_batch.is_none());
+                    self.state = NLJState::FetchingRight;
                 }
-                NestedLoopJoinStreamState::OutputUnmatchedBuildRows(_) => {
-                    let poll = handle_state!(self.build_unmatched_output());
-                    self.join_metrics.baseline.record_poll(poll)
+
+                // NLJState transitions:
+                // 1. --> EmitLeftUnmatched(1)
+                //    If we have already buffered enough output to yield, it
+                //    will first give back control to the parent state machine,
+                //    then resume at the same place.
+                // 2. --> EmitLeftUnmatched(2)
+                //    After processing some unmatched rows, it will re-enter
+                //    the same state, to check if there are any more final
+                //    results to output.
+                // 3. --> Done
+                //    It has processed all data; go to the final state, ready
+                //    to exit.
+                //
+                // TODO: For memory-limited case, go back to `BufferingLeft`
+                // state again.
+                NLJState::EmitLeftUnmatched => {
+                    debug!("[NLJState] Entering: {:?}", self.state);
+                    // Return any completed batches first
+                    if let Some(poll) = self.maybe_flush_ready_batch() {
+                        return poll;
+                    }
+
+                    // Process current unmatched state
+                    match self.process_left_unmatched() {
+                        // State unchanged (EmitLeftUnmatched)
+                        // Continue processing until we have processed all 
unmatched rows
+                        Ok(true) => continue,
+                        // To Done state
+                        // We have finished processing all unmatched rows
+                        Ok(false) => {
+                            self.output_buffer.finish_buffered_batch()?;
+                            self.state = NLJState::Done;
+                            continue;
+                        }
+                        Err(e) => return Poll::Ready(Some(Err(e))),
+                    }
                 }
-                NestedLoopJoinStreamState::Completed => Poll::Ready(None),
-            };
+
+                // The final state and the exit point
+                NLJState::Done => {
+                    debug!("[NLJState] Entering: {:?}", self.state);
+                    // Return any remaining completed batches before final 
termination
+                    if let Some(poll) = self.maybe_flush_ready_batch() {
+                        return poll;
+                    }
+
+                    // HACK for the doc test in 
https://github.com/apache/datafusion/blob/main/datafusion/core/src/dataframe/mod.rs#L1265
+                    // If this operator directly return `Poll::Ready(None)`
+                    // for empty result, the final result will become an empty
+                    // batch with empty schema, however the expected result
+                    // should be with the expected schema for this operator
+                    if !self.handled_empty_output {
+                        let zero_count = Count::new();
+                        if *self.join_metrics.baseline.output_rows() == 
zero_count {
+                            let empty_batch =
+                                
RecordBatch::new_empty(Arc::clone(&self.output_schema));
+                            self.handled_empty_output = true;
+                            return Poll::Ready(Some(Ok(empty_batch)));
+                        }
+                    }
+
+                    return Poll::Ready(None);
+                }
+            }
         }
     }
+}
 
-    // This function's main job is to construct an output `RecordBatch` based 
on pre-calculated join indices.
-    // It operates in a chunk-based manner, meaning it processes a portion of 
the results in each call,
-    // making it suitable for streaming large datasets without high memory 
consumption.
-    // This function behaves like an iterator. It returns `Ok(None)`
-    // to signal that the result stream is exhausted and there is no more data.
-    fn get_next_join_result(&mut self) -> Result<Option<RecordBatch>> {
-        let status = self.join_result_status.as_mut().ok_or_else(|| {
-            internal_datafusion_err!(
-                "get_next_join_result called without initializing 
join_result_status"
-            )
-        })?;
+impl RecordBatchStream for NLJStream {
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.output_schema)
+    }
+}
 
-        let (left_indices, right_indices, current_start) = (
-            &status.build_indices,
-            &status.probe_indices,
-            status.processed_count,
+impl NLJStream {
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        schema: Arc<Schema>,
+        filter: Option<JoinFilter>,
+        join_type: JoinType,
+        outer_table: SendableRecordBatchStream,
+        inner_table: OnceFut<JoinLeftData>,
+        column_indices: Vec<ColumnIndex>,
+        join_metrics: BuildProbeJoinMetrics,
+        cfg_batch_size: usize,
+    ) -> Self {
+        let should_track_unmatched_right = matches!(
+            join_type,
+            JoinType::Full
+                | JoinType::Right
+                | JoinType::RightAnti
+                | JoinType::RightMark
+                | JoinType::RightSemi
         );
 
-        let left_batch = self
-            .left_data
-            .as_ref()
-            .ok_or_else(|| internal_datafusion_err!("should have left_batch"))?
-            .batch();
+        Self {
+            output_schema: Arc::clone(&schema),
+            join_filter: filter,
+            join_type,
+            outer_table,
+            column_indices,
+            inner_table,
+            join_metrics,
+            buffered_left_data: None,
+            output_buffer: Box::new(BatchCoalescer::new(schema, 
cfg_batch_size)),
+            cfg_batch_size,
+            current_right_batch: None,
+            current_right_batch_matched: None,
+            state: NLJState::BufferingLeft,
+            l_probe_idx: 0,
+            l_emit_idx: 0,
+            left_exhausted: false,
+            left_buffered_in_one_pass: true,
+            handled_empty_output: false,
+            should_track_unmatched_right,
+        }
+    }
 
-        let right_batch = match &self.state {
-            NestedLoopJoinStreamState::ProcessProbeBatch(record_batch)
-            | 
NestedLoopJoinStreamState::OutputUnmatchedBuildRows(record_batch) => {
-                record_batch
-            }
-            _ => {
-                return internal_err!(
-                    "State should be ProcessProbeBatch or 
OutputUnmatchedBuildRows"
-                )
-            }
-        };
+    // ==== Core logic handling for each state ====
 
-        if left_indices.is_empty() && right_indices.is_empty() && 
current_start == 0 {
-            // To match the behavior of the previous implementation, return an 
empty RecordBatch.
-            let res = RecordBatch::new_empty(Arc::clone(&self.schema));
-            status.processed_count = 1;
-            return Ok(Some(res));
-        }
+    /// Returns a bool indicating whether probing should continue:
+    /// true -> continue in the same ProbeRight state
+    /// false -> done with (buffered_left x cur_right_batch); go to the next
+    /// state (FetchingRight or EmitRightUnmatched)
+    fn process_probe_batch(&mut self) -> Result<bool> {
+        let left_data = Arc::clone(self.get_left_data()?);
+        let right_batch = self
+            .current_right_batch
+            .as_ref()
+            .ok_or_else(|| internal_datafusion_err!("Right batch should be 
available"))?
+            .clone();
 
-        if matches!(self.join_type, JoinType::RightSemi | JoinType::RightAnti) 
{
-            // in this case left_indices.num_rows() == 0
-            let end = min(
-                current_start + self.intermediate_batch_size,
-                right_indices.len(),
-            );
+        // stop probing, the caller will go to the next state
+        if self.l_probe_idx >= left_data.batch().num_rows() {
+            return Ok(false);
+        }
 
-            if current_start >= end {
-                return Ok(None);
-            }
+        // ========
+        // Join (l_row x right_batch)
+        // and push the result into output_buffer
+        // ========
 
-            let res = Some(build_batch_from_indices(
-                &self.schema,
-                left_batch,
-                right_batch,
-                left_indices,
-                &right_indices.slice(current_start, end - current_start),
-                &self.column_indices,
-                JoinSide::Left,
-            )?);
+        let l_idx = self.l_probe_idx;
+        let join_batch =
+            self.process_single_left_row_join(&left_data, &right_batch, 
l_idx)?;
 
-            status.processed_count = end;
-            return Ok(res);
+        if let Some(batch) = join_batch {
+            self.output_buffer.push_batch(batch)?;
         }
 
-        if current_start >= left_indices.len() {
+        // ==== Prepare for the next iteration ====
+
+        // Advance left cursor
+        self.l_probe_idx += 1;
+
+        // Return true to continue probing
+        Ok(true)
+    }
+
+    /// Process a single left row join with the current right batch.
+    /// Returns a RecordBatch containing the join results (None if empty)
+    fn process_single_left_row_join(
+        &mut self,
+        left_data: &JoinLeftData,
+        right_batch: &RecordBatch,
+        l_index: usize,
+    ) -> Result<Option<RecordBatch>> {
+        let right_row_count = right_batch.num_rows();
+        if right_row_count == 0 {
             return Ok(None);
         }
 
-        let end = min(
-            current_start + self.intermediate_batch_size,
-            left_indices.len(),
-        );
+        let cur_right_bitmap = if let Some(filter) = &self.join_filter {
+            apply_filter_to_row_join_batch(
+                left_data.batch(),
+                l_index,
+                right_batch,
+                filter,
+            )?
+        } else {
+            BooleanArray::from(vec![true; right_row_count])
+        };
 
-        let left_indices = &left_indices.slice(current_start, end - 
current_start);
-        let right_indices = &right_indices.slice(current_start, end - 
current_start);
+        self.update_matched_bitmap(l_index, &cur_right_bitmap)?;
 
-        // Switch around the build side and probe side for 
`JoinType::RightMark`
-        // because in a RightMark join, we want to mark rows on the right table
-        // by looking for matches in the left.
-        let res = if self.join_type == JoinType::RightMark {
-            build_batch_from_indices(
-                &self.schema,
-                right_batch,
-                left_batch,
-                left_indices,
-                right_indices,
-                &self.column_indices,
-                JoinSide::Right,
-            )
+        // For the following join types: here we only have to set the 
left/right
+        // bitmap, and no need to output result
+        if matches!(
+            self.join_type,
+            JoinType::LeftAnti
+                | JoinType::LeftSemi
+                | JoinType::LeftMark
+                | JoinType::RightAnti
+                | JoinType::RightMark
+                | JoinType::RightSemi
+        ) {
+            return Ok(None);
+        }
+
+        if cur_right_bitmap.true_count() == 0 {
+            // If none of the pairs has passed the join predicate/filter
+            Ok(None)
         } else {
-            build_batch_from_indices(
-                &self.schema,
-                left_batch,
+            // Use the optimized approach similar to 
build_intermediate_batch_for_single_left_row
+            let join_batch = build_row_join_batch(
+                &self.output_schema,
+                left_data.batch(),
+                l_index,
                 right_batch,
-                left_indices,
-                right_indices,
+                Some(cur_right_bitmap),
                 &self.column_indices,
                 JoinSide::Left,
-            )
-        }?;
+            )?;
+            Ok(join_batch)
+        }
+    }
 
-        status.processed_count = end;
+    /// Returns bool to indicate should it continue processing unmatched rows
+    /// true -> continue in the same EmitLeftUnmatched state
+    /// false -> next state (Done)
+    fn process_left_unmatched(&mut self) -> Result<bool> {
+        let left_data = self.get_left_data()?;
+        let left_batch = left_data.batch();
 
-        Ok(Some(res))
-    }
+        // Early return if join type can't have unmatched rows
+        if !need_produce_result_in_final(self.join_type) {
+            return Ok(false);
+        }
 
-    fn collect_build_side(
-        &mut self,
-        cx: &mut std::task::Context<'_>,
-    ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
-        let build_timer = self.join_metrics.build_time.timer();
-        // build hash table from left (build) side, if not yet done
-        self.left_data = Some(ready!(self.inner_table.get_shared(cx))?);
-        build_timer.done();
+        // Early return if another thread is already processing unmatched rows
+        if self.l_emit_idx == 0 && !left_data.report_probe_completed() {
+            return Ok(false);
+        }
 
-        self.state = NestedLoopJoinStreamState::FetchProbeBatch;
+        // Stop processing unmatched rows, the caller will go to the next state
+        if self.l_emit_idx >= left_batch.num_rows() {
+            return Ok(false);
+        }
 
-        Poll::Ready(Ok(StatefulStreamResult::Continue))
-    }
+        // ========
+        // Process unmatched rows and push the result into output_buffer
+        // Each time, the number to process is up to batch size
+        // ========
+        let start_idx = self.l_emit_idx;
+        let end_idx =
+            std::cmp::min(start_idx + self.cfg_batch_size, 
left_batch.num_rows());
 
-    /// Fetches next batch from probe-side
-    ///
-    /// If a non-empty batch has been fetched, updates state to
-    /// `ProcessProbeBatchState`, otherwise updates state to 
`ExhaustedProbeSide`.
-    fn fetch_probe_batch(
-        &mut self,
-        cx: &mut std::task::Context<'_>,
-    ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
-        match ready!(self.outer_table.poll_next_unpin(cx)) {
-            None => {
-                self.state = 
NestedLoopJoinStreamState::PrepareUnmatchedBuildRows;
-            }
-            Some(Ok(right_batch)) => {
-                self.join_metrics.input_batches.add(1);
-                self.join_metrics.input_rows.add(right_batch.num_rows());
+        if let Some(batch) =
+            self.process_left_unmatched_range(left_data, start_idx, end_idx)?
+        {
+            self.output_buffer.push_batch(batch)?;
+        }
 
-                self.state = 
NestedLoopJoinStreamState::ProcessProbeBatch(right_batch);
-            }
-            Some(Err(err)) => return Poll::Ready(Err(err)),
-        };
+        // ==== Prepare for the next iteration ====
+        self.l_emit_idx = end_idx;
 
-        Poll::Ready(Ok(StatefulStreamResult::Continue))
+        // Return true to continue processing unmatched rows
+        Ok(true)
     }
 
-    /// Joins current probe batch with build-side data and produces batch with
-    /// matched output, updates state to `FetchProbeBatch`.
-    fn process_probe_batch(
-        &mut self,
-    ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
-        let Some(left_data) = self.left_data.clone() else {
-            return internal_err!(
-                "Expected left_data to be Some in ProcessProbeBatch state"
+    /// Process unmatched rows from the left data within the specified range.
+    /// Returns a RecordBatch containing the unmatched rows (None if empty).
+    ///
+    /// # Arguments
+    /// * `left_data` - The left side data containing the batch and bitmap
+    /// * `start_idx` - Start index (inclusive) of the range to process
+    /// * `end_idx` - End index (exclusive) of the range to process
+    ///
+    /// # Safety
+    /// The caller is responsible for ensuring that `start_idx` and `end_idx` 
are
+    /// within valid bounds of the left batch. This function does not perform
+    /// bounds checking.
+    fn process_left_unmatched_range(
+        &self,
+        left_data: &JoinLeftData,
+        start_idx: usize,
+        end_idx: usize,
+    ) -> Result<Option<RecordBatch>> {
+        if start_idx == end_idx {
+            return Ok(None);
+        }
+
+        // Slice both left batch, and bitmap to range [start_idx, end_idx)
+        // The range is bit index (not byte)
+        let left_batch = left_data.batch();
+        let left_batch_sliced = left_batch.slice(start_idx, end_idx - 
start_idx);
+
+        // Can this be more efficient?
+        let mut bitmap_sliced = BooleanBufferBuilder::new(end_idx - start_idx);
+        bitmap_sliced.append_n(end_idx - start_idx, false);
+        let bitmap = left_data.bitmap().lock();
+        for i in start_idx..end_idx {
+            assert!(
+                i - start_idx < bitmap_sliced.capacity(),
+                "DBG: {start_idx}, {end_idx}"
             );
-        };
-        let visited_left_side = left_data.bitmap();
-        let batch = self.state.try_as_process_probe_batch()?;
+            bitmap_sliced.set_bit(i - start_idx, bitmap.get_bit(i));
+        }
+        let bitmap_sliced = BooleanArray::new(bitmap_sliced.finish(), None);
+
+        build_unmatched_batch(
+            Arc::clone(&self.output_schema),
+            &left_batch_sliced,
+            bitmap_sliced,
+            self.outer_table.schema(),
+            &self.column_indices,
+            self.join_type,
+            JoinSide::Left,
+        )
+    }
 
-        let binding = self.join_metrics.join_time.clone();
-        let _timer = binding.timer();
+    /// Process unmatched rows from the current right batch and reset the 
bitmap.
+    /// Returns a RecordBatch containing the unmatched right rows (None if 
empty).
+    fn process_right_unmatched(&mut self) -> Result<Option<RecordBatch>> {
+        // ==== Take current right batch and its bitmap ====
+        let right_batch_bitmap: BooleanArray =
+            std::mem::take(&mut 
self.current_right_batch_matched).ok_or_else(|| {
+                internal_datafusion_err!("right bitmap should be available")
+            })?;
+
+        let right_batch = self.current_right_batch.take();
+        let cur_right_batch = unwrap_or_internal_err!(right_batch);
+
+        let left_data = self.get_left_data()?;
+        let left_schema = left_data.batch().schema();
+
+        let res = build_unmatched_batch(
+            Arc::clone(&self.output_schema),
+            &cur_right_batch,
+            right_batch_bitmap,
+            left_schema,
+            &self.column_indices,
+            self.join_type,
+            JoinSide::Right,
+        );
 
-        if self.join_result_status.is_none() {
-            let (left_side_indices, right_side_indices) = 
join_left_and_right_batch(
-                left_data.batch(),
-                batch,
-                self.join_type,
-                self.filter.as_ref(),
-                visited_left_side,
-                &mut self.indices_cache,
-                self.right_side_ordered,
-                self.intermediate_batch_size,
-            )?;
-            self.join_result_status = Some(JoinResultProgress {
-                build_indices: left_side_indices,
-                probe_indices: right_side_indices,
-                processed_count: 0,
-            })
-        }
+        // ==== Clean-up ====
+        self.current_right_batch_matched = None;
 
-        let join_result = self.get_next_join_result()?;
+        res
+    }
 
-        match join_result {
-            Some(res) => {
-                self.join_metrics.output_batches.add(1);
-                Ok(StatefulStreamResult::Ready(Some(res)))
-            }
-            None => {
-                self.state = NestedLoopJoinStreamState::FetchProbeBatch;
-                self.join_result_status = None;
-                Ok(StatefulStreamResult::Continue)
-            }
-        }
+    // ==== Utilities ====
+
+    /// Get the inner data of left data, errors if it's None
+    fn get_left_data(&self) -> Result<&Arc<JoinLeftData>> {
+        self.buffered_left_data
+            .as_ref()
+            .ok_or_else(|| internal_datafusion_err!("LeftData should be 
available"))
     }
 
-    fn build_unmatched_output(
-        &mut self,
-    ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
-        let binding = self.join_metrics.join_time.clone();
-        let _timer = binding.timer();
-
-        let res = self.get_next_join_result()?;
-        match res {
-            Some(res) => {
-                self.join_metrics.output_batches.add(1);
-                Ok(StatefulStreamResult::Ready(Some(res)))
-            }
-            None => {
-                self.state = NestedLoopJoinStreamState::Completed;
-                Ok(StatefulStreamResult::Ready(None))
+    /// Flush the `output_buffer` if there are batches ready to output
+    /// None if no result batch ready.
+    fn maybe_flush_ready_batch(&mut self) -> 
Option<Poll<Option<Result<RecordBatch>>>> {
+        if self.output_buffer.has_completed_batch() {
+            if let Some(batch) = self.output_buffer.next_completed_batch() {
+                let poll = Poll::Ready(Some(Ok(batch)));
+                return Some(self.join_metrics.baseline.record_poll(poll));
             }
         }
+
+        None
     }
 
-    /// This function's primary purpose is to handle the final output stage 
required by specific join types after all right-side (probe) data has been 
exhausted.
-    /// It is critically important for LEFT*/FULL joins, which must emit 
left-side (build) rows that found no match. For these cases, it identifies the 
unmatched rows and prepares the necessary state to output them.
-    fn prepare_unmatched_output_indices(
+    /// After joining (l_index@left_buffer x current_right_batch), it will 
result
+    /// in a bitmap (the same length as current_right_batch) as the join match
+    /// result. Use this bitmap to update the global bitmap, for special join
+    /// types like full joins.
+    ///
+    /// Example:
+    /// After joining l_index=1 (1-indexed row in the left buffer), and the
+    /// current right batch with 3 elements, this function will be called with
+    /// arguments: l_index = 1, r_matched = [false, false, true]
+    /// - If the join type is FullJoin, the 1-index in the left bitmap will be
+    ///   set to true, and also the right bitmap will be bitwise-ORed with the
+    ///   input r_matched bitmap.
+    /// - For join types that don't require output unmatched rows, this
+    ///   function can be a no-op. For inner joins, this function is a no-op; 
for left
+    ///   joins, only the left bitmap may be updated.
+    fn update_matched_bitmap(
         &mut self,
-    ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
-        let Some(left_data) = self.left_data.clone() else {
-            return internal_err!(
-                "Expected left_data to be Some in ExhaustedProbeSide state"
-            );
-        };
-        let visited_left_side = left_data.bitmap();
-        if need_produce_result_in_final(self.join_type) {
-            // At this stage `visited_left_side` won't be updated, so it's
-            // safe to report about probe completion.
-            //
-            // Setting `is_exhausted` / returning None will prevent from
-            // multiple calls of `report_probe_completed()`
-            if !left_data.report_probe_completed() {
-                self.state = NestedLoopJoinStreamState::Completed;
-                return Ok(StatefulStreamResult::Ready(None));
-            };
+        l_index: usize,
+        r_matched: &BooleanArray,
+    ) -> Result<()> {
+        let left_data = self.get_left_data()?;
 
-            // Only setting up timer, input is exhausted
-            let _timer = self.join_metrics.join_time.timer();
-            // use the global left bitmap to produce the left indices and 
right indices
-            let (left_side, right_side) =
-                get_final_indices_from_shared_bitmap(visited_left_side, 
self.join_type);
-
-            self.join_result_status = Some(JoinResultProgress {
-                build_indices: left_side,
-                probe_indices: right_side,
-                processed_count: 0,
-            });
-            self.state = NestedLoopJoinStreamState::OutputUnmatchedBuildRows(
-                RecordBatch::new_empty(self.outer_table.schema()),
-            );
+        // number of successfully joined pairs from (l_index x cur_right_batch)
+        let joined_len = r_matched.true_count();
 
-            Ok(StatefulStreamResult::Continue)
-        } else {
-            // end of the join loop
-            self.state = NestedLoopJoinStreamState::Completed;
-            Ok(StatefulStreamResult::Ready(None))
+        // 1. Maybe update the left bitmap
+        if need_produce_result_in_final(self.join_type) && (joined_len > 0) {
+            let mut bitmap = left_data.bitmap().lock();
+            bitmap.set_bit(l_index, true);
+        }
+
+        // 2. Maybe update the right bitmap
+        if self.should_track_unmatched_right {
+            debug_assert!(self.current_right_batch_matched.is_some());
+            // after bit-wise or, it will be put back
+            let right_bitmap = std::mem::take(&mut 
self.current_right_batch_matched)
+                .ok_or_else(|| {
+                    internal_datafusion_err!("right batch's bitmap should be 
present")
+                })?;
+            let (buf, nulls) = right_bitmap.into_parts();
+            debug_assert!(nulls.is_none());
+            let updated_right_bitmap = buf.bitor(r_matched.values());
+
+            self.current_right_batch_matched =
+                Some(BooleanArray::new(updated_right_bitmap, None));
         }
+
+        Ok(())
     }
 }
 
-#[allow(clippy::too_many_arguments)]
-fn join_left_and_right_batch(
+// ==== Utilities ====
+
+/// Apply the join filter between:
+/// (l_index th row in left buffer) x (right batch)
+/// Returns a bitmap, with successfully joined indices set to true
+fn apply_filter_to_row_join_batch(
     left_batch: &RecordBatch,
+    l_index: usize,
     right_batch: &RecordBatch,
-    join_type: JoinType,
-    filter: Option<&JoinFilter>,
-    visited_left_side: &SharedBitmapBuilder,
-    indices_cache: &mut (UInt64Array, UInt32Array),
-    right_side_ordered: bool,
-    max_intermediate_batch_size: usize,
-) -> Result<(PrimitiveArray<UInt64Type>, PrimitiveArray<UInt32Type>)> {
-    let (left_side, right_side) = build_join_indices(
-        left_batch,
-        right_batch,
-        filter,
-        indices_cache,
-        max_intermediate_batch_size,
-    )
-    .map_err(|e| {
-        exec_datafusion_err!(
-            "Fail to build join indices in NestedLoopJoinExec, error: {e}"
-        )
-    })?;
-
-    // set the left bitmap
-    // and only full join need the left bitmap
-    if need_produce_result_in_final(join_type) {
-        let mut bitmap = visited_left_side.lock();
-        left_side.values().iter().for_each(|x| {
-            bitmap.set_bit(*x as usize, true);
-        });
-    }
-    // adjust the two side indices base on the join type
-    let (left_side, right_side) = adjust_indices_by_join_type(
-        left_side,
-        right_side,
-        0..right_batch.num_rows(),
-        join_type,
-        right_side_ordered,
-    )?;
+    filter: &JoinFilter,
+) -> Result<BooleanArray> {
+    debug_assert!(left_batch.num_rows() != 0 && right_batch.num_rows() != 0);
+
+    let intermediate_batch = if filter.schema.fields().is_empty() {
+        // If filter is constant (e.g. literal `true`), empty batch can be used
+        // in the later filter step.
+        create_record_batch_with_empty_schema(
+            Arc::new((*filter.schema).clone()),
+            right_batch.num_rows(),
+        )?
+    } else {
+        build_row_join_batch(
+            &filter.schema,
+            left_batch,
+            l_index,
+            right_batch,
+            None,
+            &filter.column_indices,
+            JoinSide::Left,
+        )?
+        .ok_or_else(|| internal_datafusion_err!("This function assume input 
batch is not empty, so the intermediate batch can't be empty too"))?
+    };
+
+    let filter_result = filter
+        .expression()
+        .evaluate(&intermediate_batch)?
+        .into_array(intermediate_batch.num_rows())?;
 
-    Ok((left_side, right_side))
+    Ok(as_boolean_array(&filter_result)?.clone())
 }
 
-impl Stream for NestedLoopJoinStream {
-    type Item = Result<RecordBatch>;
+/// This function performs the following steps:
+/// 1. Apply filter to probe-side batch
+/// 2. Broadcast the left row (build_side_batch\[build_side_index\]) to the
+///    filtered probe-side batch
+/// 3. Concat them together according to `col_indices`, and return the result
+///    (None if the result is empty)
+///
+/// Example:
+/// build_side_batch:
+/// a
+/// ----
+/// 1
+/// 2
+/// 3
+///
+/// # 0 index element in the build_side_batch (that is `1`) will be used
+/// build_side_index: 0
+///
+/// probe_side_batch:
+/// b
+/// ----
+/// 10
+/// 20
+/// 30
+/// 40
+///
+/// # After applying it, only the elements at index 1 and 3 in probe_side_batch
+/// # will be kept
+/// probe_side_filter:
+/// false
+/// true
+/// false
+/// true
+///
+///
+/// # Projections to the build/probe side batch, to construct the output batch
+/// col_indices:
+/// [(left, 0), (right, 0)]
+///
+/// build_side: left
+///
+/// ====
+/// Result batch:
+/// a b
+/// ----
+/// 1 20
+/// 1 40
+fn build_row_join_batch(
+    output_schema: &Schema,
+    build_side_batch: &RecordBatch,
+    build_side_index: usize,
+    probe_side_batch: &RecordBatch,
+    probe_side_filter: Option<BooleanArray>,
+    // See [`NLJStream`] struct's `column_indices` field for more detail
+    col_indices: &[ColumnIndex],
+    // If the build side is left or right, used to interpret the side 
information
+    // in `col_indices`
+    build_side: JoinSide,
+) -> Result<Option<RecordBatch>> {
+    debug_assert!(build_side != JoinSide::None);
+
+    // TODO(perf): since the output might be projection of right batch, this
+    // filtering step is more efficient to be done inside the column_index loop
+    let filtered_probe_batch = if let Some(filter) = probe_side_filter {
+        &filter_record_batch(probe_side_batch, &filter)?
+    } else {
+        probe_side_batch
+    };
 
-    fn poll_next(
-        mut self: std::pin::Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> Poll<Option<Self::Item>> {
-        self.poll_next_impl(cx)
+    if filtered_probe_batch.num_rows() == 0 {
+        return Ok(None);
+    }
+
+    let mut columns: Vec<Arc<dyn Array>> =
+        Vec::with_capacity(output_schema.fields().len());
+
+    for column_index in col_indices {
+        let array = if column_index.side == build_side {
+            // Broadcast the single build-side row to match the filtered
+            // probe-side batch length
+            let original_left_array = 
build_side_batch.column(column_index.index);
+            let scalar_value = ScalarValue::try_from_array(
+                original_left_array.as_ref(),
+                build_side_index,
+            )?;
+            scalar_value.to_array_of_size(filtered_probe_batch.num_rows())?

Review Comment:
   For such simple expressions, it's 20% faster on top of this PR. I think we 
should include this optimization as a follow-up.
   After that, the bottleneck shifts to filtering and concatenating the final 
output; this TODO might help:
   
https://github.com/apache/arrow-rs/blob/a9b6077b2d8d5b5aee0e97e7d4335878e8da1876/arrow-select/src/coalesce.rs#L206



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to