This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 8f8e105072 feat: support input reordering for `NestedLoopJoinExec` 
(#9676)
8f8e105072 is described below

commit 8f8e105072ae786c77537ca487f28c292c0cf0e4
Author: Eduard Karacharov <[email protected]>
AuthorDate: Mon Apr 22 21:38:10 2024 +0300

    feat: support input reordering for `NestedLoopJoinExec` (#9676)
    
    * support input reordering for NestedLoopJoinExec
    
    * renamed variables and struct fields
    
    * fixed nl join filter expression in tests
    
    * Update datafusion/physical-plan/src/joins/nested_loop_join.rs
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * typo fixed
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 .../core/src/physical_optimizer/join_selection.rs  | 209 ++++++++-
 datafusion/core/tests/fuzz_cases/join_fuzz.rs      |  89 +++-
 .../physical-plan/src/joins/nested_loop_join.rs    | 489 ++++++++-------------
 datafusion/sqllogictest/test_files/cte.slt         |   2 +-
 datafusion/sqllogictest/test_files/group_by.slt    |   6 +-
 datafusion/sqllogictest/test_files/joins.slt       |  32 +-
 .../sqllogictest/test_files/tpch/q11.slt.part      |  56 +--
 .../sqllogictest/test_files/tpch/q22.slt.part      |  38 +-
 8 files changed, 547 insertions(+), 374 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs 
b/datafusion/core/src/physical_optimizer/join_selection.rs
index f7512cb6d0..a8b308d3de 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -30,8 +30,8 @@ use crate::error::Result;
 use crate::physical_optimizer::PhysicalOptimizerRule;
 use crate::physical_plan::joins::utils::{ColumnIndex, JoinFilter};
 use crate::physical_plan::joins::{
-    CrossJoinExec, HashJoinExec, PartitionMode, StreamJoinPartitionMode,
-    SymmetricHashJoinExec,
+    CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode,
+    StreamJoinPartitionMode, SymmetricHashJoinExec,
 };
 use crate::physical_plan::projection::ProjectionExec;
 use crate::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
@@ -199,6 +199,38 @@ fn swap_hash_join(
     }
 }
 
+/// Swaps inputs of `NestedLoopJoinExec` and wraps it into `ProjectionExec` if 
required
+fn swap_nl_join(join: &NestedLoopJoinExec) -> Result<Arc<dyn ExecutionPlan>> {
+    let new_filter = swap_join_filter(join.filter());
+    let new_join_type = &swap_join_type(*join.join_type());
+
+    let new_join = NestedLoopJoinExec::try_new(
+        Arc::clone(join.right()),
+        Arc::clone(join.left()),
+        new_filter,
+        new_join_type,
+    )?;
+
+    // For Semi/Anti joins, swap result will produce same output schema,
+    // no need to wrap them into additional projection
+    let plan: Arc<dyn ExecutionPlan> = if matches!(
+        join.join_type(),
+        JoinType::LeftSemi
+            | JoinType::RightSemi
+            | JoinType::LeftAnti
+            | JoinType::RightAnti
+    ) {
+        Arc::new(new_join)
+    } else {
+        let projection =
+            swap_reverting_projection(&join.left().schema(), 
&join.right().schema());
+
+        Arc::new(ProjectionExec::try_new(projection, Arc::new(new_join))?)
+    };
+
+    Ok(plan)
+}
+
 /// When the order of the join is changed by the optimizer, the columns in
 /// the output should not be impacted. This function creates the expressions
 /// that will allow to swap back the values from the original left as the first
@@ -438,6 +470,14 @@ fn statistical_join_selection_subrule(
             } else {
                 None
             }
+        } else if let Some(nl_join) = 
plan.as_any().downcast_ref::<NestedLoopJoinExec>() {
+            let left = nl_join.left();
+            let right = nl_join.right();
+            if should_swap_join_order(&**left, &**right)? {
+                swap_nl_join(nl_join).map(Some)?
+            } else {
+                None
+            }
         } else {
             None
         };
@@ -674,9 +714,12 @@ mod tests_statistical {
 
     use arrow::datatypes::{DataType, Field, Schema};
     use datafusion_common::{stats::Precision, JoinType, ScalarValue};
-    use datafusion_physical_expr::expressions::Column;
+    use datafusion_expr::Operator;
+    use datafusion_physical_expr::expressions::{BinaryExpr, Column};
     use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};
 
+    use rstest::rstest;
+
     /// Return statistcs for empty table
     fn empty_statistics() -> Statistics {
         Statistics {
@@ -762,6 +805,35 @@ mod tests_statistical {
         }]
     }
 
+    /// Create join filter for NLJoinExec with expression `big_col > small_col`
+    /// where both columns are 0-indexed and come from left and right inputs 
respectively
+    fn nl_join_filter() -> Option<JoinFilter> {
+        let column_indices = vec![
+            ColumnIndex {
+                index: 0,
+                side: JoinSide::Left,
+            },
+            ColumnIndex {
+                index: 0,
+                side: JoinSide::Right,
+            },
+        ];
+        let intermediate_schema = Schema::new(vec![
+            Field::new("big_col", DataType::Int32, false),
+            Field::new("small_col", DataType::Int32, false),
+        ]);
+        let expression = Arc::new(BinaryExpr::new(
+            Arc::new(Column::new_with_schema("big_col", 
&intermediate_schema).unwrap()),
+            Operator::Gt,
+            Arc::new(Column::new_with_schema("small_col", 
&intermediate_schema).unwrap()),
+        )) as _;
+        Some(JoinFilter::new(
+            expression,
+            column_indices,
+            intermediate_schema,
+        ))
+    }
+
     /// Returns three plans with statistics of (min, max, distinct_count)
     /// * big 100K rows @ (0, 50k, 50k)
     /// * medium 10K rows @ (1k, 5k, 1k)
@@ -1114,6 +1186,137 @@ mod tests_statistical {
         crosscheck_plans(join).unwrap();
     }
 
+    #[rstest(
+        join_type,
+        case::inner(JoinType::Inner),
+        case::left(JoinType::Left),
+        case::right(JoinType::Right),
+        case::full(JoinType::Full)
+    )]
+    #[tokio::test]
+    async fn test_nl_join_with_swap(join_type: JoinType) {
+        let (big, small) = create_big_and_small();
+
+        let join = Arc::new(
+            NestedLoopJoinExec::try_new(
+                Arc::clone(&big),
+                Arc::clone(&small),
+                nl_join_filter(),
+                &join_type,
+            )
+            .unwrap(),
+        );
+
+        let optimized_join = JoinSelection::new()
+            .optimize(join.clone(), &ConfigOptions::new())
+            .unwrap();
+
+        let swapping_projection = optimized_join
+            .as_any()
+            .downcast_ref::<ProjectionExec>()
+            .expect("A proj is required to swap columns back to their original 
order");
+
+        assert_eq!(swapping_projection.expr().len(), 2);
+        let (col, name) = &swapping_projection.expr()[0];
+        assert_eq!(name, "big_col");
+        assert_col_expr(col, "big_col", 1);
+        let (col, name) = &swapping_projection.expr()[1];
+        assert_eq!(name, "small_col");
+        assert_col_expr(col, "small_col", 0);
+
+        let swapped_join = swapping_projection
+            .input()
+            .as_any()
+            .downcast_ref::<NestedLoopJoinExec>()
+            .expect("The type of the plan should not be changed");
+
+        // Assert join side of big_col swapped in filter expression
+        let swapped_filter = swapped_join.filter().unwrap();
+        let swapped_big_col_idx = 
swapped_filter.schema().index_of("big_col").unwrap();
+        let swapped_big_col_side = swapped_filter
+            .column_indices()
+            .get(swapped_big_col_idx)
+            .unwrap()
+            .side;
+        assert_eq!(
+            swapped_big_col_side,
+            JoinSide::Right,
+            "Filter column side should be swapped"
+        );
+
+        assert_eq!(
+            swapped_join.left().statistics().unwrap().total_byte_size,
+            Precision::Inexact(8192)
+        );
+        assert_eq!(
+            swapped_join.right().statistics().unwrap().total_byte_size,
+            Precision::Inexact(2097152)
+        );
+        crosscheck_plans(join.clone()).unwrap();
+    }
+
+    #[rstest(
+        join_type,
+        case::left_semi(JoinType::LeftSemi),
+        case::left_anti(JoinType::LeftAnti),
+        case::right_semi(JoinType::RightSemi),
+        case::right_anti(JoinType::RightAnti)
+    )]
+    #[tokio::test]
+    async fn test_nl_join_with_swap_no_proj(join_type: JoinType) {
+        let (big, small) = create_big_and_small();
+
+        let join = Arc::new(
+            NestedLoopJoinExec::try_new(
+                Arc::clone(&big),
+                Arc::clone(&small),
+                nl_join_filter(),
+                &join_type,
+            )
+            .unwrap(),
+        );
+
+        let optimized_join = JoinSelection::new()
+            .optimize(join.clone(), &ConfigOptions::new())
+            .unwrap();
+
+        let swapped_join = optimized_join
+            .as_any()
+            .downcast_ref::<NestedLoopJoinExec>()
+            .expect("The type of the plan should not be changed");
+
+        // Assert before/after schemas are equal
+        assert_eq!(
+            join.schema(),
+            swapped_join.schema(),
+            "Join schema should not be modified while optimization"
+        );
+
+        // Assert join side of big_col swapped in filter expression
+        let swapped_filter = swapped_join.filter().unwrap();
+        let swapped_big_col_idx = 
swapped_filter.schema().index_of("big_col").unwrap();
+        let swapped_big_col_side = swapped_filter
+            .column_indices()
+            .get(swapped_big_col_idx)
+            .unwrap()
+            .side;
+        assert_eq!(
+            swapped_big_col_side,
+            JoinSide::Right,
+            "Filter column side should be swapped"
+        );
+
+        assert_eq!(
+            swapped_join.left().statistics().unwrap().total_byte_size,
+            Precision::Inexact(8192)
+        );
+        assert_eq!(
+            swapped_join.right().statistics().unwrap().total_byte_size,
+            Precision::Inexact(2097152)
+        );
+        crosscheck_plans(join.clone()).unwrap();
+    }
+
     #[tokio::test]
     async fn test_swap_reverting_projection() {
         let left_schema = Schema::new(vec![
diff --git a/datafusion/core/tests/fuzz_cases/join_fuzz.rs 
b/datafusion/core/tests/fuzz_cases/join_fuzz.rs
index e25f04dc4b..fbfa0ffc19 100644
--- a/datafusion/core/tests/fuzz_cases/join_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/join_fuzz.rs
@@ -21,13 +21,19 @@ use arrow::array::{ArrayRef, Int32Array};
 use arrow::compute::SortOptions;
 use arrow::record_batch::RecordBatch;
 use arrow::util::pretty::pretty_format_batches;
+use arrow_schema::Schema;
 use rand::Rng;
 
+use datafusion::common::JoinSide;
+use datafusion::logical_expr::{JoinType, Operator};
+use datafusion::physical_expr::expressions::BinaryExpr;
 use datafusion::physical_plan::collect;
 use datafusion::physical_plan::expressions::Column;
-use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode, 
SortMergeJoinExec};
+use datafusion::physical_plan::joins::utils::{ColumnIndex, JoinFilter};
+use datafusion::physical_plan::joins::{
+    HashJoinExec, NestedLoopJoinExec, PartitionMode, SortMergeJoinExec,
+};
 use datafusion::physical_plan::memory::MemoryExec;
-use datafusion_expr::JoinType;
 
 use datafusion::prelude::{SessionConfig, SessionContext};
 use test_utils::stagger_batch_with_seed;
@@ -73,7 +79,7 @@ async fn test_full_join_1k() {
 }
 
 #[tokio::test]
-async fn test_semi_join_1k() {
+async fn test_semi_join_10k() {
     run_join_test(
         make_staggered_batches(10000),
         make_staggered_batches(10000),
@@ -83,7 +89,7 @@ async fn test_semi_join_1k() {
 }
 
 #[tokio::test]
-async fn test_anti_join_1k() {
+async fn test_anti_join_10k() {
     run_join_test(
         make_staggered_batches(10000),
         make_staggered_batches(10000),
@@ -118,6 +124,46 @@ async fn run_join_test(
             ),
         ];
 
+        // Nested loop join uses filter for joining records
+        let column_indices = vec![
+            ColumnIndex {
+                index: 0,
+                side: JoinSide::Left,
+            },
+            ColumnIndex {
+                index: 1,
+                side: JoinSide::Left,
+            },
+            ColumnIndex {
+                index: 0,
+                side: JoinSide::Right,
+            },
+            ColumnIndex {
+                index: 1,
+                side: JoinSide::Right,
+            },
+        ];
+        let intermediate_schema = Schema::new(vec![
+            schema1.field_with_name("a").unwrap().to_owned(),
+            schema1.field_with_name("b").unwrap().to_owned(),
+            schema2.field_with_name("a").unwrap().to_owned(),
+            schema2.field_with_name("b").unwrap().to_owned(),
+        ]);
+
+        let equal_a = Arc::new(BinaryExpr::new(
+            Arc::new(Column::new("a", 0)),
+            Operator::Eq,
+            Arc::new(Column::new("a", 2)),
+        )) as _;
+        let equal_b = Arc::new(BinaryExpr::new(
+            Arc::new(Column::new("b", 1)),
+            Operator::Eq,
+            Arc::new(Column::new("b", 3)),
+        )) as _;
+        let expression = Arc::new(BinaryExpr::new(equal_a, Operator::And, 
equal_b)) as _;
+
+        let on_filter = JoinFilter::new(expression, column_indices, 
intermediate_schema);
+
         // sort-merge join
         let left = Arc::new(
             MemoryExec::try_new(&[input1.clone()], schema1.clone(), 
None).unwrap(),
@@ -161,9 +207,23 @@ async fn run_join_test(
         );
         let hj_collected = collect(hj, task_ctx.clone()).await.unwrap();
 
+        // nested loop join
+        let left = Arc::new(
+            MemoryExec::try_new(&[input1.clone()], schema1.clone(), 
None).unwrap(),
+        );
+        let right = Arc::new(
+            MemoryExec::try_new(&[input2.clone()], schema2.clone(), 
None).unwrap(),
+        );
+        let nlj = Arc::new(
+            NestedLoopJoinExec::try_new(left, right, Some(on_filter), 
&join_type)
+                .unwrap(),
+        );
+        let nlj_collected = collect(nlj, task_ctx.clone()).await.unwrap();
+
         // compare
         let smj_formatted = 
pretty_format_batches(&smj_collected).unwrap().to_string();
         let hj_formatted = 
pretty_format_batches(&hj_collected).unwrap().to_string();
+        let nlj_formatted = 
pretty_format_batches(&nlj_collected).unwrap().to_string();
 
         let mut smj_formatted_sorted: Vec<&str> = 
smj_formatted.trim().lines().collect();
         smj_formatted_sorted.sort_unstable();
@@ -171,12 +231,31 @@ async fn run_join_test(
         let mut hj_formatted_sorted: Vec<&str> = 
hj_formatted.trim().lines().collect();
         hj_formatted_sorted.sort_unstable();
 
+        let mut nlj_formatted_sorted: Vec<&str> = 
nlj_formatted.trim().lines().collect();
+        nlj_formatted_sorted.sort_unstable();
+
         for (i, (smj_line, hj_line)) in smj_formatted_sorted
             .iter()
             .zip(&hj_formatted_sorted)
             .enumerate()
         {
-            assert_eq!((i, smj_line), (i, hj_line));
+            assert_eq!(
+                (i, smj_line),
+                (i, hj_line),
+                "SortMergeJoinExec and HashJoinExec produced different results"
+            );
+        }
+
+        for (i, (nlj_line, hj_line)) in nlj_formatted_sorted
+            .iter()
+            .zip(&hj_formatted_sorted)
+            .enumerate()
+        {
+            assert_eq!(
+                (i, nlj_line),
+                (i, hj_line),
+                "NestedLoopJoinExec and HashJoinExec produced different 
results"
+            );
         }
     }
 }
diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs 
b/datafusion/physical-plan/src/joins/nested_loop_join.rs
index e6236e45f0..5fccd63029 100644
--- a/datafusion/physical-plan/src/joins/nested_loop_join.rs
+++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -21,21 +21,22 @@
 
 use std::any::Any;
 use std::fmt::Formatter;
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 use std::task::Poll;
 
 use crate::coalesce_batches::concat_batches;
+use crate::coalesce_partitions::CoalescePartitionsExec;
 use crate::joins::utils::{
-    append_right_indices, apply_join_filter_to_indices, 
build_batch_from_indices,
-    build_join_schema, check_join_is_valid, estimate_join_statistics, 
get_anti_indices,
-    get_final_indices_from_bit_map, get_semi_indices,
-    partitioned_join_output_partitioning, BuildProbeJoinMetrics, ColumnIndex, 
JoinFilter,
-    OnceAsync, OnceFut,
+    adjust_indices_by_join_type, adjust_right_output_partitioning,
+    apply_join_filter_to_indices, build_batch_from_indices, build_join_schema,
+    check_join_is_valid, estimate_join_statistics, 
get_final_indices_from_bit_map,
+    BuildProbeJoinMetrics, ColumnIndex, JoinFilter, OnceAsync, OnceFut,
 };
 use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use crate::{
     execution_mode_from_children, DisplayAs, DisplayFormatType, Distribution,
-    ExecutionMode, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
+    ExecutionMode, ExecutionPlan, ExecutionPlanProperties, Partitioning, 
PlanProperties,
     RecordBatchStream, SendableRecordBatchStream,
 };
 
@@ -52,28 +53,90 @@ use datafusion_expr::JoinType;
 use datafusion_physical_expr::equivalence::join_equivalence_properties;
 
 use futures::{ready, Stream, StreamExt, TryStreamExt};
+use parking_lot::Mutex;
+
+use super::utils::need_produce_result_in_final;
+
+/// Shared bitmap for visited left-side indices
+type SharedBitmapBuilder = Mutex<BooleanBufferBuilder>;
+/// Left (build-side) data
+struct JoinLeftData {
+    /// Build-side data collected to single batch
+    batch: RecordBatch,
+    /// Shared bitmap builder for visited left indices
+    bitmap: SharedBitmapBuilder,
+    /// Counter of running probe-threads, potentially able to update `bitmap`
+    probe_threads_counter: AtomicUsize,
+    /// Memory reservation for tracking batch and bitmap
+    /// Cleared on `JoinLeftData` drop
+    #[allow(dead_code)]
+    reservation: MemoryReservation,
+}
 
-/// Data of the inner table side
-type JoinLeftData = (RecordBatch, MemoryReservation);
+impl JoinLeftData {
+    fn new(
+        batch: RecordBatch,
+        bitmap: SharedBitmapBuilder,
+        probe_threads_counter: AtomicUsize,
+        reservation: MemoryReservation,
+    ) -> Self {
+        Self {
+            batch,
+            bitmap,
+            probe_threads_counter,
+            reservation,
+        }
+    }
+
+    fn batch(&self) -> &RecordBatch {
+        &self.batch
+    }
 
-/// NestedLoopJoinExec executes partitions in parallel.
-/// One input will be collected to a single partition, call it inner-table.
-/// The other side of the input is treated as outer-table, and the output 
Partitioning is from it.
-/// Giving an output partition number x, the execution will be:
+    fn bitmap(&self) -> &SharedBitmapBuilder {
+        &self.bitmap
+    }
+
+    /// Decrements counter of running threads, and returns `true`
+    /// if caller is the last running thread
+    fn report_probe_completed(&self) -> bool {
+        self.probe_threads_counter.fetch_sub(1, Ordering::Relaxed) == 1
+    }
+}
+
+/// NestedLoopJoinExec is a build-probe join operator, whose main task is to
+/// perform joins without any equijoin conditions in the `ON` clause.
+///
+/// Execution consists of the following phases:
 ///
-/// ```text
-/// for outer-table-batch in outer-table-partition-x
-///     check-join(outer-table-batch, inner-table-data)
-/// ```
+/// #### 1. Build phase
+/// Collecting build-side data in memory, by polling all available data from 
build-side input.
+/// Due to the absence of equijoin conditions, it's not possible to partition 
build-side data
+/// across multiple threads of the operator, so build-side is always collected 
in a single
+/// batch shared across all threads.
+/// The operator always considers LEFT input as build-side input, so it's 
crucial to adjust
+/// smaller input to be the LEFT one. Normally this selection is handled by 
physical optimizer.
 ///
-/// One of the inputs will become inner table, and it is decided by the join 
type.
-/// Following is the relation table:
+/// #### 2. Probe phase
+/// Sequentially polling batches from the probe-side input and processing them 
according to the
+/// following logic:
+/// - apply join filter (`ON` clause) to Cartesian product of probe batch and 
build side data
+///   -- filter evaluation is executed once per build-side data row
+/// - update shared bitmap of joined ("visited") build-side row indices, if 
required -- allows
+///   to produce unmatched build-side data in case of e.g. LEFT/FULL JOIN 
after probing phase
+///   completed
+/// - perform join index alignment if required -- depending on `JoinType`
+/// - produce output join batch
 ///
-/// | JoinType                       | Distribution (left, right)              
   | Inner-table |
-/// 
|--------------------------------|--------------------------------------------|-------------|
-/// | Inner/Left/LeftSemi/LeftAnti   | (UnspecifiedDistribution, 
SinglePartition) | right       |
-/// | Right/RightSemi/RightAnti/Full | (SinglePartition, 
UnspecifiedDistribution) | left        |
-/// | Full                           | (SinglePartition, SinglePartition)      
   | left        |
+/// Probing phase is executed in parallel, according to probe-side input 
partitioning -- one
+/// thread per partition. After probe input is exhausted, each thread 
**ATTEMPTS** to produce
+/// unmatched build-side data.
+///
+/// #### 3. Producing unmatched build-side data
+/// Producing unmatched build-side data as an output batch, after probe input 
is exhausted.
+/// This step is also executed in parallel (once per probe input partition), 
and to avoid
+/// duplicate output of unmatched data (due to the shared nature of build-side data), 
each thread
+/// "reports" about probe phase completion (which means that "visited" bitmap 
won't be
+/// updated anymore), and only the last thread, reporting about completion, 
will return output.
 ///
 #[derive(Debug)]
 pub struct NestedLoopJoinExec {
@@ -112,6 +175,7 @@ impl NestedLoopJoinExec {
             build_join_schema(&left_schema, &right_schema, join_type);
         let schema = Arc::new(schema);
         let cache = Self::compute_properties(&left, &right, schema.clone(), 
*join_type);
+
         Ok(NestedLoopJoinExec {
             left,
             right,
@@ -165,15 +229,19 @@ impl NestedLoopJoinExec {
         );
 
         // Get output partitioning,
-        let output_partitioning = if join_type == JoinType::Full {
-            left.output_partitioning().clone()
-        } else {
-            partitioned_join_output_partitioning(
-                join_type,
-                left.output_partitioning(),
+        let output_partitioning = match join_type {
+            JoinType::Inner | JoinType::Right => 
adjust_right_output_partitioning(
                 right.output_partitioning(),
-                left.schema().fields.len(),
-            )
+                left.schema().fields().len(),
+            ),
+            JoinType::RightSemi | JoinType::RightAnti => {
+                right.output_partitioning().clone()
+            }
+            JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti | 
JoinType::Full => {
+                Partitioning::UnknownPartitioning(
+                    right.output_partitioning().partition_count(),
+                )
+            }
         };
 
         // Determine execution mode:
@@ -218,7 +286,10 @@ impl ExecutionPlan for NestedLoopJoinExec {
     }
 
     fn required_input_distribution(&self) -> Vec<Distribution> {
-        distribution_from_join_type(&self.join_type)
+        vec![
+            Distribution::SinglePartition,
+            Distribution::UnspecifiedDistribution,
+        ]
     }
 
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -249,38 +320,17 @@ impl ExecutionPlan for NestedLoopJoinExec {
             MemoryConsumer::new(format!("NestedLoopJoinLoad[{partition}]"))
                 .register(context.memory_pool());
 
-        // Initialization of stream-level reservation
-        let reservation =
-            MemoryConsumer::new(format!("NestedLoopJoinStream[{partition}]"))
-                .register(context.memory_pool());
-
-        let (outer_table, inner_table) = if left_is_build_side(self.join_type) 
{
-            // left must be single partition
-            let inner_table = self.inner_table.once(|| {
-                load_specified_partition_of_input(
-                    0,
-                    self.left.clone(),
-                    context.clone(),
-                    join_metrics.clone(),
-                    load_reservation,
-                )
-            });
-            let outer_table = self.right.execute(partition, context)?;
-            (outer_table, inner_table)
-        } else {
-            // right must be single partition
-            let inner_table = self.inner_table.once(|| {
-                load_specified_partition_of_input(
-                    0,
-                    self.right.clone(),
-                    context.clone(),
-                    join_metrics.clone(),
-                    load_reservation,
-                )
-            });
-            let outer_table = self.left.execute(partition, context)?;
-            (outer_table, inner_table)
-        };
+        let inner_table = self.inner_table.once(|| {
+            collect_left_input(
+                self.left.clone(),
+                context.clone(),
+                join_metrics.clone(),
+                load_reservation,
+                need_produce_result_in_final(self.join_type),
+                self.right().output_partitioning().partition_count(),
+            )
+        });
+        let outer_table = self.right.execute(partition, context)?;
 
         Ok(Box::pin(NestedLoopJoinStream {
             schema: self.schema.clone(),
@@ -289,10 +339,8 @@ impl ExecutionPlan for NestedLoopJoinExec {
             outer_table,
             inner_table,
             is_exhausted: false,
-            visited_left_side: None,
             column_indices: self.column_indices.clone(),
             join_metrics,
-            reservation,
         }))
     }
 
@@ -311,43 +359,25 @@ impl ExecutionPlan for NestedLoopJoinExec {
     }
 }
 
-// For the nested loop join, different join type need the different 
distribution for
-// left and right node.
-fn distribution_from_join_type(join_type: &JoinType) -> Vec<Distribution> {
-    match join_type {
-        JoinType::Inner | JoinType::Left | JoinType::LeftSemi | 
JoinType::LeftAnti => {
-            // need the left data, and the right should be one partition
-            vec![
-                Distribution::UnspecifiedDistribution,
-                Distribution::SinglePartition,
-            ]
-        }
-        JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => {
-            // need the right data, and the left should be one partition
-            vec![
-                Distribution::SinglePartition,
-                Distribution::UnspecifiedDistribution,
-            ]
-        }
-        JoinType::Full => {
-            // need the left and right data, and the left and right should be 
one partition
-            vec![Distribution::SinglePartition, Distribution::SinglePartition]
-        }
-    }
-}
-
-/// Asynchronously collect the specified partition data of the input
-async fn load_specified_partition_of_input(
-    partition: usize,
+/// Asynchronously collect input into a single batch, and creates 
`JoinLeftData` from it
+async fn collect_left_input(
     input: Arc<dyn ExecutionPlan>,
     context: Arc<TaskContext>,
     join_metrics: BuildProbeJoinMetrics,
     reservation: MemoryReservation,
+    with_visited_left_side: bool,
+    probe_threads_count: usize,
 ) -> Result<JoinLeftData> {
-    let stream = input.execute(partition, context)?;
+    let schema = input.schema();
+    let merge = if input.output_partitioning().partition_count() != 1 {
+        Arc::new(CoalescePartitionsExec::new(input))
+    } else {
+        input
+    };
+    let stream = merge.execute(0, context)?;
 
     // Load all batches and count the rows
-    let (batches, num_rows, _, reservation) = stream
+    let (batches, num_rows, metrics, mut reservation) = stream
         .try_fold(
             (Vec::new(), 0usize, join_metrics, reservation),
             |mut acc, batch| async {
@@ -367,19 +397,31 @@ async fn load_specified_partition_of_input(
         )
         .await?;
 
-    let merged_batch = concat_batches(&input.schema(), &batches, num_rows)?;
+    let merged_batch = concat_batches(&schema, &batches, num_rows)?;
 
-    Ok((merged_batch, reservation))
-}
+    // Reserve memory for visited_left_side bitmap if required by join type
+    let visited_left_side = if with_visited_left_side {
+        // TODO: Replace `ceil` wrapper with stable `div_ceil` after
+        // https://github.com/rust-lang/rust/issues/88581
+        let buffer_size = bit_util::ceil(merged_batch.num_rows(), 8);
+        reservation.try_grow(buffer_size)?;
+        metrics.build_mem_used.add(buffer_size);
 
-// BuildLeft means the left relation is the single patrition side.
-// For full join, both side are single partition, so it is BuildLeft and 
BuildRight, treat it as BuildLeft.
-pub fn left_is_build_side(join_type: JoinType) -> bool {
-    matches!(
-        join_type,
-        JoinType::Right | JoinType::RightSemi | JoinType::RightAnti | 
JoinType::Full
-    )
+        let mut buffer = BooleanBufferBuilder::new(merged_batch.num_rows());
+        buffer.append_n(merged_batch.num_rows(), false);
+        buffer
+    } else {
+        BooleanBufferBuilder::new(0)
+    };
+
+    Ok(JoinLeftData::new(
+        merged_batch,
+        Mutex::new(visited_left_side),
+        AtomicUsize::new(probe_threads_count),
+        reservation,
+    ))
 }
+
 /// A stream that issues [RecordBatch]es as they arrive from the right  of the 
join.
 struct NestedLoopJoinStream {
     /// Input schema
@@ -394,16 +436,12 @@ struct NestedLoopJoinStream {
     inner_table: OnceFut<JoinLeftData>,
     /// There is nothing to process anymore and left side is processed in case 
of full join
     is_exhausted: bool,
-    /// Keeps track of the left side rows whether they are visited
-    visited_left_side: Option<BooleanBufferBuilder>,
     /// Information of index and left / right placement of columns
     column_indices: Vec<ColumnIndex>,
     // TODO: support null aware equal
     // null_equals_null: bool
     /// Join execution metrics
     join_metrics: BuildProbeJoinMetrics,
-    /// Memory reservation for visited_left_side
-    reservation: MemoryReservation,
 }
 
 fn build_join_indices(
@@ -434,39 +472,20 @@ fn build_join_indices(
 }
 
 impl NestedLoopJoinStream {
-    /// For Right/RightSemi/RightAnti/Full joins, left is the single partition 
side.
-    fn poll_next_impl_for_build_left(
+    fn poll_next_impl(
         &mut self,
         cx: &mut std::task::Context<'_>,
     ) -> Poll<Option<Result<RecordBatch>>> {
         // all left row
         let build_timer = self.join_metrics.build_time.timer();
-        let (left_data, _) = match ready!(self.inner_table.get(cx)) {
+        let left_data = match ready!(self.inner_table.get_shared(cx)) {
             Ok(data) => data,
             Err(e) => return Poll::Ready(Some(Err(e))),
         };
         build_timer.done();
 
-        if self.visited_left_side.is_none() && self.join_type == 
JoinType::Full {
-            // TODO: Replace `ceil` wrapper with stable `div_cell` after
-            // https://github.com/rust-lang/rust/issues/88581
-            let visited_bitmap_size = bit_util::ceil(left_data.num_rows(), 8);
-            self.reservation.try_grow(visited_bitmap_size)?;
-            self.join_metrics.build_mem_used.add(visited_bitmap_size);
-        }
-
-        // add a bitmap for full join.
-        let visited_left_side = self.visited_left_side.get_or_insert_with(|| {
-            let left_num_rows = left_data.num_rows();
-            // only full join need bitmap
-            if self.join_type == JoinType::Full {
-                let mut buffer = BooleanBufferBuilder::new(left_num_rows);
-                buffer.append_n(left_num_rows, false);
-                buffer
-            } else {
-                BooleanBufferBuilder::new(0)
-            }
-        });
+        // Shared visited_left_side bitmap (pre-built if required by join type)
+        let visited_left_side = left_data.bitmap();
 
         self.outer_table
             .poll_next_unpin(cx)
@@ -478,7 +497,7 @@ impl NestedLoopJoinStream {
                     let timer = self.join_metrics.join_time.timer();
 
                     let result = join_left_and_right_batch(
-                        left_data,
+                        left_data.batch(),
                         &right_batch,
                         self.join_type,
                         self.filter.as_ref(),
@@ -498,21 +517,32 @@ impl NestedLoopJoinStream {
                 }
                 Some(err) => Some(err),
                 None => {
-                    if self.join_type == JoinType::Full && !self.is_exhausted {
+                    if need_produce_result_in_final(self.join_type) && 
!self.is_exhausted
+                    {
+                        // At this stage `visited_left_side` won't be updated,
+                        // so it's safe to report probe completion.
+                        //
+                        // Setting `is_exhausted` / returning None prevents
+                        // multiple calls to `report_probe_completed()`
+                        if !left_data.report_probe_completed() {
+                            self.is_exhausted = true;
+                            return None;
+                        };
+
                         // Only setting up timer, input is exhausted
                         let timer = self.join_metrics.join_time.timer();
-
                         // use the global left bitmap to produce the left 
indices and right indices
-                        let (left_side, right_side) = 
get_final_indices_from_bit_map(
-                            visited_left_side,
-                            self.join_type,
-                        );
+                        let (left_side, right_side) =
+                            get_final_indices_from_shared_bitmap(
+                                visited_left_side,
+                                self.join_type,
+                            );
                         let empty_right_batch =
                             RecordBatch::new_empty(self.outer_table.schema());
                         // use the left and right indices to produce the batch 
result
                         let result = build_batch_from_indices(
                             &self.schema,
-                            left_data,
+                            left_data.batch(),
                             &empty_right_batch,
                             &left_side,
                             &right_side,
@@ -536,55 +566,6 @@ impl NestedLoopJoinStream {
                 }
             })
     }
-
-    /// For Inner/Left/LeftSemi/LeftAnti joins, right is the single partition 
side.
-    fn poll_next_impl_for_build_right(
-        &mut self,
-        cx: &mut std::task::Context<'_>,
-    ) -> Poll<Option<Result<RecordBatch>>> {
-        // all right row
-        let build_timer = self.join_metrics.build_time.timer();
-        let (right_data, _) = match ready!(self.inner_table.get(cx)) {
-            Ok(data) => data,
-            Err(e) => return Poll::Ready(Some(Err(e))),
-        };
-        build_timer.done();
-
-        // for build right, bitmap is not needed.
-        let mut empty_visited_left_side = BooleanBufferBuilder::new(0);
-        self.outer_table
-            .poll_next_unpin(cx)
-            .map(|maybe_batch| match maybe_batch {
-                Some(Ok(left_batch)) => {
-                    // Setting up timer & updating input metrics
-                    self.join_metrics.input_batches.add(1);
-                    self.join_metrics.input_rows.add(left_batch.num_rows());
-                    let timer = self.join_metrics.join_time.timer();
-
-                    // Actual join execution
-                    let result = join_left_and_right_batch(
-                        &left_batch,
-                        right_data,
-                        self.join_type,
-                        self.filter.as_ref(),
-                        &self.column_indices,
-                        &self.schema,
-                        &mut empty_visited_left_side,
-                    );
-
-                    // Recording time & updating output metrics
-                    if let Ok(batch) = &result {
-                        timer.done();
-                        self.join_metrics.output_batches.add(1);
-                        self.join_metrics.output_rows.add(batch.num_rows());
-                    }
-
-                    Some(result)
-                }
-                Some(err) => Some(err),
-                None => None,
-            })
-    }
 }
 
 fn join_left_and_right_batch(
@@ -594,7 +575,7 @@ fn join_left_and_right_batch(
     filter: Option<&JoinFilter>,
     column_indices: &[ColumnIndex],
     schema: &Schema,
-    visited_left_side: &mut BooleanBufferBuilder,
+    visited_left_side: &SharedBitmapBuilder,
 ) -> Result<RecordBatch> {
     let indices_result = (0..left_batch.num_rows())
         .map(|left_row_index| {
@@ -625,17 +606,17 @@ fn join_left_and_right_batch(
         Ok((left_side, right_side)) => {
             // set the left bitmap
             // and only full join need the left bitmap
-            if join_type == JoinType::Full {
+            if need_produce_result_in_final(join_type) {
+                let mut bitmap = visited_left_side.lock();
                 left_side.iter().flatten().for_each(|x| {
-                    visited_left_side.set_bit(x as usize, true);
+                    bitmap.set_bit(x as usize, true);
                 });
             }
             // adjust the two side indices base on the join type
             let (left_side, right_side) = adjust_indices_by_join_type(
                 left_side,
                 right_side,
-                left_batch.num_rows(),
-                right_batch.num_rows(),
+                0..right_batch.num_rows(),
                 join_type,
             );
 
@@ -653,86 +634,12 @@ fn join_left_and_right_batch(
     }
 }
 
-fn adjust_indices_by_join_type(
-    left_indices: UInt64Array,
-    right_indices: UInt32Array,
-    count_left_batch: usize,
-    count_right_batch: usize,
+fn get_final_indices_from_shared_bitmap(
+    shared_bitmap: &SharedBitmapBuilder,
     join_type: JoinType,
 ) -> (UInt64Array, UInt32Array) {
-    match join_type {
-        JoinType::Inner => (left_indices, right_indices),
-        JoinType::Left => {
-            // matched
-            // unmatched left row will be produced in this batch
-            let left_unmatched_indices =
-                get_anti_indices(0..count_left_batch, &left_indices);
-            // combine the matched and unmatched left result together
-            append_left_indices(left_indices, right_indices, 
left_unmatched_indices)
-        }
-        JoinType::LeftSemi => {
-            // need to remove the duplicated record in the left side
-            let left_indices = get_semi_indices(0..count_left_batch, 
&left_indices);
-            // the right_indices will not be used later for the `left semi` 
join
-            (left_indices, right_indices)
-        }
-        JoinType::LeftAnti => {
-            // need to remove the duplicated record in the left side
-            // get the anti index for the left side
-            let left_indices = get_anti_indices(0..count_left_batch, 
&left_indices);
-            // the right_indices will not be used later for the `left anti` 
join
-            (left_indices, right_indices)
-        }
-        // right/right-semi/right-anti => right = outer_table, left = 
inner_table
-        JoinType::Right | JoinType::Full => {
-            // matched
-            // unmatched right row will be produced in this batch
-            let right_unmatched_indices =
-                get_anti_indices(0..count_right_batch, &right_indices);
-            // combine the matched and unmatched right result together
-            append_right_indices(left_indices, right_indices, 
right_unmatched_indices)
-        }
-        JoinType::RightSemi => {
-            // need to remove the duplicated record in the right side
-            let right_indices = get_semi_indices(0..count_right_batch, 
&right_indices);
-            // the left_indices will not be used later for the `right semi` 
join
-            (left_indices, right_indices)
-        }
-        JoinType::RightAnti => {
-            // need to remove the duplicated record in the right side
-            // get the anti index for the right side
-            let right_indices = get_anti_indices(0..count_right_batch, 
&right_indices);
-            // the left_indices will not be used later for the `right anti` 
join
-            (left_indices, right_indices)
-        }
-    }
-}
-
-/// Appends the `left_unmatched_indices` to the `left_indices`,
-/// and fills Null to tail of `right_indices` to
-/// keep the length of `left_indices` and `right_indices` consistent.
-fn append_left_indices(
-    left_indices: UInt64Array,
-    right_indices: UInt32Array,
-    left_unmatched_indices: UInt64Array,
-) -> (UInt64Array, UInt32Array) {
-    if left_unmatched_indices.is_empty() {
-        (left_indices, right_indices)
-    } else {
-        let unmatched_size = left_unmatched_indices.len();
-        // the new left indices: left_indices + null array
-        // the new right indices: right_indices + right_unmatched_indices
-        let new_left_indices = left_indices
-            .iter()
-            .chain(left_unmatched_indices.iter())
-            .collect::<UInt64Array>();
-        let new_right_indices = right_indices
-            .iter()
-            .chain(std::iter::repeat(None).take(unmatched_size))
-            .collect::<UInt32Array>();
-
-        (new_left_indices, new_right_indices)
-    }
+    let bitmap = shared_bitmap.lock();
+    get_final_indices_from_bit_map(&bitmap, join_type)
 }
 
 impl Stream for NestedLoopJoinStream {
@@ -742,11 +649,7 @@ impl Stream for NestedLoopJoinStream {
         mut self: std::pin::Pin<&mut Self>,
         cx: &mut std::task::Context<'_>,
     ) -> Poll<Option<Self::Item>> {
-        if left_is_build_side(self.join_type) {
-            self.poll_next_impl_for_build_left(cx)
-        } else {
-            self.poll_next_impl_for_build_right(cx)
-        }
+        self.poll_next_impl(cx)
     }
 }
 
@@ -851,35 +754,19 @@ mod tests {
         context: Arc<TaskContext>,
     ) -> Result<(Vec<String>, Vec<RecordBatch>)> {
         let partition_count = 4;
-        let mut output_partition = 1;
-        let distribution = distribution_from_join_type(join_type);
-        // left
-        let left = if matches!(distribution[0], Distribution::SinglePartition) 
{
-            left
-        } else {
-            output_partition = partition_count;
-            Arc::new(RepartitionExec::try_new(
-                left,
-                Partitioning::RoundRobinBatch(partition_count),
-            )?)
-        } as Arc<dyn ExecutionPlan>;
-
-        let right = if matches!(distribution[1], 
Distribution::SinglePartition) {
-            right
-        } else {
-            output_partition = partition_count;
-            Arc::new(RepartitionExec::try_new(
-                right,
-                Partitioning::RoundRobinBatch(partition_count),
-            )?)
-        } as Arc<dyn ExecutionPlan>;
+
+        // Redistributing right input
+        let right = Arc::new(RepartitionExec::try_new(
+            right,
+            Partitioning::RoundRobinBatch(partition_count),
+        )?) as Arc<dyn ExecutionPlan>;
 
         // Use the required distribution for nested loop join to test 
partition data
         let nested_loop_join =
             NestedLoopJoinExec::try_new(left, right, join_filter, join_type)?;
         let columns = columns(&nested_loop_join.schema());
         let mut batches = vec![];
-        for i in 0..output_partition {
+        for i in 0..partition_count {
             let stream = nested_loop_join.execute(i, context.clone())?;
             let more_batches = common::collect(stream).await?;
             batches.extend(
diff --git a/datafusion/sqllogictest/test_files/cte.slt 
b/datafusion/sqllogictest/test_files/cte.slt
index b85b49c3e1..7d0f929962 100644
--- a/datafusion/sqllogictest/test_files/cte.slt
+++ b/datafusion/sqllogictest/test_files/cte.slt
@@ -390,7 +390,7 @@ WITH RECURSIVE "recursive_cte" AS (
     SELECT
       2 as "val"
     FROM
-      "recursive_cte" 
+      "recursive_cte"
       FULL JOIN "sub_cte" ON 1 = 1
     WHERE
       "recursive_cte"."val" < 2
diff --git a/datafusion/sqllogictest/test_files/group_by.slt 
b/datafusion/sqllogictest/test_files/group_by.slt
index 77a31cf89a..7bef738337 100644
--- a/datafusion/sqllogictest/test_files/group_by.slt
+++ b/datafusion/sqllogictest/test_files/group_by.slt
@@ -3429,9 +3429,9 @@ physical_plan
 07)------------AggregateExec: mode=Partial, gby=[sn@1 as sn, amount@2 as 
amount], aggr=[SUM(l.amount)]
 08)--------------ProjectionExec: expr=[amount@1 as amount, sn@2 as sn, 
amount@3 as amount]
 09)----------------NestedLoopJoinExec: join_type=Inner, filter=sn@0 >= sn@1
-10)------------------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 
0, 0, 0]
-11)------------------CoalescePartitionsExec
-12)--------------------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 
0, 0, 0, 0]
+10)------------------CoalescePartitionsExec
+11)--------------------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 
0, 0, 0, 0]
+12)------------------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 
0, 0, 0]
 
 query IRR
 SELECT r.sn, SUM(l.amount), r.amount
diff --git a/datafusion/sqllogictest/test_files/joins.slt 
b/datafusion/sqllogictest/test_files/joins.slt
index ce72546bef..65e6c17b92 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -2010,7 +2010,8 @@ set datafusion.explain.logical_plan_only = false;
 statement ok
 set datafusion.execution.target_partitions = 4;
 
-# Right as inner table nested loop join
+# Planning inner nested loop join
+# inputs are swapped due to inexact statistics; the reordering adds a projection
 
 query TT
 EXPLAIN
@@ -2027,17 +2028,18 @@ logical_plan
 05)----Filter: join_t2.t2_int > UInt32(1)
 06)------TableScan: join_t2 projection=[t2_id, t2_int]
 physical_plan
-01)NestedLoopJoinExec: join_type=Inner, filter=t1_id@0 > t2_id@1
-02)--CoalesceBatchesExec: target_batch_size=2
-03)----FilterExec: t1_id@0 > 10
-04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-05)--------MemoryExec: partitions=1, partition_sizes=[1]
-06)--CoalescePartitionsExec
-07)----ProjectionExec: expr=[t2_id@0 as t2_id]
-08)------CoalesceBatchesExec: target_batch_size=2
-09)--------FilterExec: t2_int@1 > 1
-10)----------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
-11)------------MemoryExec: partitions=1, partition_sizes=[1]
+01)ProjectionExec: expr=[t1_id@1 as t1_id, t2_id@0 as t2_id]
+02)--NestedLoopJoinExec: join_type=Inner, filter=t1_id@0 > t2_id@1
+03)----CoalescePartitionsExec
+04)------ProjectionExec: expr=[t2_id@0 as t2_id]
+05)--------CoalesceBatchesExec: target_batch_size=2
+06)----------FilterExec: t2_int@1 > 1
+07)------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
+08)--------------MemoryExec: partitions=1, partition_sizes=[1]
+09)----CoalesceBatchesExec: target_batch_size=2
+10)------FilterExec: t1_id@0 > 10
+11)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+12)----------MemoryExec: partitions=1, partition_sizes=[1]
 
 query II
 SELECT join_t1.t1_id, join_t2.t2_id
