This is an automated email from the ASF dual-hosted git repository.
ozankabak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 99f2666a18 [Refactor] PipelineFixer physical optimizer rule removal (#7059)
99f2666a18 is described below
commit 99f2666a18d289ac64444fc813560762a90a0037
Author: Metehan Yıldırım <[email protected]>
AuthorDate: Tue Jul 25 19:12:13 2023 +0300
[Refactor] PipelineFixer physical optimizer rule removal (#7059)
* Initial refactor
* Minor
* Update datafusion/core/src/physical_optimizer/join_selection.rs
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
* Update according to comments
---------
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
.../core/src/physical_optimizer/join_selection.rs | 915 ++++++++++++++++++---
datafusion/core/src/physical_optimizer/mod.rs | 1 -
.../core/src/physical_optimizer/optimizer.rs | 7 -
.../core/src/physical_optimizer/pipeline_fixer.rs | 716 ----------------
datafusion/core/tests/memory_limit.rs | 4 +-
.../tests/sqllogictests/test_files/explain.slt | 1 -
6 files changed, 802 insertions(+), 842 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs
index a9dec73c36..9ecfb8993f 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -15,36 +15,37 @@
// specific language governing permissions and limitations
// under the License.
-//! Select the proper PartitionMode and build side based on the avaliable statistics for hash join.
-use std::sync::Arc;
+//! The [`JoinSelection`] rule tries to modify a given plan so that it can
+//! accommodate infinite sources and utilize statistical information (if there
+//! is any) to obtain more performant plans. To achieve the first goal, it
+//! tries to transform a non-runnable query (with the given infinite sources)
+//! into a runnable query by replacing pipeline-breaking join operations with
+//! pipeline-friendly ones. To achieve the second goal, it selects the proper
+//! `PartitionMode` and the build side using the available statistics for hash joins.
-use arrow::datatypes::Schema;
+use std::sync::Arc;
use crate::config::ConfigOptions;
-use crate::logical_expr::JoinType;
-use crate::physical_plan::expressions::Column;
+use crate::error::Result;
+use crate::physical_optimizer::pipeline_checker::PipelineStatePropagator;
+use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::joins::utils::{ColumnIndex, JoinFilter};
use crate::physical_plan::joins::{
- utils::{ColumnIndex, JoinFilter, JoinSide},
- CrossJoinExec, HashJoinExec, PartitionMode,
+ CrossJoinExec, HashJoinExec, PartitionMode, StreamJoinPartitionMode,
+ SymmetricHashJoinExec,
};
use crate::physical_plan::projection::ProjectionExec;
-use crate::physical_plan::{ExecutionPlan, PhysicalExpr};
+use crate::physical_plan::ExecutionPlan;
-use super::optimizer::PhysicalOptimizerRule;
-use crate::error::Result;
+use arrow_schema::Schema;
use datafusion_common::tree_node::{Transformed, TreeNode};
+use datafusion_common::{DataFusionError, JoinType};
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::PhysicalExpr;
-/// For hash join with the partition mode [PartitionMode::Auto], JoinSelection rule will make
-/// a cost based decision to select which PartitionMode mode(Partitioned/CollectLeft) is optimal
-/// based on the available statistics that the inputs have.
-/// If the statistics information is not available, the partition mode will fall back to [PartitionMode::Partitioned].
-///
-/// JoinSelection rule will also reorder the build and probe phase of the hash joins
-/// based on the avaliable statistics that the inputs have.
-/// The rule optimizes the order such that the left (build) side of the join is the smallest.
-/// If the statistics information is not available, the order stays the same as the original query.
-/// JoinSelection rule will also swap the left and right sides for cross join to keep the left side
-/// is the smallest.
+/// The [`JoinSelection`] rule tries to modify a given plan so that it can
+/// accommodate infinite sources and optimize joins in the plan according to
+/// available statistical information, if there is any.
#[derive(Default)]
pub struct JoinSelection {}
@@ -55,8 +56,9 @@ impl JoinSelection {
}
}
-// TODO we need some performance test for Right Semi/Right Join swap to Left Semi/Left Join in case that the right side is smaller but not much smaller.
-// TODO In PrestoSQL, the optimizer flips join sides only if one side is much smaller than the other by more than SIZE_DIFFERENCE_THRESHOLD times, by default is is 8 times.
+// TODO: We need some performance test for Right Semi/Right Join swap to Left Semi/Left Join in case that the right side is smaller but not much smaller.
+// TODO: In PrestoSQL, the optimizer flips join sides only if one side is much smaller than the other by more than SIZE_DIFFERENCE_THRESHOLD times, by default is is 8 times.
+/// Checks statistics for join swap.
fn should_swap_join_order(left: &dyn ExecutionPlan, right: &dyn ExecutionPlan)
-> bool {
// Get the left and right table's total bytes
// If both the left and right tables contain total_byte_size statistics,
@@ -89,8 +91,9 @@ fn supports_collect_by_size(
false
}
}
+
/// Predicate that checks whether the given join type supports input swapping.
-pub fn supports_swap(join_type: JoinType) -> bool {
+fn supports_swap(join_type: JoinType) -> bool {
matches!(
join_type,
JoinType::Inner
@@ -103,9 +106,10 @@ pub fn supports_swap(join_type: JoinType) -> bool {
| JoinType::RightAnti
)
}
+
/// This function returns the new join type we get after swapping the given
/// join's inputs.
-pub fn swap_join_type(join_type: JoinType) -> JoinType {
+fn swap_join_type(join_type: JoinType) -> JoinType {
match join_type {
JoinType::Inner => JoinType::Inner,
JoinType::Full => JoinType::Full,
@@ -119,7 +123,7 @@ pub fn swap_join_type(join_type: JoinType) -> JoinType {
}
/// This function swaps the inputs of the given join operator.
-pub fn swap_hash_join(
+fn swap_hash_join(
hash_join: &HashJoinExec,
partition_mode: PartitionMode,
) -> Result<Arc<dyn ExecutionPlan>> {
@@ -160,7 +164,7 @@ pub fn swap_hash_join(
/// the output should not be impacted. This function creates the expressions
/// that will allow to swap back the values from the original left as the first
/// columns and those on the right next.
-pub fn swap_reverting_projection(
+fn swap_reverting_projection(
left_schema: &Schema,
right_schema: &Schema,
) -> Vec<(Arc<dyn PhysicalExpr>, String)> {
@@ -182,30 +186,26 @@ pub fn swap_reverting_projection(
}
/// Swaps join sides for filter column indices and produces new JoinFilter
-fn swap_join_filter(filter: Option<&JoinFilter>) -> Option<JoinFilter> {
- filter.map(|filter| {
- let column_indices = filter
- .column_indices()
- .iter()
- .map(|idx| {
- let side = if matches!(idx.side, JoinSide::Left) {
- JoinSide::Right
- } else {
- JoinSide::Left
- };
- ColumnIndex {
- index: idx.index,
- side,
- }
- })
- .collect();
+fn swap_filter(filter: &JoinFilter) -> JoinFilter {
+ let column_indices = filter
+ .column_indices()
+ .iter()
+ .map(|idx| ColumnIndex {
+ index: idx.index,
+ side: idx.side.negate(),
+ })
+ .collect();
- JoinFilter::new(
- filter.expression().clone(),
- column_indices,
- filter.schema().clone(),
- )
- })
+ JoinFilter::new(
+ filter.expression().clone(),
+ column_indices,
+ filter.schema().clone(),
+ )
+}
+
+/// Swaps join sides for filter column indices and produces new `JoinFilter` (if exists).
+fn swap_join_filter(filter: Option<&JoinFilter>) -> Option<JoinFilter> {
+ filter.map(swap_filter)
}
impl PhysicalOptimizerRule for JoinSelection {
@@ -214,63 +214,32 @@ impl PhysicalOptimizerRule for JoinSelection {
plan: Arc<dyn ExecutionPlan>,
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
+ let pipeline = PipelineStatePropagator::new(plan);
+ // First, we make pipeline-fixing modifications to joins so as to accommodate
+ // unbounded inputs. Each pipeline-fixing subrule, which is a function
+ // of type `PipelineFixerSubrule`, takes a single [`PipelineStatePropagator`]
+ // argument storing state variables that indicate the unboundedness status
+ // of the current [`ExecutionPlan`] as we traverse the plan tree.
+ let subrules: Vec<Box<PipelineFixerSubrule>> = vec![
+ Box::new(hash_join_convert_symmetric_subrule),
+ Box::new(hash_join_swap_subrule),
+ ];
+ let state = pipeline.transform_up(&|p| apply_subrules(p, &subrules,
config))?;
+ // Next, we apply another subrule that tries to optimize joins using any
+ // statistics their inputs might have.
+ // - For a hash join with partition mode [`PartitionMode::Auto`], we will
+ // make a cost-based decision to select which `PartitionMode` mode
+ // (`Partitioned`/`CollectLeft`) is optimal. If the statistics information
+ // is not available, we will fall back to [`PartitionMode::Partitioned`].
+ // - We optimize/swap join sides so that the left (build) side of the join
+ // is the small side. If the statistics information is not available, we
+ // do not modify join sides.
+ // - We will also swap left and right sides for cross joins so that the left
+ // side is the small side.
let config = &config.optimizer;
let collect_left_threshold =
config.hash_join_single_partition_threshold;
- plan.transform_up(&|plan| {
- let transformed = if let Some(hash_join) =
- plan.as_any().downcast_ref::<HashJoinExec>()
- {
- match hash_join.partition_mode() {
- PartitionMode::Auto => {
- try_collect_left(hash_join,
Some(collect_left_threshold))?
- .map_or_else(
- || partitioned_hash_join(hash_join).map(Some),
- |v| Ok(Some(v)),
- )?
- }
- PartitionMode::CollectLeft => try_collect_left(hash_join,
None)?
- .map_or_else(
- || partitioned_hash_join(hash_join).map(Some),
- |v| Ok(Some(v)),
- )?,
- PartitionMode::Partitioned => {
- let left = hash_join.left();
- let right = hash_join.right();
- if should_swap_join_order(&**left, &**right)
- && supports_swap(*hash_join.join_type())
- {
- swap_hash_join(hash_join,
PartitionMode::Partitioned)
- .map(Some)?
- } else {
- None
- }
- }
- }
- } else if let Some(cross_join) =
plan.as_any().downcast_ref::<CrossJoinExec>()
- {
- let left = cross_join.left();
- let right = cross_join.right();
- if should_swap_join_order(&**left, &**right) {
- let new_join =
- CrossJoinExec::new(Arc::clone(right),
Arc::clone(left));
- // TODO avoid adding ProjectionExec again and again, only adding Final Projection
- let proj: Arc<dyn ExecutionPlan> =
Arc::new(ProjectionExec::try_new(
- swap_reverting_projection(&left.schema(),
&right.schema()),
- Arc::new(new_join),
- )?);
- Some(proj)
- } else {
- None
- }
- } else {
- None
- };
-
- Ok(if let Some(transformed) = transformed {
- Transformed::Yes(transformed)
- } else {
- Transformed::No(plan)
- })
+ state.plan.transform_up(&|plan| {
+ statistical_join_selection_subrule(plan, collect_left_threshold)
})
}
@@ -283,13 +252,17 @@ impl PhysicalOptimizerRule for JoinSelection {
}
}
-/// Try to create the PartitionMode::CollectLeft HashJoinExec when possible.
-/// The method will first consider the current join type and check whether it is applicable to run CollectLeft mode
-/// and will try to swap the join if the orignal type is unapplicable to run CollectLeft.
-/// When the collect_threshold is provided, the method will also check both the left side and right side sizes
+/// Tries to create a [`HashJoinExec`] in [`PartitionMode::CollectLeft`] when possible.
///
-/// For [JoinType::Full], it is alway unable to run CollectLeft mode and will return None.
-/// For [JoinType::Left] and [JoinType::LeftAnti], can not run CollectLeft mode, should swap join type to [JoinType::Right] and [JoinType::RightAnti]
+/// This function will first consider the given join type and check whether the
+/// `CollectLeft` mode is applicable. Otherwise, it will try to swap the join sides.
+/// When the `collect_threshold` is provided, this function will also check left
+/// and right sizes.
+///
+/// For [`JoinType::Full`], it can not use `CollectLeft` mode and will return `None`.
+/// For [`JoinType::Left`] and [`JoinType::LeftAnti`], it can not run `CollectLeft`
+/// mode as is, but it can do so by changing the join type to [`JoinType::Right`]
+/// and [`JoinType::RightAnti`], respectively.
fn try_collect_left(
hash_join: &HashJoinExec,
collect_threshold: Option<usize>,
@@ -375,8 +348,238 @@ fn partitioned_hash_join(hash_join: &HashJoinExec) ->
Result<Arc<dyn ExecutionPl
}
}
+/// This subrule tries to modify a given plan so that it can
+/// optimize hash and cross joins in the plan according to available statistical information.
+fn statistical_join_selection_subrule(
+ plan: Arc<dyn ExecutionPlan>,
+ collect_left_threshold: usize,
+) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
+ let transformed = if let Some(hash_join) =
+ plan.as_any().downcast_ref::<HashJoinExec>()
+ {
+ match hash_join.partition_mode() {
+ PartitionMode::Auto => {
+ try_collect_left(hash_join,
Some(collect_left_threshold))?.map_or_else(
+ || partitioned_hash_join(hash_join).map(Some),
+ |v| Ok(Some(v)),
+ )?
+ }
+ PartitionMode::CollectLeft => try_collect_left(hash_join, None)?
+ .map_or_else(
+ || partitioned_hash_join(hash_join).map(Some),
+ |v| Ok(Some(v)),
+ )?,
+ PartitionMode::Partitioned => {
+ let left = hash_join.left();
+ let right = hash_join.right();
+ if should_swap_join_order(&**left, &**right)
+ && supports_swap(*hash_join.join_type())
+ {
+ swap_hash_join(hash_join,
PartitionMode::Partitioned).map(Some)?
+ } else {
+ None
+ }
+ }
+ }
+ } else if let Some(cross_join) =
plan.as_any().downcast_ref::<CrossJoinExec>() {
+ let left = cross_join.left();
+ let right = cross_join.right();
+ if should_swap_join_order(&**left, &**right) {
+ let new_join = CrossJoinExec::new(Arc::clone(right),
Arc::clone(left));
+ // TODO avoid adding ProjectionExec again and again, only adding Final Projection
+ let proj: Arc<dyn ExecutionPlan> =
Arc::new(ProjectionExec::try_new(
+ swap_reverting_projection(&left.schema(), &right.schema()),
+ Arc::new(new_join),
+ )?);
+ Some(proj)
+ } else {
+ None
+ }
+ } else {
+ None
+ };
+
+ Ok(if let Some(transformed) = transformed {
+ Transformed::Yes(transformed)
+ } else {
+ Transformed::No(plan)
+ })
+}
+
+/// Pipeline-fixing join selection subrule.
+pub type PipelineFixerSubrule = dyn Fn(
+ PipelineStatePropagator,
+ &ConfigOptions,
+) -> Option<Result<PipelineStatePropagator>>;
+
+/// This subrule checks if we can replace a hash join with a symmetric hash
+/// join when we are dealing with infinite inputs on both sides. This change
+/// avoids pipeline breaking and preserves query runnability. If possible,
+/// this subrule makes this replacement; otherwise, it has no effect.
+fn hash_join_convert_symmetric_subrule(
+ mut input: PipelineStatePropagator,
+ config_options: &ConfigOptions,
+) -> Option<Result<PipelineStatePropagator>> {
+ if let Some(hash_join) =
input.plan.as_any().downcast_ref::<HashJoinExec>() {
+ let ub_flags = &input.children_unbounded;
+ let (left_unbounded, right_unbounded) = (ub_flags[0], ub_flags[1]);
+ input.unbounded = left_unbounded || right_unbounded;
+ let result = if left_unbounded && right_unbounded {
+ let mode = if config_options.optimizer.repartition_joins {
+ StreamJoinPartitionMode::Partitioned
+ } else {
+ StreamJoinPartitionMode::SinglePartition
+ };
+ SymmetricHashJoinExec::try_new(
+ hash_join.left().clone(),
+ hash_join.right().clone(),
+ hash_join.on().to_vec(),
+ hash_join.filter().cloned(),
+ hash_join.join_type(),
+ hash_join.null_equals_null(),
+ mode,
+ )
+ .map(|exec| {
+ input.plan = Arc::new(exec) as _;
+ input
+ })
+ } else {
+ Ok(input)
+ };
+ Some(result)
+ } else {
+ None
+ }
+}
+
+/// This subrule will swap build/probe sides of a hash join depending on whether
+/// one of its inputs may produce an infinite stream of records. The rule ensures
+/// that the left (build) side of the hash join always operates on an input stream
+/// that will produce a finite set of records. If the left side can not be chosen
+/// to be "finite", the join sides stay the same as the original query.
+/// ```text
+/// For example, this rule makes the following transformation:
+///
+///
+///
+/// +--------------+ +--------------+
+/// | | unbounded | |
+/// Left | Infinite | true | Hash |\true
+/// | Data source |--------------| Repartition | \
+--------------+ +--------------+
+/// | | | | \ |
| | |
+/// +--------------+ +--------------+ - | Hash Join
|-------| Projection |
+/// - |
| | |
+/// +--------------+ +--------------+ /
+--------------+ +--------------+
+/// | | unbounded | | /
+/// Right | Finite | false | Hash |/false
+/// | Data Source |--------------| Repartition |
+/// | | | |
+/// +--------------+ +--------------+
+///
+///
+///
+/// +--------------+ +--------------+
+/// | | unbounded | |
+/// Left | Finite | false | Hash |\false
+/// | Data source |--------------| Repartition | \
+--------------+ +--------------+
+/// | | | | \ |
| true | | true
+/// +--------------+ +--------------+ - | Hash Join
|-------| Projection |-----
+/// - |
| | |
+/// +--------------+ +--------------+ /
+--------------+ +--------------+
+/// | | unbounded | | /
+/// Right | Infinite | true | Hash |/true
+/// | Data Source |--------------| Repartition |
+/// | | | |
+/// +--------------+ +--------------+
+///
+/// ```
+fn hash_join_swap_subrule(
+ mut input: PipelineStatePropagator,
+ _config_options: &ConfigOptions,
+) -> Option<Result<PipelineStatePropagator>> {
+ if let Some(hash_join) =
input.plan.as_any().downcast_ref::<HashJoinExec>() {
+ let ub_flags = &input.children_unbounded;
+ let (left_unbounded, right_unbounded) = (ub_flags[0], ub_flags[1]);
+ input.unbounded = left_unbounded || right_unbounded;
+ let result = if left_unbounded
+ && !right_unbounded
+ && matches!(
+ *hash_join.join_type(),
+ JoinType::Inner
+ | JoinType::Left
+ | JoinType::LeftSemi
+ | JoinType::LeftAnti
+ ) {
+ swap_join_according_to_unboundedness(hash_join).map(|plan| {
+ input.plan = plan;
+ input
+ })
+ } else {
+ Ok(input)
+ };
+ Some(result)
+ } else {
+ None
+ }
+}
+
+/// This function swaps sides of a hash join to make it runnable even if one of
+/// its inputs are infinite. Note that this is not always possible; i.e.
+/// [`JoinType::Full`], [`JoinType::Right`], [`JoinType::RightAnti`] and
+/// [`JoinType::RightSemi`] can not run with an unbounded left side, even if
+/// we swap join sides. Therefore, we do not consider them here.
+fn swap_join_according_to_unboundedness(
+ hash_join: &HashJoinExec,
+) -> Result<Arc<dyn ExecutionPlan>> {
+ let partition_mode = hash_join.partition_mode();
+ let join_type = hash_join.join_type();
+ match (*partition_mode, *join_type) {
+ (
+ _,
+ JoinType::Right | JoinType::RightSemi | JoinType::RightAnti |
JoinType::Full,
+ ) => Err(DataFusionError::Internal(format!(
+ "{join_type} join cannot be swapped for unbounded input."
+ ))),
+ (PartitionMode::Partitioned, _) => {
+ swap_hash_join(hash_join, PartitionMode::Partitioned)
+ }
+ (PartitionMode::CollectLeft, _) => {
+ swap_hash_join(hash_join, PartitionMode::CollectLeft)
+ }
+ (PartitionMode::Auto, _) => Err(DataFusionError::Internal(
+ "Auto is not acceptable for unbounded input here.".to_string(),
+ )),
+ }
+}
+
+/// Apply given `PipelineFixerSubrule`s to a given plan. This plan, along with
+/// auxiliary boundedness information, is in the `PipelineStatePropagator` object.
+fn apply_subrules(
+ mut input: PipelineStatePropagator,
+ subrules: &Vec<Box<PipelineFixerSubrule>>,
+ config_options: &ConfigOptions,
+) -> Result<Transformed<PipelineStatePropagator>> {
+ for subrule in subrules {
+ if let Some(value) = subrule(input.clone(),
config_options).transpose()? {
+ input = value;
+ }
+ }
+ let is_unbounded = input
+ .plan
+ .unbounded_output(&input.children_unbounded)
+ // Treat the case where an operator can not run on unbounded data as
+ // if it can and it outputs unbounded data. Do not raise an error yet.
+ // Such operators may be fixed, adjusted or replaced later on during
+ // optimization passes -- sorts may be removed, windows may be adjusted
+ // etc. If this doesn't happen, the final `PipelineChecker` rule will
+ // catch this and raise an error anyway.
+ .unwrap_or(true);
+ input.unbounded = is_unbounded;
+ Ok(Transformed::Yes(input))
+}
+
#[cfg(test)]
-mod tests {
+mod tests_statistical {
use crate::{
physical_plan::{
displayable, joins::PartitionMode, ColumnStatistics, Statistics,
@@ -388,7 +591,9 @@ mod tests {
use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema};
- use datafusion_common::ScalarValue;
+ use datafusion_common::{JoinType, ScalarValue};
+ use datafusion_physical_expr::expressions::Column;
+ use datafusion_physical_expr::PhysicalExpr;
fn create_big_and_small() -> (Arc<dyn ExecutionPlan>, Arc<dyn
ExecutionPlan>) {
let big = Arc::new(StatisticsExec::new(
@@ -556,7 +761,6 @@ mod tests {
.expect("A proj is required to swap columns back to their original
order");
assert_eq!(swapping_projection.expr().len(), 2);
- println!("swapping_projection {swapping_projection:?}");
let (col, name) = &swapping_projection.expr()[0];
assert_eq!(name, "small_col");
assert_col_expr(col, "small_col", 1);
@@ -693,7 +897,7 @@ mod tests {
" StatisticsExec: col_count=1, row_count=Some(1000)",
" StatisticsExec: col_count=1, row_count=Some(100000)",
" StatisticsExec: col_count=1, row_count=Some(10000)",
- ""
+ "",
];
assert_optimized!(expected, join);
}
@@ -967,3 +1171,484 @@ mod tests {
}
}
}
+
+#[cfg(test)]
+mod util_tests {
+ use datafusion_expr::Operator;
+ use datafusion_physical_expr::expressions::{BinaryExpr, Column,
NegativeExpr};
+ use datafusion_physical_expr::intervals::check_support;
+ use datafusion_physical_expr::PhysicalExpr;
+ use std::sync::Arc;
+
+ #[test]
+ fn check_expr_supported() {
+ let supported_expr = Arc::new(BinaryExpr::new(
+ Arc::new(Column::new("a", 0)),
+ Operator::Plus,
+ Arc::new(Column::new("a", 0)),
+ )) as Arc<dyn PhysicalExpr>;
+ assert!(check_support(&supported_expr));
+ let supported_expr_2 = Arc::new(Column::new("a", 0)) as Arc<dyn
PhysicalExpr>;
+ assert!(check_support(&supported_expr_2));
+ let unsupported_expr = Arc::new(BinaryExpr::new(
+ Arc::new(Column::new("a", 0)),
+ Operator::Or,
+ Arc::new(Column::new("a", 0)),
+ )) as Arc<dyn PhysicalExpr>;
+ assert!(!check_support(&unsupported_expr));
+ let unsupported_expr_2 = Arc::new(BinaryExpr::new(
+ Arc::new(Column::new("a", 0)),
+ Operator::Or,
+ Arc::new(NegativeExpr::new(Arc::new(Column::new("a", 0)))),
+ )) as Arc<dyn PhysicalExpr>;
+ assert!(!check_support(&unsupported_expr_2));
+ }
+}
+
+#[cfg(test)]
+mod hash_join_tests {
+ use super::*;
+ use crate::physical_optimizer::join_selection::swap_join_type;
+ use crate::physical_optimizer::test_utils::SourceType;
+ use crate::physical_plan::expressions::Column;
+ use crate::physical_plan::joins::PartitionMode;
+ use crate::physical_plan::projection::ProjectionExec;
+ use crate::test_util::UnboundedExec;
+ use arrow::datatypes::{DataType, Field, Schema};
+ use arrow::record_batch::RecordBatch;
+ use datafusion_common::utils::DataPtr;
+ use datafusion_common::JoinType;
+ use std::sync::Arc;
+
+ struct TestCase {
+ case: String,
+ initial_sources_unbounded: (SourceType, SourceType),
+ initial_join_type: JoinType,
+ initial_mode: PartitionMode,
+ expected_sources_unbounded: (SourceType, SourceType),
+ expected_join_type: JoinType,
+ expected_mode: PartitionMode,
+ expecting_swap: bool,
+ }
+
+ #[tokio::test]
+ async fn test_join_with_swap_full() -> Result<()> {
+ // NOTE: Currently, some initial conditions are not viable after join order selection.
+ // For example, full join always comes in partitioned mode. See the warning in
+ // function "swap". If this changes in the future, we should update these tests.
+ let cases = vec![
+ TestCase {
+ case: "Bounded - Unbounded 1".to_string(),
+ initial_sources_unbounded: (SourceType::Bounded,
SourceType::Unbounded),
+ initial_join_type: JoinType::Full,
+ initial_mode: PartitionMode::Partitioned,
+ expected_sources_unbounded: (SourceType::Bounded,
SourceType::Unbounded),
+ expected_join_type: JoinType::Full,
+ expected_mode: PartitionMode::Partitioned,
+ expecting_swap: false,
+ },
+ TestCase {
+ case: "Unbounded - Bounded 2".to_string(),
+ initial_sources_unbounded: (SourceType::Unbounded,
SourceType::Bounded),
+ initial_join_type: JoinType::Full,
+ initial_mode: PartitionMode::Partitioned,
+ expected_sources_unbounded: (SourceType::Unbounded,
SourceType::Bounded),
+ expected_join_type: JoinType::Full,
+ expected_mode: PartitionMode::Partitioned,
+ expecting_swap: false,
+ },
+ TestCase {
+ case: "Bounded - Bounded 3".to_string(),
+ initial_sources_unbounded: (SourceType::Bounded,
SourceType::Bounded),
+ initial_join_type: JoinType::Full,
+ initial_mode: PartitionMode::Partitioned,
+ expected_sources_unbounded: (SourceType::Bounded,
SourceType::Bounded),
+ expected_join_type: JoinType::Full,
+ expected_mode: PartitionMode::Partitioned,
+ expecting_swap: false,
+ },
+ TestCase {
+ case: "Unbounded - Unbounded 4".to_string(),
+ initial_sources_unbounded: (SourceType::Unbounded,
SourceType::Unbounded),
+ initial_join_type: JoinType::Full,
+ initial_mode: PartitionMode::Partitioned,
+ expected_sources_unbounded: (
+ SourceType::Unbounded,
+ SourceType::Unbounded,
+ ),
+ expected_join_type: JoinType::Full,
+ expected_mode: PartitionMode::Partitioned,
+ expecting_swap: false,
+ },
+ ];
+ for case in cases.into_iter() {
+ test_join_with_maybe_swap_unbounded_case(case).await?
+ }
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_cases_without_collect_left_check() -> Result<()> {
+ let mut cases = vec![];
+ let join_types = vec![JoinType::LeftSemi, JoinType::Inner];
+ for join_type in join_types {
+ cases.push(TestCase {
+ case: "Unbounded - Bounded / CollectLeft".to_string(),
+ initial_sources_unbounded: (SourceType::Unbounded,
SourceType::Bounded),
+ initial_join_type: join_type,
+ initial_mode: PartitionMode::CollectLeft,
+ expected_sources_unbounded: (SourceType::Bounded,
SourceType::Unbounded),
+ expected_join_type: swap_join_type(join_type),
+ expected_mode: PartitionMode::CollectLeft,
+ expecting_swap: true,
+ });
+ cases.push(TestCase {
+ case: "Bounded - Unbounded / CollectLeft".to_string(),
+ initial_sources_unbounded: (SourceType::Bounded,
SourceType::Unbounded),
+ initial_join_type: join_type,
+ initial_mode: PartitionMode::CollectLeft,
+ expected_sources_unbounded: (SourceType::Bounded,
SourceType::Unbounded),
+ expected_join_type: join_type,
+ expected_mode: PartitionMode::CollectLeft,
+ expecting_swap: false,
+ });
+ cases.push(TestCase {
+ case: "Unbounded - Unbounded / CollectLeft".to_string(),
+ initial_sources_unbounded: (SourceType::Unbounded,
SourceType::Unbounded),
+ initial_join_type: join_type,
+ initial_mode: PartitionMode::CollectLeft,
+ expected_sources_unbounded: (
+ SourceType::Unbounded,
+ SourceType::Unbounded,
+ ),
+ expected_join_type: join_type,
+ expected_mode: PartitionMode::CollectLeft,
+ expecting_swap: false,
+ });
+ cases.push(TestCase {
+ case: "Bounded - Bounded / CollectLeft".to_string(),
+ initial_sources_unbounded: (SourceType::Bounded,
SourceType::Bounded),
+ initial_join_type: join_type,
+ initial_mode: PartitionMode::CollectLeft,
+ expected_sources_unbounded: (SourceType::Bounded,
SourceType::Bounded),
+ expected_join_type: join_type,
+ expected_mode: PartitionMode::CollectLeft,
+ expecting_swap: false,
+ });
+ cases.push(TestCase {
+ case: "Unbounded - Bounded / Partitioned".to_string(),
+ initial_sources_unbounded: (SourceType::Unbounded,
SourceType::Bounded),
+ initial_join_type: join_type,
+ initial_mode: PartitionMode::Partitioned,
+ expected_sources_unbounded: (SourceType::Bounded,
SourceType::Unbounded),
+ expected_join_type: swap_join_type(join_type),
+ expected_mode: PartitionMode::Partitioned,
+ expecting_swap: true,
+ });
+ cases.push(TestCase {
+ case: "Bounded - Unbounded / Partitioned".to_string(),
+ initial_sources_unbounded: (SourceType::Bounded,
SourceType::Unbounded),
+ initial_join_type: join_type,
+ initial_mode: PartitionMode::Partitioned,
+ expected_sources_unbounded: (SourceType::Bounded,
SourceType::Unbounded),
+ expected_join_type: join_type,
+ expected_mode: PartitionMode::Partitioned,
+ expecting_swap: false,
+ });
+ cases.push(TestCase {
+ case: "Bounded - Bounded / Partitioned".to_string(),
+ initial_sources_unbounded: (SourceType::Bounded,
SourceType::Bounded),
+ initial_join_type: join_type,
+ initial_mode: PartitionMode::Partitioned,
+ expected_sources_unbounded: (SourceType::Bounded,
SourceType::Bounded),
+ expected_join_type: join_type,
+ expected_mode: PartitionMode::Partitioned,
+ expecting_swap: false,
+ });
+ cases.push(TestCase {
+ case: "Unbounded - Unbounded / Partitioned".to_string(),
+ initial_sources_unbounded: (SourceType::Unbounded,
SourceType::Unbounded),
+ initial_join_type: join_type,
+ initial_mode: PartitionMode::Partitioned,
+ expected_sources_unbounded: (
+ SourceType::Unbounded,
+ SourceType::Unbounded,
+ ),
+ expected_join_type: join_type,
+ expected_mode: PartitionMode::Partitioned,
+ expecting_swap: false,
+ });
+ }
+
+ for case in cases.into_iter() {
+ test_join_with_maybe_swap_unbounded_case(case).await?
+ }
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_not_support_collect_left() -> Result<()> {
+ let mut cases = vec![];
+ // After [JoinSelection] optimization, these join types cannot run in CollectLeft mode except
+ // [JoinType::LeftSemi]
+ let the_ones_not_support_collect_left = vec![JoinType::Left,
JoinType::LeftAnti];
+ for join_type in the_ones_not_support_collect_left {
+ cases.push(TestCase {
+ case: "Unbounded - Bounded".to_string(),
+ initial_sources_unbounded: (SourceType::Unbounded,
SourceType::Bounded),
+ initial_join_type: join_type,
+ initial_mode: PartitionMode::Partitioned,
+ expected_sources_unbounded: (SourceType::Bounded,
SourceType::Unbounded),
+ expected_join_type: swap_join_type(join_type),
+ expected_mode: PartitionMode::Partitioned,
+ expecting_swap: true,
+ });
+ cases.push(TestCase {
+ case: "Bounded - Unbounded".to_string(),
+ initial_sources_unbounded: (SourceType::Bounded,
SourceType::Unbounded),
+ initial_join_type: join_type,
+ initial_mode: PartitionMode::Partitioned,
+ expected_sources_unbounded: (SourceType::Bounded,
SourceType::Unbounded),
+ expected_join_type: join_type,
+ expected_mode: PartitionMode::Partitioned,
+ expecting_swap: false,
+ });
+ cases.push(TestCase {
+ case: "Bounded - Bounded".to_string(),
+ initial_sources_unbounded: (SourceType::Bounded,
SourceType::Bounded),
+ initial_join_type: join_type,
+ initial_mode: PartitionMode::Partitioned,
+ expected_sources_unbounded: (SourceType::Bounded,
SourceType::Bounded),
+ expected_join_type: join_type,
+ expected_mode: PartitionMode::Partitioned,
+ expecting_swap: false,
+ });
+ cases.push(TestCase {
+ case: "Unbounded - Unbounded".to_string(),
+ initial_sources_unbounded: (SourceType::Unbounded,
SourceType::Unbounded),
+ initial_join_type: join_type,
+ initial_mode: PartitionMode::Partitioned,
+ expected_sources_unbounded: (
+ SourceType::Unbounded,
+ SourceType::Unbounded,
+ ),
+ expected_join_type: join_type,
+ expected_mode: PartitionMode::Partitioned,
+ expecting_swap: false,
+ });
+ }
+
+ for case in cases.into_iter() {
+ test_join_with_maybe_swap_unbounded_case(case).await?
+ }
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_not_supporting_swaps_possible_collect_left() -> Result<()> {
+ let mut cases = vec![];
+ let the_ones_not_support_collect_left =
+ vec![JoinType::Right, JoinType::RightAnti, JoinType::RightSemi];
+ for join_type in the_ones_not_support_collect_left {
+ // We expect that (SourceType::Unbounded, SourceType::Bounded) will change, regardless of the
+ // statistics.
+ cases.push(TestCase {
+ case: "Unbounded - Bounded / CollectLeft".to_string(),
+ initial_sources_unbounded: (SourceType::Unbounded,
SourceType::Bounded),
+ initial_join_type: join_type,
+ initial_mode: PartitionMode::CollectLeft,
+ expected_sources_unbounded: (SourceType::Unbounded,
SourceType::Bounded),
+ expected_join_type: join_type,
+ expected_mode: PartitionMode::CollectLeft,
+ expecting_swap: false,
+ });
+ // We expect that (SourceType::Bounded, SourceType::Unbounded)
will stay the same, regardless of the
+ // statistics.
+ cases.push(TestCase {
+ case: "Bounded - Unbounded / CollectLeft".to_string(),
+ initial_sources_unbounded: (SourceType::Bounded,
SourceType::Unbounded),
+ initial_join_type: join_type,
+ initial_mode: PartitionMode::CollectLeft,
+ expected_sources_unbounded: (SourceType::Bounded,
SourceType::Unbounded),
+ expected_join_type: join_type,
+ expected_mode: PartitionMode::CollectLeft,
+ expecting_swap: false,
+ });
+ cases.push(TestCase {
+ case: "Unbounded - Unbounded / CollectLeft".to_string(),
+ initial_sources_unbounded: (SourceType::Unbounded,
SourceType::Unbounded),
+ initial_join_type: join_type,
+ initial_mode: PartitionMode::CollectLeft,
+ expected_sources_unbounded: (
+ SourceType::Unbounded,
+ SourceType::Unbounded,
+ ),
+ expected_join_type: join_type,
+ expected_mode: PartitionMode::CollectLeft,
+ expecting_swap: false,
+ });
+ //
+ cases.push(TestCase {
+ case: "Bounded - Bounded / CollectLeft".to_string(),
+ initial_sources_unbounded: (SourceType::Bounded,
SourceType::Bounded),
+ initial_join_type: join_type,
+ initial_mode: PartitionMode::CollectLeft,
+ expected_sources_unbounded: (SourceType::Bounded,
SourceType::Bounded),
+ expected_join_type: join_type,
+ expected_mode: PartitionMode::CollectLeft,
+ expecting_swap: false,
+ });
+ // If cases are partitioned, only unbounded & bounded check will
affect the order.
+ cases.push(TestCase {
+ case: "Unbounded - Bounded / Partitioned".to_string(),
+ initial_sources_unbounded: (SourceType::Unbounded,
SourceType::Bounded),
+ initial_join_type: join_type,
+ initial_mode: PartitionMode::Partitioned,
+ expected_sources_unbounded: (SourceType::Unbounded,
SourceType::Bounded),
+ expected_join_type: join_type,
+ expected_mode: PartitionMode::Partitioned,
+ expecting_swap: false,
+ });
+ cases.push(TestCase {
+ case: "Bounded - Unbounded / Partitioned".to_string(),
+ initial_sources_unbounded: (SourceType::Bounded,
SourceType::Unbounded),
+ initial_join_type: join_type,
+ initial_mode: PartitionMode::Partitioned,
+ expected_sources_unbounded: (SourceType::Bounded,
SourceType::Unbounded),
+ expected_join_type: join_type,
+ expected_mode: PartitionMode::Partitioned,
+ expecting_swap: false,
+ });
+ cases.push(TestCase {
+ case: "Bounded - Bounded / Partitioned".to_string(),
+ initial_sources_unbounded: (SourceType::Bounded,
SourceType::Bounded),
+ initial_join_type: join_type,
+ initial_mode: PartitionMode::Partitioned,
+ expected_sources_unbounded: (SourceType::Bounded,
SourceType::Bounded),
+ expected_join_type: join_type,
+ expected_mode: PartitionMode::Partitioned,
+ expecting_swap: false,
+ });
+ cases.push(TestCase {
+ case: "Unbounded - Unbounded / Partitioned".to_string(),
+ initial_sources_unbounded: (SourceType::Unbounded,
SourceType::Unbounded),
+ initial_join_type: join_type,
+ initial_mode: PartitionMode::Partitioned,
+ expected_sources_unbounded: (
+ SourceType::Unbounded,
+ SourceType::Unbounded,
+ ),
+ expected_join_type: join_type,
+ expected_mode: PartitionMode::Partitioned,
+ expecting_swap: false,
+ });
+ }
+
+ for case in cases.into_iter() {
+ test_join_with_maybe_swap_unbounded_case(case).await?
+ }
+ Ok(())
+ }
+
+ async fn test_join_with_maybe_swap_unbounded_case(t: TestCase) ->
Result<()> {
+ let left_unbounded = t.initial_sources_unbounded.0 ==
SourceType::Unbounded;
+ let right_unbounded = t.initial_sources_unbounded.1 ==
SourceType::Unbounded;
+ let left_exec = Arc::new(UnboundedExec::new(
+ (!left_unbounded).then_some(1),
+ RecordBatch::new_empty(Arc::new(Schema::new(vec![Field::new(
+ "a",
+ DataType::Int32,
+ false,
+ )]))),
+ 2,
+ )) as Arc<dyn ExecutionPlan>;
+ let right_exec = Arc::new(UnboundedExec::new(
+ (!right_unbounded).then_some(1),
+ RecordBatch::new_empty(Arc::new(Schema::new(vec![Field::new(
+ "b",
+ DataType::Int32,
+ false,
+ )]))),
+ 2,
+ )) as Arc<dyn ExecutionPlan>;
+
+ let join = HashJoinExec::try_new(
+ Arc::clone(&left_exec),
+ Arc::clone(&right_exec),
+ vec![(
+ Column::new_with_schema("a", &left_exec.schema())?,
+ Column::new_with_schema("b", &right_exec.schema())?,
+ )],
+ None,
+ &t.initial_join_type,
+ t.initial_mode,
+ false,
+ )?;
+
+ let initial_hash_join_state = PipelineStatePropagator {
+ plan: Arc::new(join),
+ unbounded: false,
+ children_unbounded: vec![left_unbounded, right_unbounded],
+ };
+
+ let optimized_hash_join =
+ hash_join_swap_subrule(initial_hash_join_state,
&ConfigOptions::new())
+ .unwrap()?;
+ let optimized_join_plan = optimized_hash_join.plan;
+
+ // If swap did happen
+ let projection_added =
optimized_join_plan.as_any().is::<ProjectionExec>();
+ let plan = if projection_added {
+ let proj = optimized_join_plan
+ .as_any()
+ .downcast_ref::<ProjectionExec>()
+ .expect(
+ "A proj is required to swap columns back to their original
order",
+ );
+ proj.input().clone()
+ } else {
+ optimized_join_plan
+ };
+
+ if let Some(HashJoinExec {
+ left,
+ right,
+ join_type,
+ mode,
+ ..
+ }) = plan.as_any().downcast_ref::<HashJoinExec>()
+ {
+ let left_changed = Arc::data_ptr_eq(left, &right_exec);
+ let right_changed = Arc::data_ptr_eq(right, &left_exec);
+ // If this is not equal, we have a bigger problem.
+ assert_eq!(left_changed, right_changed);
+ assert_eq!(
+ (
+ t.case.as_str(),
+ if left.unbounded_output(&[])? {
+ SourceType::Unbounded
+ } else {
+ SourceType::Bounded
+ },
+ if right.unbounded_output(&[])? {
+ SourceType::Unbounded
+ } else {
+ SourceType::Bounded
+ },
+ join_type,
+ mode,
+ left_changed && right_changed
+ ),
+ (
+ t.case.as_str(),
+ t.expected_sources_unbounded.0,
+ t.expected_sources_unbounded.1,
+ &t.expected_join_type,
+ &t.expected_mode,
+ t.expecting_swap
+ )
+ );
+ };
+ Ok(())
+ }
+}
diff --git a/datafusion/core/src/physical_optimizer/mod.rs
b/datafusion/core/src/physical_optimizer/mod.rs
index 8ee95ea663..e1a5089879 100644
--- a/datafusion/core/src/physical_optimizer/mod.rs
+++ b/datafusion/core/src/physical_optimizer/mod.rs
@@ -35,7 +35,6 @@ pub mod sort_enforcement;
mod sort_pushdown;
mod utils;
-pub mod pipeline_fixer;
#[cfg(test)]
pub mod test_utils;
diff --git a/datafusion/core/src/physical_optimizer/optimizer.rs
b/datafusion/core/src/physical_optimizer/optimizer.rs
index d35c82abd2..3f6698c6cf 100644
--- a/datafusion/core/src/physical_optimizer/optimizer.rs
+++ b/datafusion/core/src/physical_optimizer/optimizer.rs
@@ -26,7 +26,6 @@ use
crate::physical_optimizer::combine_partial_final_agg::CombinePartialFinalAgg
use crate::physical_optimizer::dist_enforcement::EnforceDistribution;
use crate::physical_optimizer::join_selection::JoinSelection;
use crate::physical_optimizer::pipeline_checker::PipelineChecker;
-use crate::physical_optimizer::pipeline_fixer::PipelineFixer;
use crate::physical_optimizer::repartition::Repartition;
use crate::physical_optimizer::sort_enforcement::EnforceSorting;
use crate::{error::Result, physical_plan::ExecutionPlan};
@@ -76,12 +75,6 @@ impl PhysicalOptimizer {
// repartitioning and local sorting steps to meet distribution and
ordering requirements.
// Therefore, it should run before EnforceDistribution and
EnforceSorting.
Arc::new(JoinSelection::new()),
- // If the query is processing infinite inputs, the PipelineFixer
rule applies the
- // necessary transformations to make the query runnable (if it is
not already runnable).
- // If the query can not be made runnable, the rule emits an error
with a diagnostic message.
- // Since the transformations it applies may alter output
partitioning properties of operators
- // (e.g. by swapping hash join sides), this rule runs before
EnforceDistribution.
- Arc::new(PipelineFixer::new()),
// In order to increase the parallelism, the Repartition rule will
change the
// output partitioning of some operators in the plan tree, which
will influence
// other rules. Therefore, it should run as soon as possible. It
is optional because:
diff --git a/datafusion/core/src/physical_optimizer/pipeline_fixer.rs
b/datafusion/core/src/physical_optimizer/pipeline_fixer.rs
deleted file mode 100644
index 7db3e99c39..0000000000
--- a/datafusion/core/src/physical_optimizer/pipeline_fixer.rs
+++ /dev/null
@@ -1,716 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! The [PipelineFixer] rule tries to modify a given plan so that it can
-//! accommodate its infinite sources, if there are any. In other words,
-//! it tries to obtain a runnable query (with the given infinite sources)
-//! from an non-runnable query by transforming pipeline-breaking operations
-//! to pipeline-friendly ones. If this can not be done, the rule emits a
-//! diagnostic error message.
-//!
-use crate::config::ConfigOptions;
-use crate::error::Result;
-use crate::physical_optimizer::join_selection::swap_hash_join;
-use crate::physical_optimizer::pipeline_checker::PipelineStatePropagator;
-use crate::physical_optimizer::PhysicalOptimizerRule;
-use crate::physical_plan::joins::{
- HashJoinExec, PartitionMode, StreamJoinPartitionMode,
SymmetricHashJoinExec,
-};
-use crate::physical_plan::ExecutionPlan;
-use datafusion_common::tree_node::{Transformed, TreeNode};
-use datafusion_common::DataFusionError;
-use datafusion_expr::logical_plan::JoinType;
-
-use std::sync::Arc;
-
-/// The [`PipelineFixer`] rule tries to modify a given plan so that it can
-/// accommodate its infinite sources, if there are any. If this is not
-/// possible, the rule emits a diagnostic error message.
-#[derive(Default)]
-pub struct PipelineFixer {}
-
-impl PipelineFixer {
- #[allow(missing_docs)]
- pub fn new() -> Self {
- Self {}
- }
-}
-/// [`PipelineFixer`] subrules are functions of this type. Such functions take
a
-/// single [`PipelineStatePropagator`] argument, which stores state variables
-/// indicating the unboundedness status of the current [`ExecutionPlan`] as
-/// the `PipelineFixer` rule traverses the entire plan tree.
-type PipelineFixerSubrule =
- dyn Fn(PipelineStatePropagator) -> Option<Result<PipelineStatePropagator>>;
-
-impl PhysicalOptimizerRule for PipelineFixer {
- fn optimize(
- &self,
- plan: Arc<dyn ExecutionPlan>,
- _config: &ConfigOptions,
- ) -> Result<Arc<dyn ExecutionPlan>> {
- let pipeline = PipelineStatePropagator::new(plan);
- let subrules: Vec<Box<PipelineFixerSubrule>> = vec![
- Box::new(hash_join_convert_symmetric_subrule),
- Box::new(hash_join_swap_subrule),
- ];
- let state = pipeline.transform_up(&|p| apply_subrules(p, &subrules))?;
- Ok(state.plan)
- }
-
- fn name(&self) -> &str {
- "PipelineFixer"
- }
-
- fn schema_check(&self) -> bool {
- true
- }
-}
-
-/// This subrule checks if one can replace a hash join with a symmetric hash
-/// join so that the pipeline does not break due to the join operation in
-/// question. If possible, it makes this replacement; otherwise, it has no
-/// effect.
-fn hash_join_convert_symmetric_subrule(
- mut input: PipelineStatePropagator,
-) -> Option<Result<PipelineStatePropagator>> {
- if let Some(hash_join) =
input.plan.as_any().downcast_ref::<HashJoinExec>() {
- let ub_flags = &input.children_unbounded;
- let (left_unbounded, right_unbounded) = (ub_flags[0], ub_flags[1]);
- input.unbounded = left_unbounded || right_unbounded;
- let result = if left_unbounded && right_unbounded {
- SymmetricHashJoinExec::try_new(
- hash_join.left().clone(),
- hash_join.right().clone(),
- hash_join
- .on()
- .iter()
- .map(|(l, r)| (l.clone(), r.clone()))
- .collect(),
- hash_join.filter().cloned(),
- hash_join.join_type(),
- hash_join.null_equals_null(),
- StreamJoinPartitionMode::Partitioned,
- )
- .map(|exec| {
- input.plan = Arc::new(exec) as _;
- input
- })
- } else {
- Ok(input)
- };
- Some(result)
- } else {
- None
- }
-}
-
-/// This subrule will swap build/probe sides of a hash join depending on
whether its inputs
-/// may produce an infinite stream of records. The rule ensures that the left
(build) side
-/// of the hash join always operates on an input stream that will produce a
finite set of.
-/// records If the left side can not be chosen to be "finite", the order stays
the
-/// same as the original query.
-/// ```text
-/// For example, this rule makes the following transformation:
-///
-///
-///
-/// +--------------+ +--------------+
-/// | | unbounded | |
-/// Left | Infinite | true | Hash |\true
-/// | Data source |--------------| Repartition | \
+--------------+ +--------------+
-/// | | | | \ |
| | |
-/// +--------------+ +--------------+ - | Hash Join
|-------| Projection |
-/// - |
| | |
-/// +--------------+ +--------------+ /
+--------------+ +--------------+
-/// | | unbounded | | /
-/// Right | Finite | false | Hash |/false
-/// | Data Source |--------------| Repartition |
-/// | | | |
-/// +--------------+ +--------------+
-///
-///
-///
-/// +--------------+ +--------------+
-/// | | unbounded | |
-/// Left | Finite | false | Hash |\false
-/// | Data source |--------------| Repartition | \
+--------------+ +--------------+
-/// | | | | \ |
| true | | true
-/// +--------------+ +--------------+ - | Hash Join
|-------| Projection |-----
-/// - |
| | |
-/// +--------------+ +--------------+ /
+--------------+ +--------------+
-/// | | unbounded | | /
-/// Right | Infinite | true | Hash |/true
-/// | Data Source |--------------| Repartition |
-/// | | | |
-/// +--------------+ +--------------+
-///
-/// ```
-fn hash_join_swap_subrule(
- mut input: PipelineStatePropagator,
-) -> Option<Result<PipelineStatePropagator>> {
- if let Some(hash_join) =
input.plan.as_any().downcast_ref::<HashJoinExec>() {
- let ub_flags = &input.children_unbounded;
- let (left_unbounded, right_unbounded) = (ub_flags[0], ub_flags[1]);
- input.unbounded = left_unbounded || right_unbounded;
- let result = if left_unbounded
- && !right_unbounded
- && matches!(
- *hash_join.join_type(),
- JoinType::Inner
- | JoinType::Left
- | JoinType::LeftSemi
- | JoinType::LeftAnti
- ) {
- swap(hash_join).map(|plan| {
- input.plan = plan;
- input
- })
- } else {
- Ok(input)
- };
- Some(result)
- } else {
- None
- }
-}
-
-/// This function swaps sides of a hash join to make it runnable even if one
of its
-/// inputs are infinite. Note that this is not always possible; i.e.
[JoinType::Full],
-/// [JoinType::Right], [JoinType::RightAnti] and [JoinType::RightSemi] can not
run with
-/// an unbounded left side, even if we swap. Therefore, we do not consider
them here.
-fn swap(hash_join: &HashJoinExec) -> Result<Arc<dyn ExecutionPlan>> {
- let partition_mode = hash_join.partition_mode();
- let join_type = hash_join.join_type();
- match (*partition_mode, *join_type) {
- (
- _,
- JoinType::Right | JoinType::RightSemi | JoinType::RightAnti |
JoinType::Full,
- ) => Err(DataFusionError::Internal(format!(
- "{join_type} join cannot be swapped for unbounded input."
- ))),
- (PartitionMode::Partitioned, _) => {
- swap_hash_join(hash_join, PartitionMode::Partitioned)
- }
- (PartitionMode::CollectLeft, _) => {
- swap_hash_join(hash_join, PartitionMode::CollectLeft)
- }
- (PartitionMode::Auto, _) => Err(DataFusionError::Internal(
- "Auto is not acceptable for unbounded input here.".to_string(),
- )),
- }
-}
-
-fn apply_subrules(
- mut input: PipelineStatePropagator,
- subrules: &Vec<Box<PipelineFixerSubrule>>,
-) -> Result<Transformed<PipelineStatePropagator>> {
- for subrule in subrules {
- if let Some(value) = subrule(input.clone()).transpose()? {
- input = value;
- }
- }
- let is_unbounded = input
- .plan
- .unbounded_output(&input.children_unbounded)
- // Treat the case where an operator can not run on unbounded data as
- // if it can and it outputs unbounded data. Do not raise an error yet.
- // Such operators may be fixed, adjusted or replaced later on during
- // optimization passes -- sorts may be removed, windows may be adjusted
- // etc. If this doesn't happen, the final `PipelineChecker` rule will
- // catch this and raise an error anyway.
- .unwrap_or(true);
- input.unbounded = is_unbounded;
- Ok(Transformed::Yes(input))
-}
-
-#[cfg(test)]
-mod util_tests {
- use datafusion_expr::Operator;
- use datafusion_physical_expr::expressions::{BinaryExpr, Column,
NegativeExpr};
- use datafusion_physical_expr::intervals::check_support;
- use datafusion_physical_expr::PhysicalExpr;
- use std::sync::Arc;
-
- #[test]
- fn check_expr_supported() {
- let supported_expr = Arc::new(BinaryExpr::new(
- Arc::new(Column::new("a", 0)),
- Operator::Plus,
- Arc::new(Column::new("a", 0)),
- )) as Arc<dyn PhysicalExpr>;
- assert!(check_support(&supported_expr));
- let supported_expr_2 = Arc::new(Column::new("a", 0)) as Arc<dyn
PhysicalExpr>;
- assert!(check_support(&supported_expr_2));
- let unsupported_expr = Arc::new(BinaryExpr::new(
- Arc::new(Column::new("a", 0)),
- Operator::Or,
- Arc::new(Column::new("a", 0)),
- )) as Arc<dyn PhysicalExpr>;
- assert!(!check_support(&unsupported_expr));
- let unsupported_expr_2 = Arc::new(BinaryExpr::new(
- Arc::new(Column::new("a", 0)),
- Operator::Or,
- Arc::new(NegativeExpr::new(Arc::new(Column::new("a", 0)))),
- )) as Arc<dyn PhysicalExpr>;
- assert!(!check_support(&unsupported_expr_2));
- }
-}
-
-#[cfg(test)]
-mod hash_join_tests {
- use super::*;
- use crate::physical_optimizer::join_selection::swap_join_type;
- use crate::physical_optimizer::test_utils::SourceType;
- use crate::physical_plan::expressions::Column;
- use crate::physical_plan::joins::PartitionMode;
- use crate::physical_plan::projection::ProjectionExec;
- use crate::test_util::UnboundedExec;
- use arrow::datatypes::{DataType, Field, Schema};
- use arrow::record_batch::RecordBatch;
- use datafusion_common::utils::DataPtr;
- use std::sync::Arc;
-
- struct TestCase {
- case: String,
- initial_sources_unbounded: (SourceType, SourceType),
- initial_join_type: JoinType,
- initial_mode: PartitionMode,
- expected_sources_unbounded: (SourceType, SourceType),
- expected_join_type: JoinType,
- expected_mode: PartitionMode,
- expecting_swap: bool,
- }
-
- #[tokio::test]
- async fn test_join_with_swap_full() -> Result<()> {
- // NOTE: Currently, some initial conditions are not viable after join
order selection.
- // For example, full join always comes in partitioned mode. See
the warning in
- // function "swap". If this changes in the future, we should
update these tests.
- let cases = vec![
- TestCase {
- case: "Bounded - Unbounded 1".to_string(),
- initial_sources_unbounded: (SourceType::Bounded,
SourceType::Unbounded),
- initial_join_type: JoinType::Full,
- initial_mode: PartitionMode::Partitioned,
- expected_sources_unbounded: (SourceType::Bounded,
SourceType::Unbounded),
- expected_join_type: JoinType::Full,
- expected_mode: PartitionMode::Partitioned,
- expecting_swap: false,
- },
- TestCase {
- case: "Unbounded - Bounded 2".to_string(),
- initial_sources_unbounded: (SourceType::Unbounded,
SourceType::Bounded),
- initial_join_type: JoinType::Full,
- initial_mode: PartitionMode::Partitioned,
- expected_sources_unbounded: (SourceType::Unbounded,
SourceType::Bounded),
- expected_join_type: JoinType::Full,
- expected_mode: PartitionMode::Partitioned,
- expecting_swap: false,
- },
- TestCase {
- case: "Bounded - Bounded 3".to_string(),
- initial_sources_unbounded: (SourceType::Bounded,
SourceType::Bounded),
- initial_join_type: JoinType::Full,
- initial_mode: PartitionMode::Partitioned,
- expected_sources_unbounded: (SourceType::Bounded,
SourceType::Bounded),
- expected_join_type: JoinType::Full,
- expected_mode: PartitionMode::Partitioned,
- expecting_swap: false,
- },
- TestCase {
- case: "Unbounded - Unbounded 4".to_string(),
- initial_sources_unbounded: (SourceType::Unbounded,
SourceType::Unbounded),
- initial_join_type: JoinType::Full,
- initial_mode: PartitionMode::Partitioned,
- expected_sources_unbounded: (
- SourceType::Unbounded,
- SourceType::Unbounded,
- ),
- expected_join_type: JoinType::Full,
- expected_mode: PartitionMode::Partitioned,
- expecting_swap: false,
- },
- ];
- for case in cases.into_iter() {
- test_join_with_maybe_swap_unbounded_case(case).await?
- }
- Ok(())
- }
-
- #[tokio::test]
- async fn test_cases_without_collect_left_check() -> Result<()> {
- let mut cases = vec![];
- let join_types = vec![JoinType::LeftSemi, JoinType::Inner];
- for join_type in join_types {
- cases.push(TestCase {
- case: "Unbounded - Bounded / CollectLeft".to_string(),
- initial_sources_unbounded: (SourceType::Unbounded,
SourceType::Bounded),
- initial_join_type: join_type,
- initial_mode: PartitionMode::CollectLeft,
- expected_sources_unbounded: (SourceType::Bounded,
SourceType::Unbounded),
- expected_join_type: swap_join_type(join_type),
- expected_mode: PartitionMode::CollectLeft,
- expecting_swap: true,
- });
- cases.push(TestCase {
- case: "Bounded - Unbounded / CollectLeft".to_string(),
- initial_sources_unbounded: (SourceType::Bounded,
SourceType::Unbounded),
- initial_join_type: join_type,
- initial_mode: PartitionMode::CollectLeft,
- expected_sources_unbounded: (SourceType::Bounded,
SourceType::Unbounded),
- expected_join_type: join_type,
- expected_mode: PartitionMode::CollectLeft,
- expecting_swap: false,
- });
- cases.push(TestCase {
- case: "Unbounded - Unbounded / CollectLeft".to_string(),
- initial_sources_unbounded: (SourceType::Unbounded,
SourceType::Unbounded),
- initial_join_type: join_type,
- initial_mode: PartitionMode::CollectLeft,
- expected_sources_unbounded: (
- SourceType::Unbounded,
- SourceType::Unbounded,
- ),
- expected_join_type: join_type,
- expected_mode: PartitionMode::CollectLeft,
- expecting_swap: false,
- });
- cases.push(TestCase {
- case: "Bounded - Bounded / CollectLeft".to_string(),
- initial_sources_unbounded: (SourceType::Bounded,
SourceType::Bounded),
- initial_join_type: join_type,
- initial_mode: PartitionMode::CollectLeft,
- expected_sources_unbounded: (SourceType::Bounded,
SourceType::Bounded),
- expected_join_type: join_type,
- expected_mode: PartitionMode::CollectLeft,
- expecting_swap: false,
- });
- cases.push(TestCase {
- case: "Unbounded - Bounded / Partitioned".to_string(),
- initial_sources_unbounded: (SourceType::Unbounded,
SourceType::Bounded),
- initial_join_type: join_type,
- initial_mode: PartitionMode::Partitioned,
- expected_sources_unbounded: (SourceType::Bounded,
SourceType::Unbounded),
- expected_join_type: swap_join_type(join_type),
- expected_mode: PartitionMode::Partitioned,
- expecting_swap: true,
- });
- cases.push(TestCase {
- case: "Bounded - Unbounded / Partitioned".to_string(),
- initial_sources_unbounded: (SourceType::Bounded,
SourceType::Unbounded),
- initial_join_type: join_type,
- initial_mode: PartitionMode::Partitioned,
- expected_sources_unbounded: (SourceType::Bounded,
SourceType::Unbounded),
- expected_join_type: join_type,
- expected_mode: PartitionMode::Partitioned,
- expecting_swap: false,
- });
- cases.push(TestCase {
- case: "Bounded - Bounded / Partitioned".to_string(),
- initial_sources_unbounded: (SourceType::Bounded,
SourceType::Bounded),
- initial_join_type: join_type,
- initial_mode: PartitionMode::Partitioned,
- expected_sources_unbounded: (SourceType::Bounded,
SourceType::Bounded),
- expected_join_type: join_type,
- expected_mode: PartitionMode::Partitioned,
- expecting_swap: false,
- });
- cases.push(TestCase {
- case: "Unbounded - Unbounded / Partitioned".to_string(),
- initial_sources_unbounded: (SourceType::Unbounded,
SourceType::Unbounded),
- initial_join_type: join_type,
- initial_mode: PartitionMode::Partitioned,
- expected_sources_unbounded: (
- SourceType::Unbounded,
- SourceType::Unbounded,
- ),
- expected_join_type: join_type,
- expected_mode: PartitionMode::Partitioned,
- expecting_swap: false,
- });
- }
-
- for case in cases.into_iter() {
- test_join_with_maybe_swap_unbounded_case(case).await?
- }
- Ok(())
- }
-
- #[tokio::test]
- async fn test_not_support_collect_left() -> Result<()> {
- let mut cases = vec![];
- // After [JoinSelection] optimization, these join types cannot run in
CollectLeft mode except
- // [JoinType::LeftSemi]
- let the_ones_not_support_collect_left = vec![JoinType::Left,
JoinType::LeftAnti];
- for join_type in the_ones_not_support_collect_left {
- cases.push(TestCase {
- case: "Unbounded - Bounded".to_string(),
- initial_sources_unbounded: (SourceType::Unbounded,
SourceType::Bounded),
- initial_join_type: join_type,
- initial_mode: PartitionMode::Partitioned,
- expected_sources_unbounded: (SourceType::Bounded,
SourceType::Unbounded),
- expected_join_type: swap_join_type(join_type),
- expected_mode: PartitionMode::Partitioned,
- expecting_swap: true,
- });
- cases.push(TestCase {
- case: "Bounded - Unbounded".to_string(),
- initial_sources_unbounded: (SourceType::Bounded,
SourceType::Unbounded),
- initial_join_type: join_type,
- initial_mode: PartitionMode::Partitioned,
- expected_sources_unbounded: (SourceType::Bounded,
SourceType::Unbounded),
- expected_join_type: join_type,
- expected_mode: PartitionMode::Partitioned,
- expecting_swap: false,
- });
- cases.push(TestCase {
- case: "Bounded - Bounded".to_string(),
- initial_sources_unbounded: (SourceType::Bounded,
SourceType::Bounded),
- initial_join_type: join_type,
- initial_mode: PartitionMode::Partitioned,
- expected_sources_unbounded: (SourceType::Bounded,
SourceType::Bounded),
- expected_join_type: join_type,
- expected_mode: PartitionMode::Partitioned,
- expecting_swap: false,
- });
- cases.push(TestCase {
- case: "Unbounded - Unbounded".to_string(),
- initial_sources_unbounded: (SourceType::Unbounded,
SourceType::Unbounded),
- initial_join_type: join_type,
- initial_mode: PartitionMode::Partitioned,
- expected_sources_unbounded: (
- SourceType::Unbounded,
- SourceType::Unbounded,
- ),
- expected_join_type: join_type,
- expected_mode: PartitionMode::Partitioned,
- expecting_swap: false,
- });
- }
-
- for case in cases.into_iter() {
- test_join_with_maybe_swap_unbounded_case(case).await?
- }
- Ok(())
- }
-
- #[tokio::test]
- async fn test_not_supporting_swaps_possible_collect_left() -> Result<()> {
- let mut cases = vec![];
- let the_ones_not_support_collect_left =
- vec![JoinType::Right, JoinType::RightAnti, JoinType::RightSemi];
- for join_type in the_ones_not_support_collect_left {
- // We expect that (SourceType::Unbounded, SourceType::Bounded)
will change, regardless of the
- // statistics.
- cases.push(TestCase {
- case: "Unbounded - Bounded / CollectLeft".to_string(),
- initial_sources_unbounded: (SourceType::Unbounded,
SourceType::Bounded),
- initial_join_type: join_type,
- initial_mode: PartitionMode::CollectLeft,
- expected_sources_unbounded: (SourceType::Unbounded,
SourceType::Bounded),
- expected_join_type: join_type,
- expected_mode: PartitionMode::CollectLeft,
- expecting_swap: false,
- });
- // We expect that (SourceType::Bounded, SourceType::Unbounded)
will stay same, regardless of the
- // statistics.
- cases.push(TestCase {
- case: "Bounded - Unbounded / CollectLeft".to_string(),
- initial_sources_unbounded: (SourceType::Bounded,
SourceType::Unbounded),
- initial_join_type: join_type,
- initial_mode: PartitionMode::CollectLeft,
- expected_sources_unbounded: (SourceType::Bounded,
SourceType::Unbounded),
- expected_join_type: join_type,
- expected_mode: PartitionMode::CollectLeft,
- expecting_swap: false,
- });
- cases.push(TestCase {
- case: "Unbounded - Unbounded / CollectLeft".to_string(),
- initial_sources_unbounded: (SourceType::Unbounded,
SourceType::Unbounded),
- initial_join_type: join_type,
- initial_mode: PartitionMode::CollectLeft,
- expected_sources_unbounded: (
- SourceType::Unbounded,
- SourceType::Unbounded,
- ),
- expected_join_type: join_type,
- expected_mode: PartitionMode::CollectLeft,
- expecting_swap: false,
- });
- //
- cases.push(TestCase {
- case: "Bounded - Bounded / CollectLeft".to_string(),
- initial_sources_unbounded: (SourceType::Bounded,
SourceType::Bounded),
- initial_join_type: join_type,
- initial_mode: PartitionMode::CollectLeft,
- expected_sources_unbounded: (SourceType::Bounded,
SourceType::Bounded),
- expected_join_type: join_type,
- expected_mode: PartitionMode::CollectLeft,
- expecting_swap: false,
- });
- // If cases are partitioned, only unbounded & bounded check will
affect the order.
- cases.push(TestCase {
- case: "Unbounded - Bounded / Partitioned".to_string(),
- initial_sources_unbounded: (SourceType::Unbounded,
SourceType::Bounded),
- initial_join_type: join_type,
- initial_mode: PartitionMode::Partitioned,
- expected_sources_unbounded: (SourceType::Unbounded,
SourceType::Bounded),
- expected_join_type: join_type,
- expected_mode: PartitionMode::Partitioned,
- expecting_swap: false,
- });
- cases.push(TestCase {
- case: "Bounded - Unbounded / Partitioned".to_string(),
- initial_sources_unbounded: (SourceType::Bounded,
SourceType::Unbounded),
- initial_join_type: join_type,
- initial_mode: PartitionMode::Partitioned,
- expected_sources_unbounded: (SourceType::Bounded,
SourceType::Unbounded),
- expected_join_type: join_type,
- expected_mode: PartitionMode::Partitioned,
- expecting_swap: false,
- });
- cases.push(TestCase {
- case: "Bounded - Bounded / Partitioned".to_string(),
- initial_sources_unbounded: (SourceType::Bounded,
SourceType::Bounded),
- initial_join_type: join_type,
- initial_mode: PartitionMode::Partitioned,
- expected_sources_unbounded: (SourceType::Bounded,
SourceType::Bounded),
- expected_join_type: join_type,
- expected_mode: PartitionMode::Partitioned,
- expecting_swap: false,
- });
- cases.push(TestCase {
- case: "Unbounded - Unbounded / Partitioned".to_string(),
- initial_sources_unbounded: (SourceType::Unbounded,
SourceType::Unbounded),
- initial_join_type: join_type,
- initial_mode: PartitionMode::Partitioned,
- expected_sources_unbounded: (
- SourceType::Unbounded,
- SourceType::Unbounded,
- ),
- expected_join_type: join_type,
- expected_mode: PartitionMode::Partitioned,
- expecting_swap: false,
- });
- }
-
- for case in cases.into_iter() {
- test_join_with_maybe_swap_unbounded_case(case).await?
- }
- Ok(())
- }
-
- async fn test_join_with_maybe_swap_unbounded_case(t: TestCase) ->
Result<()> {
- let left_unbounded = t.initial_sources_unbounded.0 ==
SourceType::Unbounded;
- let right_unbounded = t.initial_sources_unbounded.1 ==
SourceType::Unbounded;
- let left_exec = Arc::new(UnboundedExec::new(
- (!left_unbounded).then_some(1),
- RecordBatch::new_empty(Arc::new(Schema::new(vec![Field::new(
- "a",
- DataType::Int32,
- false,
- )]))),
- 2,
- )) as Arc<dyn ExecutionPlan>;
- let right_exec = Arc::new(UnboundedExec::new(
- (!right_unbounded).then_some(1),
- RecordBatch::new_empty(Arc::new(Schema::new(vec![Field::new(
- "b",
- DataType::Int32,
- false,
- )]))),
- 2,
- )) as Arc<dyn ExecutionPlan>;
-
- let join = HashJoinExec::try_new(
- Arc::clone(&left_exec),
- Arc::clone(&right_exec),
- vec![(
- Column::new_with_schema("a", &left_exec.schema())?,
- Column::new_with_schema("b", &right_exec.schema())?,
- )],
- None,
- &t.initial_join_type,
- t.initial_mode,
- false,
- )?;
-
- let initial_hash_join_state = PipelineStatePropagator {
- plan: Arc::new(join),
- unbounded: false,
- children_unbounded: vec![left_unbounded, right_unbounded],
- };
- let optimized_hash_join =
- hash_join_swap_subrule(initial_hash_join_state).unwrap()?;
- let optimized_join_plan = optimized_hash_join.plan;
-
- // If swap did happen
- let projection_added =
optimized_join_plan.as_any().is::<ProjectionExec>();
- let plan = if projection_added {
- let proj = optimized_join_plan
- .as_any()
- .downcast_ref::<ProjectionExec>()
- .expect(
- "A proj is required to swap columns back to their original
order",
- );
- proj.input().clone()
- } else {
- optimized_join_plan
- };
-
- if let Some(HashJoinExec {
- left,
- right,
- join_type,
- mode,
- ..
- }) = plan.as_any().downcast_ref::<HashJoinExec>()
- {
- let left_changed = Arc::data_ptr_eq(left, &right_exec);
- let right_changed = Arc::data_ptr_eq(right, &left_exec);
- // If this is not equal, we have a bigger problem.
- assert_eq!(left_changed, right_changed);
- assert_eq!(
- (
- t.case.as_str(),
- if left.unbounded_output(&[])? {
- SourceType::Unbounded
- } else {
- SourceType::Bounded
- },
- if right.unbounded_output(&[])? {
- SourceType::Unbounded
- } else {
- SourceType::Bounded
- },
- join_type,
- mode,
- left_changed && right_changed
- ),
- (
- t.case.as_str(),
- t.expected_sources_unbounded.0,
- t.expected_sources_unbounded.1,
- &t.expected_join_type,
- &t.expected_mode,
- t.expecting_swap
- )
- );
- };
- Ok(())
- }
-}
diff --git a/datafusion/core/tests/memory_limit.rs
b/datafusion/core/tests/memory_limit.rs
index 36872b7361..5b18d616b3 100644
--- a/datafusion/core/tests/memory_limit.rs
+++ b/datafusion/core/tests/memory_limit.rs
@@ -28,7 +28,7 @@ use datafusion::datasource::MemTable;
use datafusion::execution::context::SessionState;
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
-use datafusion::physical_optimizer::pipeline_fixer::PipelineFixer;
+use datafusion::physical_optimizer::join_selection::JoinSelection;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion_common::assert_contains;
@@ -304,7 +304,7 @@ async fn run_streaming_test_with_config(
// Disable all physical optimizer rules except the PipelineFixer rule to
avoid sorts or
// repartition, as they also have memory budgets that may be hit first
let state = SessionState::with_config_rt(config, Arc::new(runtime))
- .with_physical_optimizer_rules(vec![Arc::new(PipelineFixer::new())]);
+ .with_physical_optimizer_rules(vec![Arc::new(JoinSelection::new())]);
// Create a new session context with the session state
let ctx = SessionContext::with_state(state);
diff --git a/datafusion/core/tests/sqllogictests/test_files/explain.slt
b/datafusion/core/tests/sqllogictests/test_files/explain.slt
index 56f2fdf10a..bd3513550a 100644
--- a/datafusion/core/tests/sqllogictests/test_files/explain.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/explain.slt
@@ -240,7 +240,6 @@ logical_plan TableScan: simple_explain_test projection=[a,
b, c]
initial_physical_plan CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b,
c], has_header=true
physical_plan after aggregate_statistics SAME TEXT AS ABOVE
physical_plan after join_selection SAME TEXT AS ABOVE
-physical_plan after PipelineFixer SAME TEXT AS ABOVE
physical_plan after repartition SAME TEXT AS ABOVE
physical_plan after EnforceDistribution SAME TEXT AS ABOVE
physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE