2010YOUY01 commented on code in PR #16443: URL: https://github.com/apache/datafusion/pull/16443#discussion_r2199423074
########## datafusion/physical-plan/src/joins/nested_loop_join.rs: ########## @@ -828,13 +833,127 @@ impl<T: BatchTransformer> NestedLoopJoinStream<T> { handle_state!(self.process_probe_batch()) } NestedLoopJoinStreamState::ExhaustedProbeSide => { - handle_state!(self.process_unmatched_build_batch()) + handle_state!(self.prepare_unmatched_output_indices()) + } + NestedLoopJoinStreamState::OutputUnmatchedBuildRows(_) => { + handle_state!(self.build_unmatched_output()) } NestedLoopJoinStreamState::Completed => Poll::Ready(None), }; } } + fn get_next_join_result(&mut self) -> Result<Option<RecordBatch>> { + let (left_indices, right_indices, start) = + self.join_result_status.as_mut().ok_or_else(|| { + datafusion_common::_internal_datafusion_err!( + "should have join_result_status" + ) + })?; + + let left_batch = self + .left_data + .as_ref() + .ok_or_else(|| { + datafusion_common::_internal_datafusion_err!("should have left_batch") + })? + .batch(); + + let right_batch = match &self.state { + NestedLoopJoinStreamState::ProcessProbeBatch(record_batch) => record_batch, + NestedLoopJoinStreamState::OutputUnmatchedBuildRows(record_batch) => { + record_batch + } + _ => { + return internal_err!( + "state should be ProcessProbeBatch or OutputUnmatchBatch" + ) + } + }; + + let current_start = *start; + + if left_indices.is_empty() && right_indices.is_empty() && current_start == 0 { Review Comment: I don't get this `status.processed_count = 1` logic either, perhaps you can add a quick comment to explain it? 
########## datafusion/physical-plan/src/joins/nested_loop_join.rs: ########## @@ -828,15 +845,125 @@ impl<T: BatchTransformer> NestedLoopJoinStream<T> { let poll = handle_state!(self.process_probe_batch()); self.join_metrics.baseline.record_poll(poll) } - NestedLoopJoinStreamState::ExhaustedProbeSide => { - let poll = handle_state!(self.process_unmatched_build_batch()); + NestedLoopJoinStreamState::PrepareUnmatchedBuildRows => { + handle_state!(self.prepare_unmatched_output_indices()) + } + NestedLoopJoinStreamState::OutputUnmatchedBuildRows(_) => { + let poll = handle_state!(self.build_unmatched_output()); self.join_metrics.baseline.record_poll(poll) } NestedLoopJoinStreamState::Completed => Poll::Ready(None), }; } } + // This function's main job is to construct an output `RecordBatch` based on pre-calculated join indices. + // It operates in a chunk-based manner, meaning it processes a portion of the results in each call, + // making it suitable for streaming large datasets without high memory consumption. + fn get_next_join_result(&mut self) -> Result<Option<RecordBatch>> { + let status = self.join_result_status.as_mut().ok_or_else(|| { + internal_datafusion_err!( + "get_next_join_result called without initializing join_result_status" + ) + })?; + + let (left_indices, right_indices, current_start) = ( + &status.build_indices, + &status.probe_indices, + status.processed_count, + ); + + let left_batch = self + .left_data + .as_ref() + .ok_or_else(|| internal_datafusion_err!("should have left_batch"))? 
+ .batch(); + + let right_batch = match &self.state { + NestedLoopJoinStreamState::ProcessProbeBatch(record_batch) + | NestedLoopJoinStreamState::OutputUnmatchedBuildRows(record_batch) => { + record_batch + } + _ => { + return internal_err!( + "State should be ProcessProbeBatch or OutputUnmatchedBuildRows" + ) + } + }; + + if left_indices.is_empty() && right_indices.is_empty() && current_start == 0 { + let res = RecordBatch::new_empty(Arc::clone(&self.schema)); + status.processed_count = 1; + return Ok(Some(res)); + } + + if matches!(self.join_type, JoinType::RightSemi | JoinType::RightAnti) { Review Comment: I think the code from here to the end of the function would read better if we structured it this way: ```rust match self.join_type { JoinType::RightSemi | JoinType::RightAnti => {...} JoinType::RightMark => {...} JoinType::......(others) => {} _ => {unreachable!()} } ``` ########## datafusion/physical-plan/src/joins/nested_loop_join.rs: ########## @@ -705,8 +696,29 @@ impl NestedLoopJoinStreamState { } } +/// Tracks incremental output of join result batches. +/// +/// Initialized with all matching pairs that satisfy the join predicate. +/// Pairs are stored as indices in `build_indices` and `probe_indices` +/// Each poll outputs a batch within the configured size limit and updates +/// processed_count until all pairs are consumed. +/// +/// Example: 5000 matches, batch size limit is 100 +/// - Poll 1: output batch[0..100], processed_count = 100 +/// - Poll 2: output batch[100..200], processed_count = 200 +/// - ...continues until processed_count = 5000 +struct JoinResultStatus { Review Comment: nit: `Status` is most commonly used for error codes or state flags; perhaps we can rename this to `JoinResultProgress` to avoid confusion? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org