UBarney commented on code in PR #16443: URL: https://github.com/apache/datafusion/pull/16443#discussion_r2202485385
########## datafusion/physical-plan/src/joins/nested_loop_join.rs: ########## @@ -828,15 +845,125 @@ impl<T: BatchTransformer> NestedLoopJoinStream<T> { let poll = handle_state!(self.process_probe_batch()); self.join_metrics.baseline.record_poll(poll) } - NestedLoopJoinStreamState::ExhaustedProbeSide => { - let poll = handle_state!(self.process_unmatched_build_batch()); + NestedLoopJoinStreamState::PrepareUnmatchedBuildRows => { + handle_state!(self.prepare_unmatched_output_indices()) + } + NestedLoopJoinStreamState::OutputUnmatchedBuildRows(_) => { + let poll = handle_state!(self.build_unmatched_output()); self.join_metrics.baseline.record_poll(poll) } NestedLoopJoinStreamState::Completed => Poll::Ready(None), }; } } + // This function's main job is to construct an output `RecordBatch` based on pre-calculated join indices. + // It operates in a chunk-based manner, meaning it processes a portion of the results in each call, + // making it suitable for streaming large datasets without high memory consumption. + fn get_next_join_result(&mut self) -> Result<Option<RecordBatch>> { + let status = self.join_result_status.as_mut().ok_or_else(|| { + internal_datafusion_err!( + "get_next_join_result called without initializing join_result_status" + ) + })?; + + let (left_indices, right_indices, current_start) = ( + &status.build_indices, + &status.probe_indices, + status.processed_count, + ); + + let left_batch = self + .left_data + .as_ref() + .ok_or_else(|| internal_datafusion_err!("should have left_batch"))? 
+ .batch(); + + let right_batch = match &self.state { + NestedLoopJoinStreamState::ProcessProbeBatch(record_batch) + | NestedLoopJoinStreamState::OutputUnmatchedBuildRows(record_batch) => { + record_batch + } + _ => { + return internal_err!( + "State should be ProcessProbeBatch or OutputUnmatchedBuildRows" + ) + } + }; + + if left_indices.is_empty() && right_indices.is_empty() && current_start == 0 { + let res = RecordBatch::new_empty(Arc::clone(&self.schema)); + status.processed_count = 1; + return Ok(Some(res)); + } + + if matches!(self.join_type, JoinType::RightSemi | JoinType::RightAnti) { Review Comment: I'd prefer to stick with the current implementation. The reason is that the code block from [L925 to L939](https://github.com/apache/datafusion/blob/0a6e2c754eeb0ee94de0e60069570febd1bb6158/datafusion/physical-plan/src/joins/nested_loop_join.rs#L925-L939) is shared by several `JoinType` variants, including RightMark, Inner, LeftSemi, etc. If we refactor this into the match structure as suggested, we would have to duplicate that block of logic in multiple match arms. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org