2010YOUY01 commented on code in PR #16996: URL: https://github.com/apache/datafusion/pull/16996#discussion_r2272059908
########## datafusion/physical-plan/src/joins/nested_loop_join.rs: ########## @@ -660,529 +691,1168 @@ async fn collect_left_input( )) } -/// This enumeration represents various states of the nested loop join algorithm. -#[derive(Debug, Clone)] -enum NestedLoopJoinStreamState { - /// The initial state, indicating that build-side data not collected yet - WaitBuildSide, - /// Indicates that build-side has been collected, and stream is ready for - /// fetching probe-side - FetchProbeBatch, - /// Indicates that a non-empty batch has been fetched from probe-side, and - /// is ready to be processed - ProcessProbeBatch(RecordBatch), - /// Preparation phase: Gathers the indices of unmatched rows from the build-side. - /// This state is entered for join types that emit unmatched build-side rows - /// (e.g., LEFT and FULL joins) after the entire probe-side input has been consumed. - PrepareUnmatchedBuildRows, - /// Output unmatched build-side rows. - /// The indices for rows to output has already been calculated in the previous - /// `PrepareUnmatchedBuildRows` state. In this state the final batch will be materialized incrementally. - // The inner `RecordBatch` is an empty dummy batch used to get right schema. - OutputUnmatchedBuildRows(RecordBatch), - /// Indicates that NestedLoopJoinStream execution is completed - Completed, -} - -impl NestedLoopJoinStreamState { - /// Tries to extract a `ProcessProbeBatchState` from the - /// `NestedLoopJoinStreamState` enum. Returns an error if state is not - /// `ProcessProbeBatchState`. - fn try_as_process_probe_batch(&mut self) -> Result<&RecordBatch> { - match self { - NestedLoopJoinStreamState::ProcessProbeBatch(state) => Ok(state), - _ => internal_err!("Expected join stream in ProcessProbeBatch state"), - } - } -} - -/// Tracks incremental output of join result batches. -/// -/// Initialized with all matching pairs that satisfy the join predicate. 
-/// Pairs are stored as indices in `build_indices` and `probe_indices` -/// Each poll outputs a batch within the configured size limit and updates -/// processed_count until all pairs are consumed. -/// -/// Example: 5000 matches, batch size limit is 100 -/// - Poll 1: output batch[0..100], processed_count = 100 -/// - Poll 2: output batch[100..200], processed_count = 200 -/// - ...continues until processed_count = 5000 -struct JoinResultProgress { - /// Row indices from build-side table (left table). - build_indices: PrimitiveArray<UInt64Type>, - /// Row indices from probe-side table (right table). - probe_indices: PrimitiveArray<UInt32Type>, - /// Number of index pairs already processed into output batches. - /// We have completed join result for indices [0..processed_count). - processed_count: usize, +/// States for join processing. See `poll_next()` comment for more details about +/// state transitions. +#[derive(Debug, Clone, Copy)] +enum NLJState { + BufferingLeft, + FetchingRight, + ProbeRight, + EmitRightUnmatched, + EmitLeftUnmatched, + Done, } - -/// A stream that issues [RecordBatch]es as they arrive from the right of the join. -struct NestedLoopJoinStream { - /// Input schema - schema: Arc<Schema>, +pub(crate) struct NestedLoopJoinStream { + // ======================================================================== + // PROPERTIES: + // Operator's properties that remain constant + // + // Note: The implementation uses the terms left/inner/build-side table and Review Comment: Updated in [8cc3654](https://github.com/apache/datafusion/pull/16996/commits/8cc365491cdeab7dec8aac5d3da458191b2f3e16). Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org