@@ -3473,9 +3475,9 @@ logical_plan
 05)----TableScan: annotated_data projection=[a0, a, b, c, d]
 physical_plan
 01)NestedLoopJoinExec: join_type=Inner, filter=a@1 < a@0
-02)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-03)----CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, 
b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], 
has_header=true
-04)--CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, 
b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], 
has_header=true
+02)--CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, 
b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], 
has_header=true
+03)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+04)----CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, 
b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], 
has_header=true
 
 # Currently datafusion cannot pushdown filter conditions with scalar UDF into
 # cross join.
diff --git a/datafusion/sqllogictest/test_files/tpch/q11.slt.part 
b/datafusion/sqllogictest/test_files/tpch/q11.slt.part
index 55b38333ca..3050af6f89 100644
--- a/datafusion/sqllogictest/test_files/tpch/q11.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/q11.slt.part
@@ -75,10 +75,10 @@ logical_plan
 26)----------------------TableScan: nation projection=[n_nationkey, n_name], 
partial_filters=[nation.n_name = Utf8("GERMANY")]
 physical_plan
 01)GlobalLimitExec: skip=0, fetch=10
-02)--SortPreservingMergeExec: [value@1 DESC], fetch=10
-03)----SortExec: TopK(fetch=10), expr=[value@1 DESC]
-04)------ProjectionExec: expr=[ps_partkey@0 as ps_partkey, 
SUM(partsupp.ps_supplycost * partsupp.ps_availqty)@1 as value]
-05)--------NestedLoopJoinExec: join_type=Inner, 
filter=CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty)@0 AS 
Decimal128(38, 15)) > SUM(partsupp.ps_supplycost * partsupp.ps_availqty) * 
Float64(0.0001)@1
+02)--SortExec: TopK(fetch=10), expr=[value@1 DESC]
+03)----ProjectionExec: expr=[ps_partkey@0 as ps_partkey, 
SUM(partsupp.ps_supplycost * partsupp.ps_availqty)@1 as value]
+04)------NestedLoopJoinExec: join_type=Inner, 
filter=CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty)@0 AS 
Decimal128(38, 15)) > SUM(partsupp.ps_supplycost * partsupp.ps_availqty) * 
Float64(0.0001)@1
+05)--------CoalescePartitionsExec
 06)----------AggregateExec: mode=FinalPartitioned, gby=[ps_partkey@0 as 
ps_partkey], aggr=[SUM(partsupp.ps_supplycost * partsupp.ps_availqty)]
 07)------------CoalesceBatchesExec: target_batch_size=8192
 08)--------------RepartitionExec: partitioning=Hash([ps_partkey@0], 4), 
input_partitions=4
@@ -103,30 +103,30 @@ physical_plan
 27)------------------------------FilterExec: n_name@1 = GERMANY
 28)--------------------------------RepartitionExec: 
partitioning=RoundRobinBatch(4), input_partitions=1
 29)----------------------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/nation.tbl]]}, 
projection=[n_nationkey, n_name], has_header=false
-30)----------ProjectionExec: expr=[CAST(CAST(SUM(partsupp.ps_supplycost * 
partsupp.ps_availqty)@0 AS Float64) * 0.0001 AS Decimal128(38, 15)) as 
SUM(partsupp.ps_supplycost * partsupp.ps_availqty) * Float64(0.0001)]
-31)------------AggregateExec: mode=Final, gby=[], 
aggr=[SUM(partsupp.ps_supplycost * partsupp.ps_availqty)]
-32)--------------CoalescePartitionsExec
-33)----------------AggregateExec: mode=Partial, gby=[], 
aggr=[SUM(partsupp.ps_supplycost * partsupp.ps_availqty)]
-34)------------------CoalesceBatchesExec: target_batch_size=8192
-35)--------------------HashJoinExec: mode=Partitioned, join_type=Inner, 
on=[(s_nationkey@2, n_nationkey@0)], projection=[ps_availqty@0, ps_supplycost@1]
-36)----------------------CoalesceBatchesExec: target_batch_size=8192
-37)------------------------RepartitionExec: partitioning=Hash([s_nationkey@2], 
4), input_partitions=4
-38)--------------------------CoalesceBatchesExec: target_batch_size=8192
-39)----------------------------HashJoinExec: mode=Partitioned, 
join_type=Inner, on=[(ps_suppkey@0, s_suppkey@0)], projection=[ps_availqty@1, 
ps_supplycost@2, s_nationkey@4]
-40)------------------------------CoalesceBatchesExec: target_batch_size=8192
-41)--------------------------------RepartitionExec: 
partitioning=Hash([ps_suppkey@0], 4), input_partitions=4
-42)----------------------------------CsvExec: file_groups={4 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:0..2932049],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:2932049..5864098],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:5864098..8796147],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:8796147..11728193]]},
 projection=[ps_suppkey, ps_availqty, ps_supplycost],  [...]
