korowa commented on code in PR #16443:
URL: https://github.com/apache/datafusion/pull/16443#discussion_r2173258366


##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -729,10 +716,26 @@ struct NestedLoopJoinStream<T> {
     right_side_ordered: bool,
     /// Current state of the stream
     state: NestedLoopJoinStreamState,
+    #[allow(dead_code)]
+    // TODO: remove this field ??
     /// Transforms the output batch before returning.
     batch_transformer: T,
     /// Result of the left data future
     left_data: Option<Arc<JoinLeftData>>,
+
+    // Tracks progress when building join result batches incrementally
+    // Contains (build_indices, probe_indices, processed_count) where:
+    // - build_indices: row indices from build-side table (left table)
+    // - probe_indices: row indices from probe-side table (right table)
+    // - processed_count: number of index pairs already processed into output 
batches
+    // We have completed join result for indices [0..processed_count)
+    join_result_status: Option<(

Review Comment:
   It may be better to create a separate struct for the ProcessProbeBatch 
state, extended with all the attributes required to track join progress 
([example](https://github.com/apache/datafusion/blob/9f3cc7b85b084102916acf489354d7f65f7c9e7c/datafusion/physical-plan/src/joins/hash_join.rs#L1186)
 for hash join)



##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -729,10 +716,26 @@ struct NestedLoopJoinStream<T> {
     right_side_ordered: bool,
     /// Current state of the stream
     state: NestedLoopJoinStreamState,
+    #[allow(dead_code)]
+    // TODO: remove this field ??

Review Comment:
   Since there is no longer any need to split the output batch, and the output 
is generated progressively, I suppose this field can be removed.



##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -828,13 +833,127 @@ impl<T: BatchTransformer> NestedLoopJoinStream<T> {
                     handle_state!(self.process_probe_batch())
                 }
                 NestedLoopJoinStreamState::ExhaustedProbeSide => {
-                    handle_state!(self.process_unmatched_build_batch())
+                    handle_state!(self.prepare_unmatched_output_indices())
+                }
+                NestedLoopJoinStreamState::OutputUnmatchedBuildRows(_) => {
+                    handle_state!(self.build_unmatched_output())
                 }
                 NestedLoopJoinStreamState::Completed => Poll::Ready(None),
             };
         }
     }
 
+    fn get_next_join_result(&mut self) -> Result<Option<RecordBatch>> {
+        let (left_indices, right_indices, start) =
+            self.join_result_status.as_mut().ok_or_else(|| {
+                datafusion_common::_internal_datafusion_err!(
+                    "should have join_result_status"
+                )
+            })?;
+
+        let left_batch = self
+            .left_data
+            .as_ref()
+            .ok_or_else(|| {
+                datafusion_common::_internal_datafusion_err!("should have 
left_batch")
+            })?
+            .batch();
+
+        let right_batch = match &self.state {
+            NestedLoopJoinStreamState::ProcessProbeBatch(record_batch) => 
record_batch,
+            NestedLoopJoinStreamState::OutputUnmatchedBuildRows(record_batch) 
=> {
+                record_batch
+            }
+            _ => {
+                return internal_err!(
+                    "state should be ProcessProbeBatch or OutputUnmatchBatch"
+                )
+            }
+        };
+
+        let current_start = *start;
+
+        if left_indices.is_empty() && right_indices.is_empty() && 
current_start == 0 {

Review Comment:
   If both index arrays are empty, maybe it is OK to simply return None here, 
instead of building a batch and setting start to 1?



##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -1215,16 +1324,25 @@ pub(crate) mod tests {
             batches.extend(
                 more_batches
                     .into_iter()
+                    .inspect(|b| {
+                        assert!(b.num_rows() <= 
context.session_config().batch_size())
+                    })
                     .filter(|b| b.num_rows() > 0)
                     .collect::<Vec<_>>(),
             );
         }
         Ok((columns, batches))
     }
 
+    fn new_task_ctx() -> Arc<TaskContext> {
+        let base = TaskContext::default();
+        // limit max size of intermediate batch used in nlj to 1
+        let cfg = base.session_config().clone().with_batch_size(1);

Review Comment:
   Could you please parameterize the batch_size value and run all unit tests 
for various batch sizes (e.g. 1, 2, 4, 10, 8192)?



##########
datafusion/physical-plan/src/joins/utils.rs:
##########
@@ -843,24 +844,56 @@ pub(crate) fn apply_join_filter_to_indices(
     probe_indices: UInt32Array,
     filter: &JoinFilter,
     build_side: JoinSide,
+    max_intermediate_size: Option<usize>,
 ) -> Result<(UInt64Array, UInt32Array)> {
     if build_indices.is_empty() && probe_indices.is_empty() {
         return Ok((build_indices, probe_indices));
     };
 
-    let intermediate_batch = build_batch_from_indices(
-        filter.schema(),
-        build_input_buffer,
-        probe_batch,
-        &build_indices,
-        &probe_indices,
-        filter.column_indices(),
-        build_side,
-    )?;
-    let filter_result = filter
-        .expression()
-        .evaluate(&intermediate_batch)?
-        .into_array(intermediate_batch.num_rows())?;
+    let filter_result = if let Some(max_size) = max_intermediate_size {

Review Comment:
   Why should batch_size enforcement take place during filtering? Can we 
enforce it before filtering, while calculating the build/probe_indices args 
for this function (in NestedLoopJoinExec::build_join_indices)?



##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -883,44 +1002,66 @@ impl<T: BatchTransformer> NestedLoopJoinStream<T> {
         let visited_left_side = left_data.bitmap();
         let batch = self.state.try_as_process_probe_batch()?;
 
-        match self.batch_transformer.next() {
-            None => {
-                // Setting up timer & updating input metrics
-                self.join_metrics.input_batches.add(1);
-                self.join_metrics.input_rows.add(batch.num_rows());
-                let timer = self.join_metrics.join_time.timer();
-
-                let result = join_left_and_right_batch(
-                    left_data.batch(),
-                    batch,
-                    self.join_type,
-                    self.filter.as_ref(),
-                    &self.column_indices,
-                    &self.schema,
-                    visited_left_side,
-                    &mut self.indices_cache,
-                    self.right_side_ordered,
-                );
-                timer.done();
+        if self.join_result_status.is_none() {
+            self.join_metrics.input_batches.add(1);
+            self.join_metrics.input_rows.add(batch.num_rows());
+            let _timer = self.join_metrics.join_time.timer();

Review Comment:
   Maybe a single timer covering the whole function would be enough (instead 
of this timer and the one on L1023)?



##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -883,44 +1002,66 @@ impl<T: BatchTransformer> NestedLoopJoinStream<T> {
         let visited_left_side = left_data.bitmap();
         let batch = self.state.try_as_process_probe_batch()?;
 
-        match self.batch_transformer.next() {
-            None => {
-                // Setting up timer & updating input metrics
-                self.join_metrics.input_batches.add(1);
-                self.join_metrics.input_rows.add(batch.num_rows());
-                let timer = self.join_metrics.join_time.timer();
-
-                let result = join_left_and_right_batch(
-                    left_data.batch(),
-                    batch,
-                    self.join_type,
-                    self.filter.as_ref(),
-                    &self.column_indices,
-                    &self.schema,
-                    visited_left_side,
-                    &mut self.indices_cache,
-                    self.right_side_ordered,
-                );
-                timer.done();
+        if self.join_result_status.is_none() {
+            self.join_metrics.input_batches.add(1);

Review Comment:
   `fetch_probe_batch` seems to be a better fit for tracking these two metrics 
(input_batches/rows).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to