2010YOUY01 commented on code in PR #16443:
URL: https://github.com/apache/datafusion/pull/16443#discussion_r2199423074


##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -828,13 +833,127 @@ impl<T: BatchTransformer> NestedLoopJoinStream<T> {
                     handle_state!(self.process_probe_batch())
                 }
                 NestedLoopJoinStreamState::ExhaustedProbeSide => {
-                    handle_state!(self.process_unmatched_build_batch())
+                    handle_state!(self.prepare_unmatched_output_indices())
+                }
+                NestedLoopJoinStreamState::OutputUnmatchedBuildRows(_) => {
+                    handle_state!(self.build_unmatched_output())
                 }
                 NestedLoopJoinStreamState::Completed => Poll::Ready(None),
             };
         }
     }
 
+    fn get_next_join_result(&mut self) -> Result<Option<RecordBatch>> {
+        let (left_indices, right_indices, start) =
+            self.join_result_status.as_mut().ok_or_else(|| {
+                datafusion_common::_internal_datafusion_err!(
+                    "should have join_result_status"
+                )
+            })?;
+
+        let left_batch = self
+            .left_data
+            .as_ref()
+            .ok_or_else(|| {
+                datafusion_common::_internal_datafusion_err!("should have 
left_batch")
+            })?
+            .batch();
+
+        let right_batch = match &self.state {
+            NestedLoopJoinStreamState::ProcessProbeBatch(record_batch) => 
record_batch,
+            NestedLoopJoinStreamState::OutputUnmatchedBuildRows(record_batch) 
=> {
+                record_batch
+            }
+            _ => {
+                return internal_err!(
+                    "state should be ProcessProbeBatch or OutputUnmatchBatch"
+                )
+            }
+        };
+
+        let current_start = *start;
+
+        if left_indices.is_empty() && right_indices.is_empty() && 
current_start == 0 {

Review Comment:
   I don't get this `status.processed_count = 1` logic either, perhaps you can 
add a quick comment to explain it?



##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -828,15 +845,125 @@ impl<T: BatchTransformer> NestedLoopJoinStream<T> {
                     let poll = handle_state!(self.process_probe_batch());
                     self.join_metrics.baseline.record_poll(poll)
                 }
-                NestedLoopJoinStreamState::ExhaustedProbeSide => {
-                    let poll = 
handle_state!(self.process_unmatched_build_batch());
+                NestedLoopJoinStreamState::PrepareUnmatchedBuildRows => {
+                    handle_state!(self.prepare_unmatched_output_indices())
+                }
+                NestedLoopJoinStreamState::OutputUnmatchedBuildRows(_) => {
+                    let poll = handle_state!(self.build_unmatched_output());
                     self.join_metrics.baseline.record_poll(poll)
                 }
                 NestedLoopJoinStreamState::Completed => Poll::Ready(None),
             };
         }
     }
 
+    // This function's main job is to construct an output `RecordBatch` based 
on pre-calculated join indices.
+    // It operates in a chunk-based manner, meaning it processes a portion of 
the results in each call,
+    // making it suitable for streaming large datasets without high memory 
consumption.
+    fn get_next_join_result(&mut self) -> Result<Option<RecordBatch>> {
+        let status = self.join_result_status.as_mut().ok_or_else(|| {
+            internal_datafusion_err!(
+                "get_next_join_result called without initializing 
join_result_status"
+            )
+        })?;
+
+        let (left_indices, right_indices, current_start) = (
+            &status.build_indices,
+            &status.probe_indices,
+            status.processed_count,
+        );
+
+        let left_batch = self
+            .left_data
+            .as_ref()
+            .ok_or_else(|| internal_datafusion_err!("should have left_batch"))?
+            .batch();
+
+        let right_batch = match &self.state {
+            NestedLoopJoinStreamState::ProcessProbeBatch(record_batch)
+            | 
NestedLoopJoinStreamState::OutputUnmatchedBuildRows(record_batch) => {
+                record_batch
+            }
+            _ => {
+                return internal_err!(
+                    "State should be ProcessProbeBatch or 
OutputUnmatchedBuildRows"
+                )
+            }
+        };
+
+        if left_indices.is_empty() && right_indices.is_empty() && 
current_start == 0 {
+            let res = RecordBatch::new_empty(Arc::clone(&self.schema));
+            status.processed_count = 1;
+            return Ok(Some(res));
+        }
+
+        if matches!(self.join_type, JoinType::RightSemi | JoinType::RightAnti) 
{

Review Comment:
   I think from here to the end of the function, it can look nicer if we 
structure it like this way
   ```rust
   match self.join_type {
       JoinType::RightSemi | JoinType::RightAnti => {...}
       JoinType::RightMark => {...}
       JoinType::......(others) => {}
       _ => {unreachable!()}
   }
   ```



##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -705,8 +696,29 @@ impl NestedLoopJoinStreamState {
     }
 }
 
+/// Tracks incremental output of join result batches.
+///
+/// Initialized with all matching pairs that satisfy the join predicate.
+/// Pairs are stored as indices in `build_indices` and `probe_indices`
+/// Each poll outputs a batch within the configured size limit and updates
+/// processed_count until all pairs are consumed.
+///
+/// Example: 5000 matches, batch size limit is 100
+/// - Poll 1: output batch[0..100], processed_count = 100  
+/// - Poll 2: output batch[100..200], processed_count = 200
+/// - ...continues until processed_count = 5000
+struct JoinResultStatus {

Review Comment:
   nit: `Status` is most commonly used for error code/ state flags, perhaps we 
can `JoinResultProgress` here to avoid confusion?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to