2010YOUY01 commented on code in PR #16996: URL: https://github.com/apache/datafusion/pull/16996#discussion_r2262497844
########## datafusion/physical-plan/src/joins/nested_loop_join.rs: ########## @@ -63,92 +64,95 @@ use datafusion_physical_expr::equivalence::{ join_equivalence_properties, ProjectionMapping, }; -use futures::{ready, Stream, StreamExt, TryStreamExt}; +use futures::{Stream, StreamExt, TryStreamExt}; +use log::debug; use parking_lot::Mutex; -/// Left (build-side) data -struct JoinLeftData { - /// Build-side data collected to single batch - batch: RecordBatch, - /// Shared bitmap builder for visited left indices - bitmap: SharedBitmapBuilder, - /// Counter of running probe-threads, potentially able to update `bitmap` - probe_threads_counter: AtomicUsize, - /// Memory reservation for tracking batch and bitmap - /// Cleared on `JoinLeftData` drop - /// reservation is cleared on Drop - #[expect(dead_code)] - reservation: MemoryReservation, -} - -impl JoinLeftData { - fn new( - batch: RecordBatch, - bitmap: SharedBitmapBuilder, - probe_threads_counter: AtomicUsize, - reservation: MemoryReservation, - ) -> Self { - Self { - batch, - bitmap, - probe_threads_counter, - reservation, - } - } - - fn batch(&self) -> &RecordBatch { - &self.batch - } - - fn bitmap(&self) -> &SharedBitmapBuilder { - &self.bitmap - } - - /// Decrements counter of running threads, and returns `true` - /// if caller is the last running thread - fn report_probe_completed(&self) -> bool { - self.probe_threads_counter.fetch_sub(1, Ordering::Relaxed) == 1 - } -} - #[allow(rustdoc::private_intra_doc_links)] -/// NestedLoopJoinExec is build-probe join operator, whose main task is to -/// perform joins without any equijoin conditions in `ON` clause. +/// NestedLoopJoinExec is a build-probe join operator designed for joins that +/// do not have equijoin keys in their `ON` clause. /// -/// Execution consists of following phases: +/// # Execution Flow /// -/// #### 1. Build phase -/// Collecting build-side data in memory, by polling all available data from build-side input. 
-/// Due to the absence of equijoin conditions, it's not possible to partition build-side data -/// across multiple threads of the operator, so build-side is always collected in a single -/// batch shared across all threads. -/// The operator always considers LEFT input as build-side input, so it's crucial to adjust -/// smaller input to be the LEFT one. Normally this selection is handled by physical optimizer. +/// ```text +/// Incoming right batch +/// Left Side Buffered Batches +/// ┌───────────┐ ┌───────────────┐ +/// │ ┌───────┐ │ │ │ +/// │ │ │ │ │ │ +/// Current Left Row ───▶│ ├───────├─┤──────────┐ │ │ +/// │ │ │ │ │ └───────────────┘ +/// │ │ │ │ │ │ +/// │ │ │ │ │ │ +/// │ └───────┘ │ │ │ +/// │ ┌───────┐ │ │ │ +/// │ │ │ │ │ ┌─────┘ +/// │ │ │ │ │ │ +/// │ │ │ │ │ │ +/// │ │ │ │ │ │ +/// │ │ │ │ │ │ +/// │ └───────┘ │ ▼ ▼ +/// │ ...... │ ┌──────────────────────┐ +/// │ │ │X (Cartesian Product) │ +/// │ │ └──────────┬───────────┘ +/// └───────────┘ │ +/// │ +/// ▼ +/// ┌───────┬───────────────┐ +/// │ │ │ +/// │ │ │ +/// │ │ │ +/// └───────┴───────────────┘ +/// Intermediate Batch +/// (For join predicate evaluation) +/// ``` /// -/// #### 2. Probe phase -/// Sequentially polling batches from the probe-side input and processing them according to the -/// following logic: -/// - apply join filter (`ON` clause) to Cartesian product of probe batch and build side data -/// -- filter evaluation is executed once per build-side data row -/// - update shared bitmap of joined ("visited") build-side row indices, if required -- allows -/// to produce unmatched build-side data in case of e.g. LEFT/FULL JOIN after probing phase -/// completed -/// - perform join index alignment is required -- depending on `JoinType` -/// - produce output join batch +/// The execution follows a two-phase design: /// -/// Probing phase is executed in parallel, according to probe-side input partitioning -- one -/// thread per partition. 
After probe input is exhausted, each thread **ATTEMPTS** to produce
-/// unmatched build-side data.
+/// ## 1. Buffering Left Input
+/// - The operator eagerly buffers all left-side input batches into memory,
+/// unless a memory limit is reached (see 'Memory-limited Execution').

Review Comment:
I have updated the comment to explain it.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org