This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 8f8e105072 feat: support input reordering for `NestedLoopJoinExec`
(#9676)
8f8e105072 is described below
commit 8f8e105072ae786c77537ca487f28c292c0cf0e4
Author: Eduard Karacharov <[email protected]>
AuthorDate: Mon Apr 22 21:38:10 2024 +0300
feat: support input reordering for `NestedLoopJoinExec` (#9676)
* support input reordering for NestedLoopJoinExec
* renamed variables and struct fields
* fixed nl join filter expression in tests
* Update datafusion/physical-plan/src/joins/nested_loop_join.rs
Co-authored-by: Andrew Lamb <[email protected]>
* typo fixed
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
.../core/src/physical_optimizer/join_selection.rs | 209 ++++++++-
datafusion/core/tests/fuzz_cases/join_fuzz.rs | 89 +++-
.../physical-plan/src/joins/nested_loop_join.rs | 489 ++++++++-------------
datafusion/sqllogictest/test_files/cte.slt | 2 +-
datafusion/sqllogictest/test_files/group_by.slt | 6 +-
datafusion/sqllogictest/test_files/joins.slt | 32 +-
.../sqllogictest/test_files/tpch/q11.slt.part | 56 +--
.../sqllogictest/test_files/tpch/q22.slt.part | 38 +-
8 files changed, 547 insertions(+), 374 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs
b/datafusion/core/src/physical_optimizer/join_selection.rs
index f7512cb6d0..a8b308d3de 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -30,8 +30,8 @@ use crate::error::Result;
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::joins::utils::{ColumnIndex, JoinFilter};
use crate::physical_plan::joins::{
- CrossJoinExec, HashJoinExec, PartitionMode, StreamJoinPartitionMode,
- SymmetricHashJoinExec,
+ CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode,
+ StreamJoinPartitionMode, SymmetricHashJoinExec,
};
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
@@ -199,6 +199,38 @@ fn swap_hash_join(
}
}
+/// Swaps inputs of `NestedLoopJoinExec` and wraps it into `ProjectionExec` if
required
+fn swap_nl_join(join: &NestedLoopJoinExec) -> Result<Arc<dyn ExecutionPlan>> {
+ let new_filter = swap_join_filter(join.filter());
+ let new_join_type = &swap_join_type(*join.join_type());
+
+ let new_join = NestedLoopJoinExec::try_new(
+ Arc::clone(join.right()),
+ Arc::clone(join.left()),
+ new_filter,
+ new_join_type,
+ )?;
+
+ // For Semi/Anti joins the swapped join already has the same output schema,
+ // so there is no need to wrap it into an additional reverting projection
+ let plan: Arc<dyn ExecutionPlan> = if matches!(
+ join.join_type(),
+ JoinType::LeftSemi
+ | JoinType::RightSemi
+ | JoinType::LeftAnti
+ | JoinType::RightAnti
+ ) {
+ Arc::new(new_join)
+ } else {
+ let projection =
+ swap_reverting_projection(&join.left().schema(),
&join.right().schema());
+
+ Arc::new(ProjectionExec::try_new(projection, Arc::new(new_join))?)
+ };
+
+ Ok(plan)
+}
+
/// When the order of the join is changed by the optimizer, the columns in
/// the output should not be impacted. This function creates the expressions
/// that will allow to swap back the values from the original left as the first
@@ -438,6 +470,14 @@ fn statistical_join_selection_subrule(
} else {
None
}
+ } else if let Some(nl_join) =
plan.as_any().downcast_ref::<NestedLoopJoinExec>() {
+ let left = nl_join.left();
+ let right = nl_join.right();
+ if should_swap_join_order(&**left, &**right)? {
+ swap_nl_join(nl_join).map(Some)?
+ } else {
+ None
+ }
} else {
None
};
@@ -674,9 +714,12 @@ mod tests_statistical {
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::{stats::Precision, JoinType, ScalarValue};
- use datafusion_physical_expr::expressions::Column;
+ use datafusion_expr::Operator;
+ use datafusion_physical_expr::expressions::{BinaryExpr, Column};
use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};
+ use rstest::rstest;
+
/// Return statistcs for empty table
fn empty_statistics() -> Statistics {
Statistics {
@@ -762,6 +805,35 @@ mod tests_statistical {
}]
}
+ /// Create join filter for `NestedLoopJoinExec` with expression `big_col > small_col`
+ /// where both columns are 0-indexed and come from left and right inputs
respectively
+ fn nl_join_filter() -> Option<JoinFilter> {
+ let column_indices = vec![
+ ColumnIndex {
+ index: 0,
+ side: JoinSide::Left,
+ },
+ ColumnIndex {
+ index: 0,
+ side: JoinSide::Right,
+ },
+ ];
+ let intermediate_schema = Schema::new(vec![
+ Field::new("big_col", DataType::Int32, false),
+ Field::new("small_col", DataType::Int32, false),
+ ]);
+ let expression = Arc::new(BinaryExpr::new(
+ Arc::new(Column::new_with_schema("big_col",
&intermediate_schema).unwrap()),
+ Operator::Gt,
+ Arc::new(Column::new_with_schema("small_col",
&intermediate_schema).unwrap()),
+ )) as _;
+ Some(JoinFilter::new(
+ expression,
+ column_indices,
+ intermediate_schema,
+ ))
+ }
+
/// Returns three plans with statistics of (min, max, distinct_count)
/// * big 100K rows @ (0, 50k, 50k)
/// * medium 10K rows @ (1k, 5k, 1k)
@@ -1114,6 +1186,137 @@ mod tests_statistical {
crosscheck_plans(join).unwrap();
}
+ #[rstest(
+ join_type,
+ case::inner(JoinType::Inner),
+ case::left(JoinType::Left),
+ case::right(JoinType::Right),
+ case::full(JoinType::Full)
+ )]
+ #[tokio::test]
+ async fn test_nl_join_with_swap(join_type: JoinType) {
+ let (big, small) = create_big_and_small();
+
+ let join = Arc::new(
+ NestedLoopJoinExec::try_new(
+ Arc::clone(&big),
+ Arc::clone(&small),
+ nl_join_filter(),
+ &join_type,
+ )
+ .unwrap(),
+ );
+
+ let optimized_join = JoinSelection::new()
+ .optimize(join.clone(), &ConfigOptions::new())
+ .unwrap();
+
+ let swapping_projection = optimized_join
+ .as_any()
+ .downcast_ref::<ProjectionExec>()
+ .expect("A proj is required to swap columns back to their original
order");
+
+ assert_eq!(swapping_projection.expr().len(), 2);
+ let (col, name) = &swapping_projection.expr()[0];
+ assert_eq!(name, "big_col");
+ assert_col_expr(col, "big_col", 1);
+ let (col, name) = &swapping_projection.expr()[1];
+ assert_eq!(name, "small_col");
+ assert_col_expr(col, "small_col", 0);
+
+ let swapped_join = swapping_projection
+ .input()
+ .as_any()
+ .downcast_ref::<NestedLoopJoinExec>()
+ .expect("The type of the plan should not be changed");
+
+ // Assert the filter now reads big_col from the right (swapped) input
+ let swapped_filter = swapped_join.filter().unwrap();
+ let swapped_big_col_idx =
swapped_filter.schema().index_of("big_col").unwrap();
+ let swapped_big_col_side = swapped_filter
+ .column_indices()
+ .get(swapped_big_col_idx)
+ .unwrap()
+ .side;
+ assert_eq!(
+ swapped_big_col_side,
+ JoinSide::Right,
+ "Filter column side should be swapped"
+ );
+
+ assert_eq!(
+ swapped_join.left().statistics().unwrap().total_byte_size,
+ Precision::Inexact(8192)
+ );
+ assert_eq!(
+ swapped_join.right().statistics().unwrap().total_byte_size,
+ Precision::Inexact(2097152)
+ );
+ crosscheck_plans(join.clone()).unwrap();
+ }
+
+ #[rstest(
+ join_type,
+ case::left_semi(JoinType::LeftSemi),
+ case::left_anti(JoinType::LeftAnti),
+ case::right_semi(JoinType::RightSemi),
+ case::right_anti(JoinType::RightAnti)
+ )]
+ #[tokio::test]
+ async fn test_nl_join_with_swap_no_proj(join_type: JoinType) {
+ let (big, small) = create_big_and_small();
+
+ let join = Arc::new(
+ NestedLoopJoinExec::try_new(
+ Arc::clone(&big),
+ Arc::clone(&small),
+ nl_join_filter(),
+ &join_type,
+ )
+ .unwrap(),
+ );
+
+ let optimized_join = JoinSelection::new()
+ .optimize(join.clone(), &ConfigOptions::new())
+ .unwrap();
+
+ let swapped_join = optimized_join
+ .as_any()
+ .downcast_ref::<NestedLoopJoinExec>()
+ .expect("The type of the plan should not be changed");
+
+ // No reverting projection for Semi/Anti joins: schema must stay the same
+ assert_eq!(
+ join.schema(),
+ swapped_join.schema(),
+ "Join schema should not be modified while optimization"
+ );
+
+ // Assert the filter now reads big_col from the right (swapped) input
+ let swapped_filter = swapped_join.filter().unwrap();
+ let swapped_big_col_idx =
swapped_filter.schema().index_of("big_col").unwrap();
+ let swapped_big_col_side = swapped_filter
+ .column_indices()
+ .get(swapped_big_col_idx)
+ .unwrap()
+ .side;
+ assert_eq!(
+ swapped_big_col_side,
+ JoinSide::Right,
+ "Filter column side should be swapped"
+ );
+
+ assert_eq!(
+ swapped_join.left().statistics().unwrap().total_byte_size,
+ Precision::Inexact(8192)
+ );
+ assert_eq!(
+ swapped_join.right().statistics().unwrap().total_byte_size,
+ Precision::Inexact(2097152)
+ );
+ crosscheck_plans(join.clone()).unwrap();
+ }
+
#[tokio::test]
async fn test_swap_reverting_projection() {
let left_schema = Schema::new(vec![
diff --git a/datafusion/core/tests/fuzz_cases/join_fuzz.rs
b/datafusion/core/tests/fuzz_cases/join_fuzz.rs
index e25f04dc4b..fbfa0ffc19 100644
--- a/datafusion/core/tests/fuzz_cases/join_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/join_fuzz.rs
@@ -21,13 +21,19 @@ use arrow::array::{ArrayRef, Int32Array};
use arrow::compute::SortOptions;
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;
+use arrow_schema::Schema;
use rand::Rng;
+use datafusion::common::JoinSide;
+use datafusion::logical_expr::{JoinType, Operator};
+use datafusion::physical_expr::expressions::BinaryExpr;
use datafusion::physical_plan::collect;
use datafusion::physical_plan::expressions::Column;
-use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode,
SortMergeJoinExec};
+use datafusion::physical_plan::joins::utils::{ColumnIndex, JoinFilter};
+use datafusion::physical_plan::joins::{
+ HashJoinExec, NestedLoopJoinExec, PartitionMode, SortMergeJoinExec,
+};
use datafusion::physical_plan::memory::MemoryExec;
-use datafusion_expr::JoinType;
use datafusion::prelude::{SessionConfig, SessionContext};
use test_utils::stagger_batch_with_seed;
@@ -73,7 +79,7 @@ async fn test_full_join_1k() {
}
#[tokio::test]
-async fn test_semi_join_1k() {
+async fn test_semi_join_10k() {
run_join_test(
make_staggered_batches(10000),
make_staggered_batches(10000),
@@ -83,7 +89,7 @@ async fn test_semi_join_1k() {
}
#[tokio::test]
-async fn test_anti_join_1k() {
+async fn test_anti_join_10k() {
run_join_test(
make_staggered_batches(10000),
make_staggered_batches(10000),
@@ -118,6 +124,46 @@ async fn run_join_test(
),
];
+ // Nested loop join uses filter for joining records
+ let column_indices = vec![
+ ColumnIndex {
+ index: 0,
+ side: JoinSide::Left,
+ },
+ ColumnIndex {
+ index: 1,
+ side: JoinSide::Left,
+ },
+ ColumnIndex {
+ index: 0,
+ side: JoinSide::Right,
+ },
+ ColumnIndex {
+ index: 1,
+ side: JoinSide::Right,
+ },
+ ];
+ let intermediate_schema = Schema::new(vec![
+ schema1.field_with_name("a").unwrap().to_owned(),
+ schema1.field_with_name("b").unwrap().to_owned(),
+ schema2.field_with_name("a").unwrap().to_owned(),
+ schema2.field_with_name("b").unwrap().to_owned(),
+ ]);
+
+ let equal_a = Arc::new(BinaryExpr::new(
+ Arc::new(Column::new("a", 0)),
+ Operator::Eq,
+ Arc::new(Column::new("a", 2)),
+ )) as _;
+ let equal_b = Arc::new(BinaryExpr::new(
+ Arc::new(Column::new("b", 1)),
+ Operator::Eq,
+ Arc::new(Column::new("b", 3)),
+ )) as _;
+ let expression = Arc::new(BinaryExpr::new(equal_a, Operator::And,
equal_b)) as _;
+
+ let on_filter = JoinFilter::new(expression, column_indices,
intermediate_schema);
+
// sort-merge join
let left = Arc::new(
MemoryExec::try_new(&[input1.clone()], schema1.clone(),
None).unwrap(),
@@ -161,9 +207,23 @@ async fn run_join_test(
);
let hj_collected = collect(hj, task_ctx.clone()).await.unwrap();
+ // nested loop join
+ let left = Arc::new(
+ MemoryExec::try_new(&[input1.clone()], schema1.clone(),
None).unwrap(),
+ );
+ let right = Arc::new(
+ MemoryExec::try_new(&[input2.clone()], schema2.clone(),
None).unwrap(),
+ );
+ let nlj = Arc::new(
+ NestedLoopJoinExec::try_new(left, right, Some(on_filter),
&join_type)
+ .unwrap(),
+ );
+ let nlj_collected = collect(nlj, task_ctx.clone()).await.unwrap();
+
// compare
let smj_formatted =
pretty_format_batches(&smj_collected).unwrap().to_string();
let hj_formatted =
pretty_format_batches(&hj_collected).unwrap().to_string();
+ let nlj_formatted =
pretty_format_batches(&nlj_collected).unwrap().to_string();
let mut smj_formatted_sorted: Vec<&str> =
smj_formatted.trim().lines().collect();
smj_formatted_sorted.sort_unstable();
@@ -171,12 +231,31 @@ async fn run_join_test(
let mut hj_formatted_sorted: Vec<&str> =
hj_formatted.trim().lines().collect();
hj_formatted_sorted.sort_unstable();
+ let mut nlj_formatted_sorted: Vec<&str> =
nlj_formatted.trim().lines().collect();
+ nlj_formatted_sorted.sort_unstable();
+
for (i, (smj_line, hj_line)) in smj_formatted_sorted
.iter()
.zip(&hj_formatted_sorted)
.enumerate()
{
- assert_eq!((i, smj_line), (i, hj_line));
+ assert_eq!(
+ (i, smj_line),
+ (i, hj_line),
+ "SortMergeJoinExec and HashJoinExec produced different results"
+ );
+ }
+
+ for (i, (nlj_line, hj_line)) in nlj_formatted_sorted
+ .iter()
+ .zip(&hj_formatted_sorted)
+ .enumerate()
+ {
+ assert_eq!(
+ (i, nlj_line),
+ (i, hj_line),
+ "NestedLoopJoinExec and HashJoinExec produced different
results"
+ );
}
}
}
diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs
b/datafusion/physical-plan/src/joins/nested_loop_join.rs
index e6236e45f0..5fccd63029 100644
--- a/datafusion/physical-plan/src/joins/nested_loop_join.rs
+++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -21,21 +21,22 @@
use std::any::Any;
use std::fmt::Formatter;
+use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::Poll;
use crate::coalesce_batches::concat_batches;
+use crate::coalesce_partitions::CoalescePartitionsExec;
use crate::joins::utils::{
- append_right_indices, apply_join_filter_to_indices,
build_batch_from_indices,
- build_join_schema, check_join_is_valid, estimate_join_statistics,
get_anti_indices,
- get_final_indices_from_bit_map, get_semi_indices,
- partitioned_join_output_partitioning, BuildProbeJoinMetrics, ColumnIndex,
JoinFilter,
- OnceAsync, OnceFut,
+ adjust_indices_by_join_type, adjust_right_output_partitioning,
+ apply_join_filter_to_indices, build_batch_from_indices, build_join_schema,
+ check_join_is_valid, estimate_join_statistics,
get_final_indices_from_bit_map,
+ BuildProbeJoinMetrics, ColumnIndex, JoinFilter, OnceAsync, OnceFut,
};
use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::{
execution_mode_from_children, DisplayAs, DisplayFormatType, Distribution,
- ExecutionMode, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
+ ExecutionMode, ExecutionPlan, ExecutionPlanProperties, Partitioning,
PlanProperties,
RecordBatchStream, SendableRecordBatchStream,
};
@@ -52,28 +53,90 @@ use datafusion_expr::JoinType;
use datafusion_physical_expr::equivalence::join_equivalence_properties;
use futures::{ready, Stream, StreamExt, TryStreamExt};
+use parking_lot::Mutex;
+
+use super::utils::need_produce_result_in_final;
+
+/// Mutex-guarded bitmap of visited left-side row indices, shared by probe threads
+type SharedBitmapBuilder = Mutex<BooleanBufferBuilder>;
+/// Left (build-side) data, collected once and shared by all output partitions
+struct JoinLeftData {
+ /// Build-side data concatenated into a single batch
+ batch: RecordBatch,
+ /// Shared bitmap builder for visited left indices (empty if not required)
+ bitmap: SharedBitmapBuilder,
+ /// Counter of running probe-threads, potentially able to update `bitmap`
+ probe_threads_counter: AtomicUsize,
+ /// Memory reservation for tracking batch and bitmap; never read directly,
+ /// held only so it is released (cleared) when `JoinLeftData` is dropped
+ #[allow(dead_code)]
+ reservation: MemoryReservation,
+}
-/// Data of the inner table side
-type JoinLeftData = (RecordBatch, MemoryReservation);
+impl JoinLeftData {
+ fn new(
+ batch: RecordBatch,
+ bitmap: SharedBitmapBuilder,
+ probe_threads_counter: AtomicUsize,
+ reservation: MemoryReservation,
+ ) -> Self {
+ Self {
+ batch,
+ bitmap,
+ probe_threads_counter,
+ reservation,
+ }
+ }
+
+ fn batch(&self) -> &RecordBatch {
+ &self.batch
+ }
-/// NestedLoopJoinExec executes partitions in parallel.
-/// One input will be collected to a single partition, call it inner-table.
-/// The other side of the input is treated as outer-table, and the output
Partitioning is from it.
-/// Giving an output partition number x, the execution will be:
+ fn bitmap(&self) -> &SharedBitmapBuilder {
+ &self.bitmap
+ }
+
+ /// Decrements the counter of running probe threads and returns `true`
+ /// if the caller was the last one (i.e. the counter dropped from 1 to 0)
+ fn report_probe_completed(&self) -> bool {
+ self.probe_threads_counter.fetch_sub(1, Ordering::Relaxed) == 1
+ }
+}
+
+/// NestedLoopJoinExec is a build-probe join operator, whose main task is to
+/// perform joins without any equijoin conditions in `ON` clause.
+///
+/// Execution consists of the following phases:
///
-/// ```text
-/// for outer-table-batch in outer-table-partition-x
-/// check-join(outer-table-batch, inner-table-data)
-/// ```
+/// #### 1. Build phase
+/// Collecting build-side data in memory, by polling all available data from
build-side input.
+/// Due to the absence of equijoin conditions, it's not possible to partition
build-side data
+/// across multiple threads of the operator, so build-side is always collected
in a single
+/// batch shared across all threads.
+/// The operator always considers LEFT input as build-side input, so it's
crucial to adjust
+/// smaller input to be the LEFT one. Normally this selection is handled by
physical optimizer.
///
-/// One of the inputs will become inner table, and it is decided by the join
type.
-/// Following is the relation table:
+/// #### 2. Probe phase
+/// Sequentially polling batches from the probe-side input and processing them
according to the
+/// following logic:
+/// - apply join filter (`ON` clause) to Cartesian product of probe batch and
build side data
+/// -- filter evaluation is executed once per build-side data row
+/// - update shared bitmap of joined ("visited") build-side row indices, if
required -- allows
+/// to produce unmatched build-side data in case of e.g. LEFT/FULL JOIN
after probing phase
+/// completed
+/// - perform join index alignment if required -- depending on `JoinType`
+/// - produce output join batch
///
-/// | JoinType | Distribution (left, right)
| Inner-table |
-///
|--------------------------------|--------------------------------------------|-------------|
-/// | Inner/Left/LeftSemi/LeftAnti | (UnspecifiedDistribution,
SinglePartition) | right |
-/// | Right/RightSemi/RightAnti/Full | (SinglePartition,
UnspecifiedDistribution) | left |
-/// | Full | (SinglePartition, SinglePartition)
| left |
+/// Probing phase is executed in parallel, according to probe-side input
partitioning -- one
+/// thread per partition. After probe input is exhausted, each thread
**ATTEMPTS** to produce
+/// unmatched build-side data.
+///
+/// #### 3. Producing unmatched build-side data
+/// Producing unmatched build-side data as an output batch, after probe input
is exhausted.
+/// This step is also executed in parallel (once per probe input partition),
and to avoid
+/// duplicate output of unmatched data (due to the shared nature of build-side data),
each thread
+/// "reports" about probe phase completion (which means that "visited" bitmap
won't be
+/// updated anymore), and only the last thread, reporting about completion,
will return output.
///
#[derive(Debug)]
pub struct NestedLoopJoinExec {
@@ -112,6 +175,7 @@ impl NestedLoopJoinExec {
build_join_schema(&left_schema, &right_schema, join_type);
let schema = Arc::new(schema);
let cache = Self::compute_properties(&left, &right, schema.clone(),
*join_type);
+
Ok(NestedLoopJoinExec {
left,
right,
@@ -165,15 +229,19 @@ impl NestedLoopJoinExec {
);
// Get output partitioning,
- let output_partitioning = if join_type == JoinType::Full {
- left.output_partitioning().clone()
- } else {
- partitioned_join_output_partitioning(
- join_type,
- left.output_partitioning(),
+ let output_partitioning = match join_type {
+ JoinType::Inner | JoinType::Right =>
adjust_right_output_partitioning(
right.output_partitioning(),
- left.schema().fields.len(),
- )
+ left.schema().fields().len(),
+ ),
+ JoinType::RightSemi | JoinType::RightAnti => {
+ right.output_partitioning().clone()
+ }
+ JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti |
JoinType::Full => {
+ Partitioning::UnknownPartitioning(
+ right.output_partitioning().partition_count(),
+ )
+ }
};
// Determine execution mode:
@@ -218,7 +286,10 @@ impl ExecutionPlan for NestedLoopJoinExec {
}
fn required_input_distribution(&self) -> Vec<Distribution> {
- distribution_from_join_type(&self.join_type)
+ vec![
+ Distribution::SinglePartition,
+ Distribution::UnspecifiedDistribution,
+ ]
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -249,38 +320,17 @@ impl ExecutionPlan for NestedLoopJoinExec {
MemoryConsumer::new(format!("NestedLoopJoinLoad[{partition}]"))
.register(context.memory_pool());
- // Initialization of stream-level reservation
- let reservation =
- MemoryConsumer::new(format!("NestedLoopJoinStream[{partition}]"))
- .register(context.memory_pool());
-
- let (outer_table, inner_table) = if left_is_build_side(self.join_type)
{
- // left must be single partition
- let inner_table = self.inner_table.once(|| {
- load_specified_partition_of_input(
- 0,
- self.left.clone(),
- context.clone(),
- join_metrics.clone(),
- load_reservation,
- )
- });
- let outer_table = self.right.execute(partition, context)?;
- (outer_table, inner_table)
- } else {
- // right must be single partition
- let inner_table = self.inner_table.once(|| {
- load_specified_partition_of_input(
- 0,
- self.right.clone(),
- context.clone(),
- join_metrics.clone(),
- load_reservation,
- )
- });
- let outer_table = self.left.execute(partition, context)?;
- (outer_table, inner_table)
- };
+ let inner_table = self.inner_table.once(|| {
+ collect_left_input(
+ self.left.clone(),
+ context.clone(),
+ join_metrics.clone(),
+ load_reservation,
+ need_produce_result_in_final(self.join_type),
+ self.right().output_partitioning().partition_count(),
+ )
+ });
+ let outer_table = self.right.execute(partition, context)?;
Ok(Box::pin(NestedLoopJoinStream {
schema: self.schema.clone(),
@@ -289,10 +339,8 @@ impl ExecutionPlan for NestedLoopJoinExec {
outer_table,
inner_table,
is_exhausted: false,
- visited_left_side: None,
column_indices: self.column_indices.clone(),
join_metrics,
- reservation,
}))
}
@@ -311,43 +359,25 @@ impl ExecutionPlan for NestedLoopJoinExec {
}
}
-// For the nested loop join, different join type need the different
distribution for
-// left and right node.
-fn distribution_from_join_type(join_type: &JoinType) -> Vec<Distribution> {
- match join_type {
- JoinType::Inner | JoinType::Left | JoinType::LeftSemi |
JoinType::LeftAnti => {
- // need the left data, and the right should be one partition
- vec![
- Distribution::UnspecifiedDistribution,
- Distribution::SinglePartition,
- ]
- }
- JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => {
- // need the right data, and the left should be one partition
- vec![
- Distribution::SinglePartition,
- Distribution::UnspecifiedDistribution,
- ]
- }
- JoinType::Full => {
- // need the left and right data, and the left and right should be
one partition
- vec![Distribution::SinglePartition, Distribution::SinglePartition]
- }
- }
-}
-
-/// Asynchronously collect the specified partition data of the input
-async fn load_specified_partition_of_input(
- partition: usize,
+/// Asynchronously collect input into a single batch, and creates
`JoinLeftData` from it
+async fn collect_left_input(
input: Arc<dyn ExecutionPlan>,
context: Arc<TaskContext>,
join_metrics: BuildProbeJoinMetrics,
reservation: MemoryReservation,
+ with_visited_left_side: bool,
+ probe_threads_count: usize,
) -> Result<JoinLeftData> {
- let stream = input.execute(partition, context)?;
+ let schema = input.schema();
+ let merge = if input.output_partitioning().partition_count() != 1 {
+ Arc::new(CoalescePartitionsExec::new(input))
+ } else {
+ input
+ };
+ let stream = merge.execute(0, context)?;
// Load all batches and count the rows
- let (batches, num_rows, _, reservation) = stream
+ let (batches, num_rows, metrics, mut reservation) = stream
.try_fold(
(Vec::new(), 0usize, join_metrics, reservation),
|mut acc, batch| async {
@@ -367,19 +397,31 @@ async fn load_specified_partition_of_input(
)
.await?;
- let merged_batch = concat_batches(&input.schema(), &batches, num_rows)?;
+ let merged_batch = concat_batches(&schema, &batches, num_rows)?;
- Ok((merged_batch, reservation))
-}
+ // Reserve memory for visited_left_side bitmap if required by join type
+ let visited_left_side = if with_visited_left_side {
+ // TODO: Replace `ceil` wrapper with stable `div_ceil` after
+ // https://github.com/rust-lang/rust/issues/88581
+ let buffer_size = bit_util::ceil(merged_batch.num_rows(), 8);
+ reservation.try_grow(buffer_size)?;
+ metrics.build_mem_used.add(buffer_size);
-// BuildLeft means the left relation is the single patrition side.
-// For full join, both side are single partition, so it is BuildLeft and
BuildRight, treat it as BuildLeft.
-pub fn left_is_build_side(join_type: JoinType) -> bool {
- matches!(
- join_type,
- JoinType::Right | JoinType::RightSemi | JoinType::RightAnti |
JoinType::Full
- )
+ let mut buffer = BooleanBufferBuilder::new(merged_batch.num_rows());
+ buffer.append_n(merged_batch.num_rows(), false);
+ buffer
+ } else {
+ BooleanBufferBuilder::new(0)
+ };
+
+ Ok(JoinLeftData::new(
+ merged_batch,
+ Mutex::new(visited_left_side),
+ AtomicUsize::new(probe_threads_count),
+ reservation,
+ ))
}
+
/// A stream that issues [RecordBatch]es as they arrive from the right of the
join.
struct NestedLoopJoinStream {
/// Input schema
@@ -394,16 +436,12 @@ struct NestedLoopJoinStream {
inner_table: OnceFut<JoinLeftData>,
/// There is nothing to process anymore and left side is processed in case
of full join
is_exhausted: bool,
- /// Keeps track of the left side rows whether they are visited
- visited_left_side: Option<BooleanBufferBuilder>,
/// Information of index and left / right placement of columns
column_indices: Vec<ColumnIndex>,
// TODO: support null aware equal
// null_equals_null: bool
/// Join execution metrics
join_metrics: BuildProbeJoinMetrics,
- /// Memory reservation for visited_left_side
- reservation: MemoryReservation,
}
fn build_join_indices(
@@ -434,39 +472,20 @@ fn build_join_indices(
}
impl NestedLoopJoinStream {
- /// For Right/RightSemi/RightAnti/Full joins, left is the single partition
side.
- fn poll_next_impl_for_build_left(
+ fn poll_next_impl(
&mut self,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Result<RecordBatch>>> {
// all left row
let build_timer = self.join_metrics.build_time.timer();
- let (left_data, _) = match ready!(self.inner_table.get(cx)) {
+ let left_data = match ready!(self.inner_table.get_shared(cx)) {
Ok(data) => data,
Err(e) => return Poll::Ready(Some(Err(e))),
};
build_timer.done();
- if self.visited_left_side.is_none() && self.join_type ==
JoinType::Full {
- // TODO: Replace `ceil` wrapper with stable `div_cell` after
- // https://github.com/rust-lang/rust/issues/88581
- let visited_bitmap_size = bit_util::ceil(left_data.num_rows(), 8);
- self.reservation.try_grow(visited_bitmap_size)?;
- self.join_metrics.build_mem_used.add(visited_bitmap_size);
- }
-
- // add a bitmap for full join.
- let visited_left_side = self.visited_left_side.get_or_insert_with(|| {
- let left_num_rows = left_data.num_rows();
- // only full join need bitmap
- if self.join_type == JoinType::Full {
- let mut buffer = BooleanBufferBuilder::new(left_num_rows);
- buffer.append_n(left_num_rows, false);
- buffer
- } else {
- BooleanBufferBuilder::new(0)
- }
- });
+ // Shared visited_left_side bitmap (sized by join type in `collect_left_input`)
+ let visited_left_side = left_data.bitmap();
self.outer_table
.poll_next_unpin(cx)
@@ -478,7 +497,7 @@ impl NestedLoopJoinStream {
let timer = self.join_metrics.join_time.timer();
let result = join_left_and_right_batch(
- left_data,
+ left_data.batch(),
&right_batch,
self.join_type,
self.filter.as_ref(),
@@ -498,21 +517,32 @@ impl NestedLoopJoinStream {
}
Some(err) => Some(err),
None => {
- if self.join_type == JoinType::Full && !self.is_exhausted {
+ if need_produce_result_in_final(self.join_type) &&
!self.is_exhausted
+ {
+ // At this stage `visited_left_side` won't be updated by this thread,
so it's
+ // safe to report probe completion.
+ //
+ // Setting `is_exhausted` / returning None will
prevent
+ // multiple calls to `report_probe_completed()`
+ if !left_data.report_probe_completed() {
+ self.is_exhausted = true;
+ return None;
+ };
+
// Only setting up timer, input is exhausted
let timer = self.join_metrics.join_time.timer();
-
// use the global left bitmap to produce the left
indices and right indices
- let (left_side, right_side) =
get_final_indices_from_bit_map(
- visited_left_side,
- self.join_type,
- );
+ let (left_side, right_side) =
+ get_final_indices_from_shared_bitmap(
+ visited_left_side,
+ self.join_type,
+ );
let empty_right_batch =
RecordBatch::new_empty(self.outer_table.schema());
// use the left and right indices to produce the batch
result
let result = build_batch_from_indices(
&self.schema,
- left_data,
+ left_data.batch(),
&empty_right_batch,
&left_side,
&right_side,
@@ -536,55 +566,6 @@ impl NestedLoopJoinStream {
}
})
}
-
- /// For Inner/Left/LeftSemi/LeftAnti joins, right is the single partition
side.
- fn poll_next_impl_for_build_right(
- &mut self,
- cx: &mut std::task::Context<'_>,
- ) -> Poll<Option<Result<RecordBatch>>> {
- // all right row
- let build_timer = self.join_metrics.build_time.timer();
- let (right_data, _) = match ready!(self.inner_table.get(cx)) {
- Ok(data) => data,
- Err(e) => return Poll::Ready(Some(Err(e))),
- };
- build_timer.done();
-
- // for build right, bitmap is not needed.
- let mut empty_visited_left_side = BooleanBufferBuilder::new(0);
- self.outer_table
- .poll_next_unpin(cx)
- .map(|maybe_batch| match maybe_batch {
- Some(Ok(left_batch)) => {
- // Setting up timer & updating input metrics
- self.join_metrics.input_batches.add(1);
- self.join_metrics.input_rows.add(left_batch.num_rows());
- let timer = self.join_metrics.join_time.timer();
-
- // Actual join execution
- let result = join_left_and_right_batch(
- &left_batch,
- right_data,
- self.join_type,
- self.filter.as_ref(),
- &self.column_indices,
- &self.schema,
- &mut empty_visited_left_side,
- );
-
- // Recording time & updating output metrics
- if let Ok(batch) = &result {
- timer.done();
- self.join_metrics.output_batches.add(1);
- self.join_metrics.output_rows.add(batch.num_rows());
- }
-
- Some(result)
- }
- Some(err) => Some(err),
- None => None,
- })
- }
}
fn join_left_and_right_batch(
@@ -594,7 +575,7 @@ fn join_left_and_right_batch(
filter: Option<&JoinFilter>,
column_indices: &[ColumnIndex],
schema: &Schema,
- visited_left_side: &mut BooleanBufferBuilder,
+ visited_left_side: &SharedBitmapBuilder,
) -> Result<RecordBatch> {
let indices_result = (0..left_batch.num_rows())
.map(|left_row_index| {
@@ -625,17 +606,17 @@ fn join_left_and_right_batch(
Ok((left_side, right_side)) => {
// set the left bitmap
// and only full join need the left bitmap
- if join_type == JoinType::Full {
+ if need_produce_result_in_final(join_type) {
+ let mut bitmap = visited_left_side.lock();
left_side.iter().flatten().for_each(|x| {
- visited_left_side.set_bit(x as usize, true);
+ bitmap.set_bit(x as usize, true);
});
}
// adjust the two side indices base on the join type
let (left_side, right_side) = adjust_indices_by_join_type(
left_side,
right_side,
- left_batch.num_rows(),
- right_batch.num_rows(),
+ 0..right_batch.num_rows(),
join_type,
);
@@ -653,86 +634,12 @@ fn join_left_and_right_batch(
}
}
-fn adjust_indices_by_join_type(
- left_indices: UInt64Array,
- right_indices: UInt32Array,
- count_left_batch: usize,
- count_right_batch: usize,
+fn get_final_indices_from_shared_bitmap(
+ shared_bitmap: &SharedBitmapBuilder,
join_type: JoinType,
) -> (UInt64Array, UInt32Array) {
- match join_type {
- JoinType::Inner => (left_indices, right_indices),
- JoinType::Left => {
- // matched
- // unmatched left row will be produced in this batch
- let left_unmatched_indices =
- get_anti_indices(0..count_left_batch, &left_indices);
- // combine the matched and unmatched left result together
- append_left_indices(left_indices, right_indices, left_unmatched_indices)
- }
- JoinType::LeftSemi => {
- // need to remove the duplicated record in the left side
- let left_indices = get_semi_indices(0..count_left_batch, &left_indices);
- // the right_indices will not be used later for the `left semi` join
- (left_indices, right_indices)
- }
- JoinType::LeftAnti => {
- // need to remove the duplicated record in the left side
- // get the anti index for the left side
- let left_indices = get_anti_indices(0..count_left_batch, &left_indices);
- // the right_indices will not be used later for the `left anti` join
- (left_indices, right_indices)
- }
- // right/right-semi/right-anti => right = outer_table, left = inner_table
- JoinType::Right | JoinType::Full => {
- // matched
- // unmatched right row will be produced in this batch
- let right_unmatched_indices =
- get_anti_indices(0..count_right_batch, &right_indices);
- // combine the matched and unmatched right result together
- append_right_indices(left_indices, right_indices, right_unmatched_indices)
- }
- JoinType::RightSemi => {
- // need to remove the duplicated record in the right side
- let right_indices = get_semi_indices(0..count_right_batch, &right_indices);
- // the left_indices will not be used later for the `right semi` join
- (left_indices, right_indices)
- }
- JoinType::RightAnti => {
- // need to remove the duplicated record in the right side
- // get the anti index for the right side
- let right_indices = get_anti_indices(0..count_right_batch, &right_indices);
- // the left_indices will not be used later for the `right anti` join
- (left_indices, right_indices)
- }
- }
-}
-
-/// Appends the `left_unmatched_indices` to the `left_indices`,
-/// and fills Null to tail of `right_indices` to
-/// keep the length of `left_indices` and `right_indices` consistent.
-fn append_left_indices(
- left_indices: UInt64Array,
- right_indices: UInt32Array,
- left_unmatched_indices: UInt64Array,
-) -> (UInt64Array, UInt32Array) {
- if left_unmatched_indices.is_empty() {
- (left_indices, right_indices)
- } else {
- let unmatched_size = left_unmatched_indices.len();
- // the new left indices: left_indices + null array
- // the new right indices: right_indices + right_unmatched_indices
- let new_left_indices = left_indices
- .iter()
- .chain(left_unmatched_indices.iter())
- .collect::<UInt64Array>();
- let new_right_indices = right_indices
- .iter()
- .chain(std::iter::repeat(None).take(unmatched_size))
- .collect::<UInt32Array>();
-
- (new_left_indices, new_right_indices)
- }
+ let bitmap = shared_bitmap.lock();
+ get_final_indices_from_bit_map(&bitmap, join_type)
}
impl Stream for NestedLoopJoinStream {
@@ -742,11 +649,7 @@ impl Stream for NestedLoopJoinStream {
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
- if left_is_build_side(self.join_type) {
- self.poll_next_impl_for_build_left(cx)
- } else {
- self.poll_next_impl_for_build_right(cx)
- }
+ self.poll_next_impl(cx)
}
}
@@ -851,35 +754,19 @@ mod tests {
context: Arc<TaskContext>,
) -> Result<(Vec<String>, Vec<RecordBatch>)> {
let partition_count = 4;
- let mut output_partition = 1;
- let distribution = distribution_from_join_type(join_type);
- // left
- let left = if matches!(distribution[0], Distribution::SinglePartition) {
- left
- } else {
- output_partition = partition_count;
- Arc::new(RepartitionExec::try_new(
- left,
- Partitioning::RoundRobinBatch(partition_count),
- )?)
- } as Arc<dyn ExecutionPlan>;
-
- let right = if matches!(distribution[1], Distribution::SinglePartition) {
- right
- } else {
- output_partition = partition_count;
- Arc::new(RepartitionExec::try_new(
- right,
- Partitioning::RoundRobinBatch(partition_count),
- )?)
- } as Arc<dyn ExecutionPlan>;
+
+ // Redistributing right input
+ let right = Arc::new(RepartitionExec::try_new(
+ right,
+ Partitioning::RoundRobinBatch(partition_count),
+ )?) as Arc<dyn ExecutionPlan>;
 // Use the required distribution for nested loop join to test partition data
let nested_loop_join =
NestedLoopJoinExec::try_new(left, right, join_filter, join_type)?;
let columns = columns(&nested_loop_join.schema());
let mut batches = vec![];
- for i in 0..output_partition {
+ for i in 0..partition_count {
let stream = nested_loop_join.execute(i, context.clone())?;
let more_batches = common::collect(stream).await?;
batches.extend(
diff --git a/datafusion/sqllogictest/test_files/cte.slt b/datafusion/sqllogictest/test_files/cte.slt
index b85b49c3e1..7d0f929962 100644
--- a/datafusion/sqllogictest/test_files/cte.slt
+++ b/datafusion/sqllogictest/test_files/cte.slt
@@ -390,7 +390,7 @@ WITH RECURSIVE "recursive_cte" AS (
SELECT
2 as "val"
FROM
- "recursive_cte"
+ "recursive_cte"
FULL JOIN "sub_cte" ON 1 = 1
WHERE
"recursive_cte"."val" < 2
diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt
index 77a31cf89a..7bef738337 100644
--- a/datafusion/sqllogictest/test_files/group_by.slt
+++ b/datafusion/sqllogictest/test_files/group_by.slt
@@ -3429,9 +3429,9 @@ physical_plan
07)------------AggregateExec: mode=Partial, gby=[sn@1 as sn, amount@2 as
amount], aggr=[SUM(l.amount)]
08)--------------ProjectionExec: expr=[amount@1 as amount, sn@2 as sn,
amount@3 as amount]
09)----------------NestedLoopJoinExec: join_type=Inner, filter=sn@0 >= sn@1
-10)------------------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0,
0, 0, 0]
-11)------------------CoalescePartitionsExec
-12)--------------------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0,
0, 0, 0, 0]
+10)------------------CoalescePartitionsExec
+11)--------------------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0,
0, 0, 0, 0]
+12)------------------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0,
0, 0, 0]
query IRR
SELECT r.sn, SUM(l.amount), r.amount
diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt
index ce72546bef..65e6c17b92 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -2010,7 +2010,8 @@ set datafusion.explain.logical_plan_only = false;
statement ok
set datafusion.execution.target_partitions = 4;
-# Right as inner table nested loop join
+# Planning inner nested loop join
+# inputs are swapped due to inexact statistics + join reordering caused additional projection
query TT
EXPLAIN
@@ -2027,17 +2028,18 @@ logical_plan
05)----Filter: join_t2.t2_int > UInt32(1)
06)------TableScan: join_t2 projection=[t2_id, t2_int]
physical_plan
-01)NestedLoopJoinExec: join_type=Inner, filter=t1_id@0 > t2_id@1
-02)--CoalesceBatchesExec: target_batch_size=2
-03)----FilterExec: t1_id@0 > 10
-04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-05)--------MemoryExec: partitions=1, partition_sizes=[1]
-06)--CoalescePartitionsExec
-07)----ProjectionExec: expr=[t2_id@0 as t2_id]
-08)------CoalesceBatchesExec: target_batch_size=2
-09)--------FilterExec: t2_int@1 > 1
-10)----------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
-11)------------MemoryExec: partitions=1, partition_sizes=[1]
+01)ProjectionExec: expr=[t1_id@1 as t1_id, t2_id@0 as t2_id]
+02)--NestedLoopJoinExec: join_type=Inner, filter=t1_id@0 > t2_id@1
+03)----CoalescePartitionsExec
+04)------ProjectionExec: expr=[t2_id@0 as t2_id]
+05)--------CoalesceBatchesExec: target_batch_size=2
+06)----------FilterExec: t2_int@1 > 1
+07)------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
+08)--------------MemoryExec: partitions=1, partition_sizes=[1]
+09)----CoalesceBatchesExec: target_batch_size=2
+10)------FilterExec: t1_id@0 > 10
+11)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+12)----------MemoryExec: partitions=1, partition_sizes=[1]
query II
SELECT join_t1.t1_id, join_t2.t2_id
@@ -3473,9 +3475,9 @@ logical_plan
05)----TableScan: annotated_data projection=[a0, a, b, c, d]
physical_plan
01)NestedLoopJoinExec: join_type=Inner, filter=a@1 < a@0
-02)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-03)----CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a,
b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST],
has_header=true
-04)--CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a,
b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST],
has_header=true
+02)--CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a,
b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST],
has_header=true
+03)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+04)----CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a,
b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST],
has_header=true
# Currently datafusion cannot pushdown filter conditions with scalar UDF into
# cross join.
diff --git a/datafusion/sqllogictest/test_files/tpch/q11.slt.part b/datafusion/sqllogictest/test_files/tpch/q11.slt.part
index 55b38333ca..3050af6f89 100644
--- a/datafusion/sqllogictest/test_files/tpch/q11.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/q11.slt.part
@@ -75,10 +75,10 @@ logical_plan
26)----------------------TableScan: nation projection=[n_nationkey, n_name],
partial_filters=[nation.n_name = Utf8("GERMANY")]
physical_plan
01)GlobalLimitExec: skip=0, fetch=10
-02)--SortPreservingMergeExec: [value@1 DESC], fetch=10
-03)----SortExec: TopK(fetch=10), expr=[value@1 DESC]
-04)------ProjectionExec: expr=[ps_partkey@0 as ps_partkey,
SUM(partsupp.ps_supplycost * partsupp.ps_availqty)@1 as value]
-05)--------NestedLoopJoinExec: join_type=Inner,
filter=CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty)@0 AS
Decimal128(38, 15)) > SUM(partsupp.ps_supplycost * partsupp.ps_availqty) *
Float64(0.0001)@1
+02)--SortExec: TopK(fetch=10), expr=[value@1 DESC]
+03)----ProjectionExec: expr=[ps_partkey@0 as ps_partkey,
SUM(partsupp.ps_supplycost * partsupp.ps_availqty)@1 as value]
+04)------NestedLoopJoinExec: join_type=Inner,
filter=CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty)@0 AS
Decimal128(38, 15)) > SUM(partsupp.ps_supplycost * partsupp.ps_availqty) *
Float64(0.0001)@1
+05)--------CoalescePartitionsExec
06)----------AggregateExec: mode=FinalPartitioned, gby=[ps_partkey@0 as
ps_partkey], aggr=[SUM(partsupp.ps_supplycost * partsupp.ps_availqty)]
07)------------CoalesceBatchesExec: target_batch_size=8192
08)--------------RepartitionExec: partitioning=Hash([ps_partkey@0], 4),
input_partitions=4
@@ -103,30 +103,30 @@ physical_plan
27)------------------------------FilterExec: n_name@1 = GERMANY
28)--------------------------------RepartitionExec:
partitioning=RoundRobinBatch(4), input_partitions=1
29)----------------------------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/nation.tbl]]},
projection=[n_nationkey, n_name], has_header=false
-30)----------ProjectionExec: expr=[CAST(CAST(SUM(partsupp.ps_supplycost *
partsupp.ps_availqty)@0 AS Float64) * 0.0001 AS Decimal128(38, 15)) as
SUM(partsupp.ps_supplycost * partsupp.ps_availqty) * Float64(0.0001)]
-31)------------AggregateExec: mode=Final, gby=[],
aggr=[SUM(partsupp.ps_supplycost * partsupp.ps_availqty)]
-32)--------------CoalescePartitionsExec
-33)----------------AggregateExec: mode=Partial, gby=[],
aggr=[SUM(partsupp.ps_supplycost * partsupp.ps_availqty)]
-34)------------------CoalesceBatchesExec: target_batch_size=8192
-35)--------------------HashJoinExec: mode=Partitioned, join_type=Inner,
on=[(s_nationkey@2, n_nationkey@0)], projection=[ps_availqty@0, ps_supplycost@1]
-36)----------------------CoalesceBatchesExec: target_batch_size=8192
-37)------------------------RepartitionExec: partitioning=Hash([s_nationkey@2],
4), input_partitions=4
-38)--------------------------CoalesceBatchesExec: target_batch_size=8192
-39)----------------------------HashJoinExec: mode=Partitioned,
join_type=Inner, on=[(ps_suppkey@0, s_suppkey@0)], projection=[ps_availqty@1,
ps_supplycost@2, s_nationkey@4]
-40)------------------------------CoalesceBatchesExec: target_batch_size=8192
-41)--------------------------------RepartitionExec:
partitioning=Hash([ps_suppkey@0], 4), input_partitions=4
-42)----------------------------------CsvExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:0..2932049],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:2932049..5864098],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:5864098..8796147],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:8796147..11728193]]},
projection=[ps_suppkey, ps_availqty, ps_supplycost], [...]
-43)------------------------------CoalesceBatchesExec: target_batch_size=8192
-44)--------------------------------RepartitionExec:
partitioning=Hash([s_suppkey@0], 4), input_partitions=4
-45)----------------------------------RepartitionExec:
partitioning=RoundRobinBatch(4), input_partitions=1
-46)------------------------------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/supplier.tbl]]},
projection=[s_suppkey, s_nationkey], has_header=false
-47)----------------------CoalesceBatchesExec: target_batch_size=8192
-48)------------------------RepartitionExec: partitioning=Hash([n_nationkey@0],
4), input_partitions=4
-49)--------------------------ProjectionExec: expr=[n_nationkey@0 as
n_nationkey]
-50)----------------------------CoalesceBatchesExec: target_batch_size=8192
-51)------------------------------FilterExec: n_name@1 = GERMANY
-52)--------------------------------RepartitionExec:
partitioning=RoundRobinBatch(4), input_partitions=1
-53)----------------------------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/nation.tbl]]},
projection=[n_nationkey, n_name], has_header=false
+30)--------ProjectionExec: expr=[CAST(CAST(SUM(partsupp.ps_supplycost *
partsupp.ps_availqty)@0 AS Float64) * 0.0001 AS Decimal128(38, 15)) as
SUM(partsupp.ps_supplycost * partsupp.ps_availqty) * Float64(0.0001)]
+31)----------AggregateExec: mode=Final, gby=[],
aggr=[SUM(partsupp.ps_supplycost * partsupp.ps_availqty)]
+32)------------CoalescePartitionsExec
+33)--------------AggregateExec: mode=Partial, gby=[],
aggr=[SUM(partsupp.ps_supplycost * partsupp.ps_availqty)]
+34)----------------CoalesceBatchesExec: target_batch_size=8192
+35)------------------HashJoinExec: mode=Partitioned, join_type=Inner,
on=[(s_nationkey@2, n_nationkey@0)], projection=[ps_availqty@0, ps_supplycost@1]
+36)--------------------CoalesceBatchesExec: target_batch_size=8192
+37)----------------------RepartitionExec: partitioning=Hash([s_nationkey@2],
4), input_partitions=4
+38)------------------------CoalesceBatchesExec: target_batch_size=8192
+39)--------------------------HashJoinExec: mode=Partitioned, join_type=Inner,
on=[(ps_suppkey@0, s_suppkey@0)], projection=[ps_availqty@1, ps_supplycost@2,
s_nationkey@4]
+40)----------------------------CoalesceBatchesExec: target_batch_size=8192
+41)------------------------------RepartitionExec:
partitioning=Hash([ps_suppkey@0], 4), input_partitions=4
+42)--------------------------------CsvExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:0..2932049],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:2932049..5864098],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:5864098..8796147],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:8796147..11728193]]},
projection=[ps_suppkey, ps_availqty, ps_supplycost], ha [...]
+43)----------------------------CoalesceBatchesExec: target_batch_size=8192
+44)------------------------------RepartitionExec:
partitioning=Hash([s_suppkey@0], 4), input_partitions=4
+45)--------------------------------RepartitionExec:
partitioning=RoundRobinBatch(4), input_partitions=1
+46)----------------------------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/supplier.tbl]]},
projection=[s_suppkey, s_nationkey], has_header=false
+47)--------------------CoalesceBatchesExec: target_batch_size=8192
+48)----------------------RepartitionExec: partitioning=Hash([n_nationkey@0],
4), input_partitions=4
+49)------------------------ProjectionExec: expr=[n_nationkey@0 as n_nationkey]
+50)--------------------------CoalesceBatchesExec: target_batch_size=8192
+51)----------------------------FilterExec: n_name@1 = GERMANY
+52)------------------------------RepartitionExec:
partitioning=RoundRobinBatch(4), input_partitions=1
+53)--------------------------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/nation.tbl]]},
projection=[n_nationkey, n_name], has_header=false
diff --git a/datafusion/sqllogictest/test_files/tpch/q22.slt.part b/datafusion/sqllogictest/test_files/tpch/q22.slt.part
index 73b3c16025..98c8ba3965 100644
--- a/datafusion/sqllogictest/test_files/tpch/q22.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/q22.slt.part
@@ -82,26 +82,28 @@ physical_plan
06)----------RepartitionExec: partitioning=Hash([cntrycode@0], 4),
input_partitions=4
07)------------AggregateExec: mode=Partial, gby=[cntrycode@0 as cntrycode],
aggr=[COUNT(*), SUM(custsale.c_acctbal)]
08)--------------ProjectionExec: expr=[substr(c_phone@0, 1, 2) as cntrycode,
c_acctbal@1 as c_acctbal]
-09)----------------NestedLoopJoinExec: join_type=Inner,
filter=CAST(c_acctbal@0 AS Decimal128(19, 6)) > AVG(customer.c_acctbal)@1
-10)------------------CoalesceBatchesExec: target_batch_size=8192
-11)--------------------HashJoinExec: mode=Partitioned, join_type=LeftAnti,
on=[(c_custkey@0, o_custkey@0)], projection=[c_phone@1, c_acctbal@2]
+09)----------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
+10)------------------NestedLoopJoinExec: join_type=Inner,
filter=CAST(c_acctbal@0 AS Decimal128(19, 6)) > AVG(customer.c_acctbal)@1
+11)--------------------CoalescePartitionsExec
12)----------------------CoalesceBatchesExec: target_batch_size=8192
-13)------------------------RepartitionExec: partitioning=Hash([c_custkey@0],
4), input_partitions=4
+13)------------------------HashJoinExec: mode=Partitioned, join_type=LeftAnti,
on=[(c_custkey@0, o_custkey@0)], projection=[c_phone@1, c_acctbal@2]
14)--------------------------CoalesceBatchesExec: target_batch_size=8192
-15)----------------------------FilterExec: Use substr(c_phone@1, 1, 2) IN
(SET) ([Literal { value: Utf8("13") }, Literal { value: Utf8("31") }, Literal {
value: Utf8("23") }, Literal { value: Utf8("29") }, Literal { value: Utf8("30")
}, Literal { value: Utf8("18") }, Literal { value: Utf8("17") }])
-16)------------------------------RepartitionExec:
partitioning=RoundRobinBatch(4), input_partitions=1
-17)--------------------------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/customer.tbl]]},
projection=[c_custkey, c_phone, c_acctbal], has_header=false
-18)----------------------CoalesceBatchesExec: target_batch_size=8192
-19)------------------------RepartitionExec: partitioning=Hash([o_custkey@0],
4), input_partitions=4
-20)--------------------------CsvExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:0..4223281],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:4223281..8446562],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:8446562..12669843],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:12669843..16893122]]},
projection=[o_custkey], has_header=false
-21)------------------AggregateExec: mode=Final, gby=[],
aggr=[AVG(customer.c_acctbal)]
-22)--------------------CoalescePartitionsExec
-23)----------------------AggregateExec: mode=Partial, gby=[],
aggr=[AVG(customer.c_acctbal)]
-24)------------------------ProjectionExec: expr=[c_acctbal@1 as c_acctbal]
-25)--------------------------CoalesceBatchesExec: target_batch_size=8192
-26)----------------------------FilterExec: c_acctbal@1 > Some(0),15,2 AND Use
substr(c_phone@0, 1, 2) IN (SET) ([Literal { value: Utf8("13") }, Literal {
value: Utf8("31") }, Literal { value: Utf8("23") }, Literal { value: Utf8("29")
}, Literal { value: Utf8("30") }, Literal { value: Utf8("18") }, Literal {
value: Utf8("17") }])
-27)------------------------------RepartitionExec:
partitioning=RoundRobinBatch(4), input_partitions=1
-28)--------------------------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/customer.tbl]]},
projection=[c_phone, c_acctbal], has_header=false
+15)----------------------------RepartitionExec:
partitioning=Hash([c_custkey@0], 4), input_partitions=4
+16)------------------------------CoalesceBatchesExec: target_batch_size=8192
+17)--------------------------------FilterExec: Use substr(c_phone@1, 1, 2) IN
(SET) ([Literal { value: Utf8("13") }, Literal { value: Utf8("31") }, Literal {
value: Utf8("23") }, Literal { value: Utf8("29") }, Literal { value: Utf8("30")
}, Literal { value: Utf8("18") }, Literal { value: Utf8("17") }])
+18)----------------------------------RepartitionExec:
partitioning=RoundRobinBatch(4), input_partitions=1
+19)------------------------------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/customer.tbl]]},
projection=[c_custkey, c_phone, c_acctbal], has_header=false
+20)--------------------------CoalesceBatchesExec: target_batch_size=8192
+21)----------------------------RepartitionExec:
partitioning=Hash([o_custkey@0], 4), input_partitions=4
+22)------------------------------CsvExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:0..4223281],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:4223281..8446562],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:8446562..12669843],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:12669843..16893122]]},
projection=[o_custkey], has_header=false
+23)--------------------AggregateExec: mode=Final, gby=[],
aggr=[AVG(customer.c_acctbal)]
+24)----------------------CoalescePartitionsExec
+25)------------------------AggregateExec: mode=Partial, gby=[],
aggr=[AVG(customer.c_acctbal)]
+26)--------------------------ProjectionExec: expr=[c_acctbal@1 as c_acctbal]
+27)----------------------------CoalesceBatchesExec: target_batch_size=8192
+28)------------------------------FilterExec: c_acctbal@1 > Some(0),15,2 AND
Use substr(c_phone@0, 1, 2) IN (SET) ([Literal { value: Utf8("13") }, Literal {
value: Utf8("31") }, Literal { value: Utf8("23") }, Literal { value: Utf8("29")
}, Literal { value: Utf8("30") }, Literal { value: Utf8("18") }, Literal {
value: Utf8("17") }])
+29)--------------------------------RepartitionExec:
partitioning=RoundRobinBatch(4), input_partitions=1
+30)----------------------------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/customer.tbl]]},
projection=[c_phone, c_acctbal], has_header=false
query TIR
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]