2010YOUY01 commented on code in PR #16443:
URL: https://github.com/apache/datafusion/pull/16443#discussion_r2194486005


##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -689,6 +674,8 @@ enum NestedLoopJoinStreamState {
     ProcessProbeBatch(RecordBatch),
     /// Indicates that probe-side has been fully processed
     ExhaustedProbeSide,
+    /// Output unmatched build-side rows

Review Comment:
   ```suggestion
       /// Output unmatched build-side rows.
       /// The indices of the rows to output have already been calculated in the
       /// previous `ExhaustedProbeSide` state. In this state the final batch is
       /// materialized incrementally.
       /// The inner `RecordBatch` is an empty dummy batch used to get the right
       /// schema.
   ```
   Maybe we can also rename `ExhaustedProbeSide` to `PrepareUnmatchedBuildRows` to be more accurate.
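Here is a minimal sketch of the two-phase split that the suggestion describes. It uses simplified stand-ins: `Schema`, `Batch`, `State`, `prepare_unmatched` and `next_chunk` are hypothetical names, not DataFusion's API. The prepare step computes every unmatched build-row index once. The output step only slices that precomputed list, and the empty dummy batch exists only to carry the output schema.

```rust
use std::sync::Arc;

// Hypothetical stand-ins for Arrow's `Schema` and `RecordBatch`.
#[derive(Debug, PartialEq)]
struct Schema(Vec<&'static str>);

#[derive(Debug, PartialEq)]
struct Batch {
    schema: Arc<Schema>,
    num_rows: usize,
}

#[derive(Debug, PartialEq)]
enum State {
    /// Probe side is exhausted; unmatched build rows are computed here
    /// (the name proposed in the review comment).
    PrepareUnmatchedBuildRows,
    /// Unmatched indices are already known; the inner batch is an empty
    /// dummy that only carries the output schema.
    OutputUnmatchedBuildRows(Batch),
    Completed,
}

/// Collects build rows never visited by any probe row, then moves to the
/// output state with an empty batch that carries the schema.
fn prepare_unmatched(visited: &[bool], schema: Arc<Schema>) -> (Vec<usize>, State) {
    let unmatched: Vec<usize> = visited
        .iter()
        .enumerate()
        .filter(|(_, seen)| !**seen)
        .map(|(i, _)| i)
        .collect();
    let dummy = Batch { schema, num_rows: 0 };
    (unmatched, State::OutputUnmatchedBuildRows(dummy))
}

/// Emits up to `batch_size` pending indices per call and switches to
/// `Completed` once all of them have been emitted.
fn next_chunk(
    pending: &[usize],
    offset: &mut usize,
    batch_size: usize,
    state: &mut State,
) -> Option<Vec<usize>> {
    assert!(batch_size > 0, "batch_size must be positive");
    if *offset >= pending.len() {
        *state = State::Completed;
        return None;
    }
    let end = (*offset + batch_size).min(pending.len());
    let chunk = pending[*offset..end].to_vec();
    *offset = end;
    Some(chunk)
}
```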
   



##########
datafusion/physical-plan/src/joins/utils.rs:
##########
@@ -843,24 +844,56 @@ pub(crate) fn apply_join_filter_to_indices(
     probe_indices: UInt32Array,
     filter: &JoinFilter,
     build_side: JoinSide,
+    max_intermediate_size: Option<usize>,
 ) -> Result<(UInt64Array, UInt32Array)> {
     if build_indices.is_empty() && probe_indices.is_empty() {
         return Ok((build_indices, probe_indices));
     };
 
-    let intermediate_batch = build_batch_from_indices(
-        filter.schema(),
-        build_input_buffer,
-        probe_batch,
-        &build_indices,
-        &probe_indices,
-        filter.column_indices(),
-        build_side,
-    )?;
-    let filter_result = filter
-        .expression()
-        .evaluate(&intermediate_batch)?
-        .into_array(intermediate_batch.num_rows())?;
+    let filter_result = if let Some(max_size) = max_intermediate_size {

Review Comment:
   > > I believe this removes the purpose of the pull request if we are building the entire amount of indices
   >
   > This PR only limits the size of the intermediate `record_batch`. The Cartesian product of the entire `left_table` and `right_batch` is still generated at once (this will be limited in a subsequent PR).

   Additionally, making the Cartesian product step incremental likely requires a larger refactor than this PR, so it may be better suited for a separate PR.
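The following std-only sketch illustrates the distinction made in the reply. `cartesian_indices` and `filter_indices_chunked` are hypothetical names, and this is not the real `apply_join_filter_to_indices`. The full Cartesian product of index pairs is still built at once. Only the slice passed to the filter predicate is bounded by `max_intermediate_size`, and `None` keeps a single intermediate batch.

```rust
/// Builds every (build, probe) index pair at once, as the current PR still does.
fn cartesian_indices(build_rows: usize, probe_rows: usize) -> (Vec<u64>, Vec<u32>) {
    let mut build = Vec::with_capacity(build_rows * probe_rows);
    let mut probe = Vec::with_capacity(build_rows * probe_rows);
    for p in 0..probe_rows as u32 {
        for b in 0..build_rows as u64 {
            build.push(b);
            probe.push(p);
        }
    }
    (build, probe)
}

/// Evaluates `predicate` over the pairs in chunks of at most
/// `max_intermediate_size`, so each intermediate input stays bounded.
fn filter_indices_chunked<F>(
    build_indices: &[u64],
    probe_indices: &[u32],
    max_intermediate_size: Option<usize>,
    mut predicate: F,
) -> (Vec<u64>, Vec<u32>)
where
    F: FnMut(&[u64], &[u32]) -> Vec<bool>,
{
    assert_eq!(build_indices.len(), probe_indices.len());
    let n = build_indices.len();
    // `None` means one intermediate chunk covering all pairs.
    let chunk = max_intermediate_size.unwrap_or(n).max(1);
    let (mut out_build, mut out_probe) = (Vec::new(), Vec::new());
    let mut start = 0;
    while start < n {
        let end = (start + chunk).min(n);
        let mask = predicate(&build_indices[start..end], &probe_indices[start..end]);
        for (i, keep) in mask.into_iter().enumerate() {
            if keep {
                out_build.push(build_indices[start + i]);
                out_probe.push(probe_indices[start + i]);
            }
        }
        start = end;
    }
    (out_build, out_probe)
}
```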



##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -883,44 +996,60 @@ impl<T: BatchTransformer> NestedLoopJoinStream<T> {
         let visited_left_side = left_data.bitmap();
         let batch = self.state.try_as_process_probe_batch()?;
 
-        match self.batch_transformer.next() {
-            None => {
-                // Setting up timer & updating input metrics
-                self.join_metrics.input_batches.add(1);
-                self.join_metrics.input_rows.add(batch.num_rows());
-                let timer = self.join_metrics.join_time.timer();
-
-                let result = join_left_and_right_batch(
-                    left_data.batch(),
-                    batch,
-                    self.join_type,
-                    self.filter.as_ref(),
-                    &self.column_indices,
-                    &self.schema,
-                    visited_left_side,
-                    &mut self.indices_cache,
-                    self.right_side_ordered,
-                );
-                timer.done();
+        let binding = self.join_metrics.join_time.clone();
+        let _timer = binding.timer();
+
+        if self.join_result_status.is_none() {
+            let (left_side_indices, right_side_indices) = join_left_and_right_batch(
+                left_data.batch(),
+                batch,
+                self.join_type,
+                self.filter.as_ref(),
+                visited_left_side,
+                &mut self.indices_cache,
+                self.right_side_ordered,
+                self.intermediate_batch_size,
+            )?;
+            self.join_result_status = Some(JoinResultStatus {
+                build_indices: left_side_indices,
+                probe_indices: right_side_indices,
+                processed_count: 0,
+            })
+        }
 
-                self.batch_transformer.set_batch(result?);
+        let join_result = self.get_next_join_result()?;
+
+        match join_result {
+            Some(res) => {
+                self.join_metrics.output_batches.add(1);
+                self.join_metrics.output_rows.add(res.num_rows());

Review Comment:
   We don't have to count `output_rows` here: it is counted automatically in the outer poll.

   This was introduced by a recent change: https://github.com/apache/datafusion/pull/16500
   `output_batches` still needs to be tracked manually here, but it could also be counted automatically in the future.
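This rough sketch shows the idea behind the first point. It uses a plain `Iterator` as a stand-in for the async stream, and the `Batch` and `CountingStream` types are hypothetical. The wrapper around the inner state machine records the row count once, so the inner code does not need to add to `output_rows` itself. `output_batches` could later be counted at the same boundary.

```rust
/// Hypothetical minimal batch; only the row count matters here.
struct Batch {
    num_rows: usize,
}

/// Wraps the inner producer and counts rows once, at the boundary,
/// so the inner join state machine does not have to.
struct CountingStream<I> {
    inner: I,
    output_rows: usize,
}

impl<I: Iterator<Item = Batch>> Iterator for CountingStream<I> {
    type Item = Batch;

    fn next(&mut self) -> Option<Batch> {
        let batch = self.inner.next()?;
        self.output_rows += batch.num_rows;
        Some(batch)
    }
}
```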



##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -705,8 +692,19 @@ impl NestedLoopJoinStreamState {
     }
 }
 
+/// Tracks progress when building join result batches incrementally.

Review Comment:
   ```suggestion
   /// Tracks incremental output of join result batches.
   ///
   /// Initialized with all matching pairs that satisfy the join predicate.
   /// Pairs are stored as indices in `build_indices` and `probe_indices`.
   /// Each poll outputs a batch within the configured size limit and updates
   /// `processed_count` until all pairs are consumed.
   ///
   /// Example: 5000 matches, batch size limit is 100
   /// - Poll 1: output batch[0..100], processed_count = 100
   /// - Poll 2: output batch[100..200], processed_count = 200
   /// - ...continues until processed_count = 5000
   ```
   It would be helpful to document high-level ideas and examples for key structs and functions.
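A small sketch can check the example in the suggested doc comment. It assumes a simplified `JoinResultStatus` that holds plain `Vec`s instead of Arrow arrays, plus a hypothetical `next_slice` helper. The field names follow the diff, but the method is only an illustration.

```rust
/// Simplified stand-in: all matching pairs are computed once, then emitted
/// in slices of at most `batch_size` pairs per poll.
struct JoinResultStatus {
    build_indices: Vec<u64>,
    probe_indices: Vec<u32>,
    processed_count: usize,
}

impl JoinResultStatus {
    fn new(build_indices: Vec<u64>, probe_indices: Vec<u32>) -> Self {
        assert_eq!(build_indices.len(), probe_indices.len());
        Self { build_indices, probe_indices, processed_count: 0 }
    }

    /// Hypothetical helper: returns the next slice of pairs and advances
    /// `processed_count`, or `None` once every pair has been consumed.
    fn next_slice(&mut self, batch_size: usize) -> Option<(&[u64], &[u32])> {
        assert!(batch_size > 0, "batch_size must be positive");
        let total = self.build_indices.len();
        if self.processed_count >= total {
            return None;
        }
        let start = self.processed_count;
        let end = (start + batch_size).min(total);
        self.processed_count = end;
        Some((&self.build_indices[start..end], &self.probe_indices[start..end]))
    }
}
```

With 5000 pairs and a limit of 100, this yields 50 slices of 100 pairs. After that, `processed_count` is 5000 and the next call returns `None`, which matches the poll sequence in the doc comment.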



##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -883,44 +996,60 @@ impl<T: BatchTransformer> NestedLoopJoinStream<T> {
         let visited_left_side = left_data.bitmap();
         let batch = self.state.try_as_process_probe_batch()?;
 
-        match self.batch_transformer.next() {
-            None => {
-                // Setting up timer & updating input metrics
-                self.join_metrics.input_batches.add(1);
-                self.join_metrics.input_rows.add(batch.num_rows());
-                let timer = self.join_metrics.join_time.timer();
-
-                let result = join_left_and_right_batch(
-                    left_data.batch(),
-                    batch,
-                    self.join_type,
-                    self.filter.as_ref(),
-                    &self.column_indices,
-                    &self.schema,
-                    visited_left_side,
-                    &mut self.indices_cache,
-                    self.right_side_ordered,
-                );
-                timer.done();
+        let binding = self.join_metrics.join_time.clone();
+        let _timer = binding.timer();
+
+        if self.join_result_status.is_none() {
+            let (left_side_indices, right_side_indices) = join_left_and_right_batch(
+                left_data.batch(),
+                batch,
+                self.join_type,
+                self.filter.as_ref(),
+                visited_left_side,
+                &mut self.indices_cache,
+                self.right_side_ordered,
+                self.intermediate_batch_size,
+            )?;
+            self.join_result_status = Some(JoinResultStatus {
+                build_indices: left_side_indices,
+                probe_indices: right_side_indices,
+                processed_count: 0,
+            })
+        }
 
-                self.batch_transformer.set_batch(result?);
+        let join_result = self.get_next_join_result()?;
+
+        match join_result {
+            Some(res) => {
+                self.join_metrics.output_batches.add(1);
+                self.join_metrics.output_rows.add(res.num_rows());
+
+                Ok(StatefulStreamResult::Ready(Some(res)))
+            }
+            None => {
+                self.state = NestedLoopJoinStreamState::FetchProbeBatch;
+                self.join_result_status = None;
                 Ok(StatefulStreamResult::Continue)
             }
-            Some((batch, last)) => {
-                if last {
-                    self.state = NestedLoopJoinStreamState::FetchProbeBatch;
-                }
+        }
+    }
 
-                self.join_metrics.output_batches.add(1);
-                self.join_metrics.output_rows.add(batch.num_rows());
-                Ok(StatefulStreamResult::Ready(Some(batch)))
+    fn build_unmatched_output(
+        &mut self,
+    ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
+        let start = Instant::now();

Review Comment:
   nit: I think we can just construct a timer guard here and let it stop when it is dropped.
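The nit refers to the RAII pattern. The diff above already calls `join_time.timer()`, and a guard that records elapsed time in its `Drop` impl removes the need for `Instant::now()` bookkeeping. Below is a std-only sketch. `Time`, `TimerGuard` and the body of `build_unmatched_output` are hypothetical stand-ins, not DataFusion's metrics types.

```rust
use std::cell::Cell;
use std::time::Instant;

/// Hypothetical accumulating time metric (a stand-in, not DataFusion's type).
#[derive(Default)]
struct Time {
    nanos: Cell<u128>,
}

impl Time {
    /// Starts a timer that adds its elapsed time to `self` when dropped.
    fn timer(&self) -> TimerGuard<'_> {
        TimerGuard { time: self, start: Instant::now() }
    }
}

struct TimerGuard<'a> {
    time: &'a Time,
    start: Instant,
}

impl Drop for TimerGuard<'_> {
    fn drop(&mut self) {
        let elapsed = self.start.elapsed().as_nanos();
        self.time.nanos.set(self.time.nanos.get() + elapsed);
    }
}

/// Illustrative body: the guard covers every exit path, including early
/// returns, without explicit start/stop calls.
fn build_unmatched_output(time: &Time, rows: &[u32]) -> Result<usize, String> {
    let _timer = time.timer(); // stops when `_timer` goes out of scope
    if rows.is_empty() {
        return Err("no rows".to_string());
    }
    Ok(rows.len())
}
```

Binding the guard to a named variable such as `_timer` matters. `let _ = time.timer();` drops the guard immediately and records almost nothing.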



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