-43)------------------------------CoalesceBatchesExec: target_batch_size=8192
-44)--------------------------------RepartitionExec: 
partitioning=Hash([s_suppkey@0], 4), input_partitions=4
-45)----------------------------------RepartitionExec: 
partitioning=RoundRobinBatch(4), input_partitions=1
-46)------------------------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/supplier.tbl]]}, 
projection=[s_suppkey, s_nationkey], has_header=false
-47)----------------------CoalesceBatchesExec: target_batch_size=8192
-48)------------------------RepartitionExec: partitioning=Hash([n_nationkey@0], 
4), input_partitions=4
-49)--------------------------ProjectionExec: expr=[n_nationkey@0 as 
n_nationkey]
-50)----------------------------CoalesceBatchesExec: target_batch_size=8192
-51)------------------------------FilterExec: n_name@1 = GERMANY
-52)--------------------------------RepartitionExec: 
partitioning=RoundRobinBatch(4), input_partitions=1
-53)----------------------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/nation.tbl]]}, 
projection=[n_nationkey, n_name], has_header=false
+30)--------ProjectionExec: expr=[CAST(CAST(SUM(partsupp.ps_supplycost * 
partsupp.ps_availqty)@0 AS Float64) * 0.0001 AS Decimal128(38, 15)) as 
SUM(partsupp.ps_supplycost * partsupp.ps_availqty) * Float64(0.0001)]
+31)----------AggregateExec: mode=Final, gby=[], 
aggr=[SUM(partsupp.ps_supplycost * partsupp.ps_availqty)]
+32)------------CoalescePartitionsExec
+33)--------------AggregateExec: mode=Partial, gby=[], 
aggr=[SUM(partsupp.ps_supplycost * partsupp.ps_availqty)]
+34)----------------CoalesceBatchesExec: target_batch_size=8192
+35)------------------HashJoinExec: mode=Partitioned, join_type=Inner, 
on=[(s_nationkey@2, n_nationkey@0)], projection=[ps_availqty@0, ps_supplycost@1]
+36)--------------------CoalesceBatchesExec: target_batch_size=8192
+37)----------------------RepartitionExec: partitioning=Hash([s_nationkey@2], 
4), input_partitions=4
+38)------------------------CoalesceBatchesExec: target_batch_size=8192
+39)--------------------------HashJoinExec: mode=Partitioned, join_type=Inner, 
on=[(ps_suppkey@0, s_suppkey@0)], projection=[ps_availqty@1, ps_supplycost@2, 
s_nationkey@4]
+40)----------------------------CoalesceBatchesExec: target_batch_size=8192
+41)------------------------------RepartitionExec: 
partitioning=Hash([ps_suppkey@0], 4), input_partitions=4
+42)--------------------------------CsvExec: file_groups={4 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:0..2932049],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:2932049..5864098],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:5864098..8796147],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:8796147..11728193]]},
 projection=[ps_suppkey, ps_availqty, ps_supplycost], ha [...]
+43)----------------------------CoalesceBatchesExec: target_batch_size=8192
+44)------------------------------RepartitionExec: 
partitioning=Hash([s_suppkey@0], 4), input_partitions=4
+45)--------------------------------RepartitionExec: 
partitioning=RoundRobinBatch(4), input_partitions=1
+46)----------------------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/supplier.tbl]]}, 
projection=[s_suppkey, s_nationkey], has_header=false
+47)--------------------CoalesceBatchesExec: target_batch_size=8192
+48)----------------------RepartitionExec: partitioning=Hash([n_nationkey@0], 
4), input_partitions=4
+49)------------------------ProjectionExec: expr=[n_nationkey@0 as n_nationkey]
+50)--------------------------CoalesceBatchesExec: target_batch_size=8192
+51)----------------------------FilterExec: n_name@1 = GERMANY
+52)------------------------------RepartitionExec: 
partitioning=RoundRobinBatch(4), input_partitions=1
+53)--------------------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/nation.tbl]]}, 
projection=[n_nationkey, n_name], has_header=false
 
 
 
diff --git a/datafusion/sqllogictest/test_files/tpch/q22.slt.part 
b/datafusion/sqllogictest/test_files/tpch/q22.slt.part
index 73b3c16025..98c8ba3965 100644
--- a/datafusion/sqllogictest/test_files/tpch/q22.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/q22.slt.part
@@ -82,26 +82,28 @@ physical_plan
 06)----------RepartitionExec: partitioning=Hash([cntrycode@0], 4), 
