UBarney commented on code in PR #16443:
URL: https://github.com/apache/datafusion/pull/16443#discussion_r2202485385


##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -828,15 +845,125 @@ impl<T: BatchTransformer> NestedLoopJoinStream<T> {
                     let poll = handle_state!(self.process_probe_batch());
                     self.join_metrics.baseline.record_poll(poll)
                 }
-                NestedLoopJoinStreamState::ExhaustedProbeSide => {
-                    let poll = handle_state!(self.process_unmatched_build_batch());
+                NestedLoopJoinStreamState::PrepareUnmatchedBuildRows => {
+                    handle_state!(self.prepare_unmatched_output_indices())
+                }
+                NestedLoopJoinStreamState::OutputUnmatchedBuildRows(_) => {
+                    let poll = handle_state!(self.build_unmatched_output());
                     self.join_metrics.baseline.record_poll(poll)
                 }
                 NestedLoopJoinStreamState::Completed => Poll::Ready(None),
             };
         }
     }
 
+    // This function's main job is to construct an output `RecordBatch` based on pre-calculated join indices.
+    // It operates in a chunk-based manner, meaning it processes a portion of the results in each call,
+    // making it suitable for streaming large datasets without high memory consumption.
+    fn get_next_join_result(&mut self) -> Result<Option<RecordBatch>> {
+        let status = self.join_result_status.as_mut().ok_or_else(|| {
+            internal_datafusion_err!(
+                "get_next_join_result called without initializing join_result_status"
+            )
+        })?;
+
+        let (left_indices, right_indices, current_start) = (
+            &status.build_indices,
+            &status.probe_indices,
+            status.processed_count,
+        );
+
+        let left_batch = self
+            .left_data
+            .as_ref()
+            .ok_or_else(|| internal_datafusion_err!("should have left_batch"))?
+            .batch();
+
+        let right_batch = match &self.state {
+            NestedLoopJoinStreamState::ProcessProbeBatch(record_batch)
+            | NestedLoopJoinStreamState::OutputUnmatchedBuildRows(record_batch) => {
+                record_batch
+            }
+            _ => {
+                return internal_err!(
+                    "State should be ProcessProbeBatch or OutputUnmatchedBuildRows"
+                )
+            }
+        };
+
+        if left_indices.is_empty() && right_indices.is_empty() && current_start == 0 {
+            let res = RecordBatch::new_empty(Arc::clone(&self.schema));
+            status.processed_count = 1;
+            return Ok(Some(res));
+        }
+
+        if matches!(self.join_type, JoinType::RightSemi | JoinType::RightAnti) {

Review Comment:
   I'd prefer to stick with the current implementation. The reason is that the code block from [L925 to L939](https://github.com/apache/datafusion/blob/0a6e2c754eeb0ee94de0e60069570febd1bb6158/datafusion/physical-plan/src/joins/nested_loop_join.rs#L925-L939) is shared by several JoinTypes, such as RightMark, Inner, and LeftSemi.
   
   If we refactored this into the match structure as suggested, we would have to duplicate that block of logic in multiple match arms.
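
To make the trade-off concrete, here is a minimal sketch of the control-flow shape being argued for. It is not the DataFusion code: the `JoinType` enum below is a cut-down stand-in, `shared_tail` and `build_output` are hypothetical names, the bodies are placeholders, and it assumes the `matches!` guard returns early so that every other join type falls through to one copy of the shared logic.

```rust
// Hypothetical, simplified stand-ins for illustration only; these are not
// the DataFusion types or the real body of `get_next_join_result`.
#[derive(Debug, Clone, Copy, PartialEq)]
enum JoinType {
    Inner,
    LeftSemi,
    RightSemi,
    RightAnti,
    RightMark,
}

/// Placeholder for the block of logic that several join types share.
fn shared_tail(left: &[u32], right: &[u32]) -> Vec<(Option<u32>, u32)> {
    left.iter().zip(right).map(|(&l, &r)| (Some(l), r)).collect()
}

/// Early-return shape: the special cases exit first, and every other join
/// type falls through to a single copy of the shared logic.
fn build_output(
    join_type: JoinType,
    left: &[u32],
    right: &[u32],
) -> Vec<(Option<u32>, u32)> {
    if matches!(join_type, JoinType::RightSemi | JoinType::RightAnti) {
        // Placeholder special case (assumption): emit probe-side values only.
        return right.iter().map(|&r| (None, r)).collect();
    }
    shared_tail(left, right)
}
```

With this shape, supporting another join type that uses the shared path needs no change to `build_output`; only a new special case would add a guard.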



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