input_partitions=4
 07)------------AggregateExec: mode=Partial, gby=[cntrycode@0 as cntrycode], 
aggr=[COUNT(*), SUM(custsale.c_acctbal)]
 08)--------------ProjectionExec: expr=[substr(c_phone@0, 1, 2) as cntrycode, 
c_acctbal@1 as c_acctbal]
-09)----------------NestedLoopJoinExec: join_type=Inner, 
filter=CAST(c_acctbal@0 AS Decimal128(19, 6)) > AVG(customer.c_acctbal)@1
-10)------------------CoalesceBatchesExec: target_batch_size=8192
-11)--------------------HashJoinExec: mode=Partitioned, join_type=LeftAnti, 
on=[(c_custkey@0, o_custkey@0)], projection=[c_phone@1, c_acctbal@2]
+09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
+10)------------------NestedLoopJoinExec: join_type=Inner, 
filter=CAST(c_acctbal@0 AS Decimal128(19, 6)) > AVG(customer.c_acctbal)@1
+11)--------------------CoalescePartitionsExec
 12)----------------------CoalesceBatchesExec: target_batch_size=8192
-13)------------------------RepartitionExec: partitioning=Hash([c_custkey@0], 
4), input_partitions=4
+13)------------------------HashJoinExec: mode=Partitioned, join_type=LeftAnti, 
on=[(c_custkey@0, o_custkey@0)], projection=[c_phone@1, c_acctbal@2]
 14)--------------------------CoalesceBatchesExec: target_batch_size=8192
-15)----------------------------FilterExec: Use substr(c_phone@1, 1, 2) IN 
(SET) ([Literal { value: Utf8("13") }, Literal { value: Utf8("31") }, Literal { 
value: Utf8("23") }, Literal { value: Utf8("29") }, Literal { value: Utf8("30") 
}, Literal { value: Utf8("18") }, Literal { value: Utf8("17") }])
-16)------------------------------RepartitionExec: 
partitioning=RoundRobinBatch(4), input_partitions=1
-17)--------------------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/customer.tbl]]}, 
projection=[c_custkey, c_phone, c_acctbal], has_header=false
-18)----------------------CoalesceBatchesExec: target_batch_size=8192
-19)------------------------RepartitionExec: partitioning=Hash([o_custkey@0], 
4), input_partitions=4
-20)--------------------------CsvExec: file_groups={4 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:0..4223281],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:4223281..8446562],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:8446562..12669843],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:12669843..16893122]]},
 projection=[o_custkey], has_header=false
-21)------------------AggregateExec: mode=Final, gby=[], 
aggr=[AVG(customer.c_acctbal)]
-22)--------------------CoalescePartitionsExec
-23)----------------------AggregateExec: mode=Partial, gby=[], 
aggr=[AVG(customer.c_acctbal)]
-24)------------------------ProjectionExec: expr=[c_acctbal@1 as c_acctbal]
-25)--------------------------CoalesceBatchesExec: target_batch_size=8192
-26)----------------------------FilterExec: c_acctbal@1 > Some(0),15,2 AND Use 
substr(c_phone@0, 1, 2) IN (SET) ([Literal { value: Utf8("13") }, Literal { 
value: Utf8("31") }, Literal { value: Utf8("23") }, Literal { value: Utf8("29") 
}, Literal { value: Utf8("30") }, Literal { value: Utf8("18") }, Literal { 
value: Utf8("17") }])
-27)------------------------------RepartitionExec: 
partitioning=RoundRobinBatch(4), input_partitions=1
-28)--------------------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/customer.tbl]]}, 
projection=[c_phone, c_acctbal], has_header=false
+15)----------------------------RepartitionExec: 
partitioning=Hash([c_custkey@0], 4), input_partitions=4
+16)------------------------------CoalesceBatchesExec: target_batch_size=8192
+17)--------------------------------FilterExec: Use substr(c_phone@1, 1, 2) IN 
(SET) ([Literal { value: Utf8("13") }, Literal { value: Utf8("31") }, Literal { 
value: Utf8("23") }, Literal { value: Utf8("29") }, Literal { value: Utf8("30") 
}, Literal { value: Utf8("18") }, Literal { value: Utf8("17") }])
+18)----------------------------------RepartitionExec: 
partitioning=RoundRobinBatch(4), input_partitions=1
+19)------------------------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/customer.tbl]]}, 
projection=[c_custkey, c_phone, c_acctbal], has_header=false
+20)--------------------------CoalesceBatchesExec: target_batch_size=8192
+21)----------------------------RepartitionExec: 
partitioning=Hash([o_custkey@0], 4), input_partitions=4
+22)------------------------------CsvExec: file_groups={4 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:0..4223281],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:4223281..8446562],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:8446562..12669843],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:12669843..16893122]]},
 projection=[o_custkey], has_header=false
+23)--------------------AggregateExec: mode=Final, gby=[], 
aggr=[AVG(customer.c_acctbal)]
+24)----------------------CoalescePartitionsExec
+25)------------------------AggregateExec: mode=Partial, gby=[], 
aggr=[AVG(customer.c_acctbal)]
+26)--------------------------ProjectionExec: expr=[c_acctbal@1 as c_acctbal]
+27)----------------------------CoalesceBatchesExec: target_batch_size=8192
+28)------------------------------FilterExec: c_acctbal@1 > Some(0),15,2 AND 
Use substr(c_phone@0, 1, 2) IN (SET) ([Literal { value: Utf8("13") }, Literal { 
value: Utf8("31") }, Literal { value: Utf8("23") }, Literal { value: Utf8("29") 
}, Literal { value: Utf8("30") }, Literal { value: Utf8("18") }, Literal { 
value: Utf8("17") }])
+29)--------------------------------RepartitionExec: 
partitioning=RoundRobinBatch(4), input_partitions=1
+30)----------------------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/customer.tbl]]}, 
projection=[c_phone, c_acctbal], has_header=false
 
 
 query TIR


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to