This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 7219744567 Minor: Simplify + document `EliminateCrossJoin` better (#10427)
7219744567 is described below
commit 721974456742bf44bdae291b3114bc23fe478bcd
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu May 9 12:57:14 2024 -0400
Minor: Simplify + document `EliminateCrossJoin` better (#10427)
---
datafusion/expr/src/logical_plan/builder.rs | 4 +-
datafusion/expr/src/utils.rs | 10 ++---
datafusion/optimizer/src/eliminate_cross_join.rs | 43 +++++++++++++++-------
.../optimizer/src/extract_equijoin_predicate.rs | 14 +++----
4 files changed, 41 insertions(+), 30 deletions(-)
diff --git a/datafusion/expr/src/logical_plan/builder.rs
b/datafusion/expr/src/logical_plan/builder.rs
index 7b1e449801..3f15b84784 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -1085,8 +1085,8 @@ impl LogicalPlanBuilder {
find_valid_equijoin_key_pair(
&normalized_left_key,
&normalized_right_key,
- self.plan.schema().clone(),
- right.schema().clone(),
+ self.plan.schema(),
+ right.schema(),
)?.ok_or_else(||
plan_datafusion_err!(
"can't create join plan, join key should belong to
one input, error key: ({normalized_left_key},{normalized_right_key})"
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 4282952a1e..0c1084674d 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -885,7 +885,7 @@ pub fn can_hash(data_type: &DataType) -> bool {
/// Check whether all columns are from the schema.
pub fn check_all_columns_from_schema(
columns: &HashSet<Column>,
- schema: DFSchemaRef,
+ schema: &DFSchema,
) -> Result<bool> {
for col in columns.iter() {
let exist = schema.is_column_from_schema(col);
@@ -909,8 +909,8 @@ pub fn check_all_columns_from_schema(
pub fn find_valid_equijoin_key_pair(
left_key: &Expr,
right_key: &Expr,
- left_schema: DFSchemaRef,
- right_schema: DFSchemaRef,
+ left_schema: &DFSchema,
+ right_schema: &DFSchema,
) -> Result<Option<(Expr, Expr)>> {
let left_using_columns = left_key.to_columns()?;
let right_using_columns = right_key.to_columns()?;
@@ -920,8 +920,8 @@ pub fn find_valid_equijoin_key_pair(
return Ok(None);
}
- if check_all_columns_from_schema(&left_using_columns, left_schema.clone())?
- && check_all_columns_from_schema(&right_using_columns,
right_schema.clone())?
+ if check_all_columns_from_schema(&left_using_columns, left_schema)?
+ && check_all_columns_from_schema(&right_using_columns, right_schema)?
{
return Ok(Some((left_key.clone(), right_key.clone())));
} else if check_all_columns_from_schema(&right_using_columns, left_schema)?
diff --git a/datafusion/optimizer/src/eliminate_cross_join.rs
b/datafusion/optimizer/src/eliminate_cross_join.rs
index ae6c1b339d..a807ee5ff2 100644
--- a/datafusion/optimizer/src/eliminate_cross_join.rs
+++ b/datafusion/optimizer/src/eliminate_cross_join.rs
@@ -107,7 +107,7 @@ impl OptimizerRule for EliminateCrossJoin {
left = find_inner_join(
&left,
&mut all_inputs,
- &mut possible_join_keys,
+ &possible_join_keys,
&mut all_join_keys,
)?;
}
@@ -144,7 +144,9 @@ impl OptimizerRule for EliminateCrossJoin {
}
}
-/// Recursively accumulate possible_join_keys and inputs from inner joins (including cross joins).
+/// Recursively accumulate possible_join_keys and inputs from inner joins
+/// (including cross joins).
+///
/// Returns a boolean indicating whether the flattening was successful.
fn try_flatten_join_inputs(
plan: &LogicalPlan,
@@ -159,14 +161,10 @@ fn try_flatten_join_inputs(
return Ok(false);
}
possible_join_keys.extend(join.on.clone());
- let left = &*(join.left);
- let right = &*(join.right);
- vec![left, right]
+ vec![&join.left, &join.right]
}
LogicalPlan::CrossJoin(join) => {
- let left = &*(join.left);
- let right = &*(join.right);
- vec![left, right]
+ vec![&join.left, &join.right]
}
_ => {
return plan_err!("flatten_join_inputs just can call
join/cross_join");
@@ -174,7 +172,8 @@ fn try_flatten_join_inputs(
};
for child in children.iter() {
- match *child {
+ let child = child.as_ref();
+ match child {
LogicalPlan::Join(Join {
join_type: JoinType::Inner,
..
@@ -184,27 +183,39 @@ fn try_flatten_join_inputs(
return Ok(false);
}
}
- _ => all_inputs.push((*child).clone()),
+ _ => all_inputs.push(child.clone()),
}
}
Ok(true)
}
/// Finds the next plan to join with the left input plan.
+///
+/// Finds the next `right` from `rights` that can be joined with `left_input`
+/// plan based on the join keys in `possible_join_keys`.
+///
/// If such a matching `right` is found:
/// 1. Adds the matching join keys to `all_join_keys`.
/// 2. Removes the matching `right` from `rights`.
/// 3. Returns `left_input JOIN right ON (all join keys)`.
+///
/// If no matching `right` is found:
/// 1. Removes the first plan from `rights`.
/// 2. Returns `left_input CROSS JOIN right`, where `right` is that removed first plan.
fn find_inner_join(
left_input: &LogicalPlan,
rights: &mut Vec<LogicalPlan>,
- possible_join_keys: &mut Vec<(Expr, Expr)>,
+ possible_join_keys: &[(Expr, Expr)],
all_join_keys: &mut HashSet<(Expr, Expr)>,
) -> Result<LogicalPlan> {
for (i, right_input) in rights.iter().enumerate() {
let mut join_keys = vec![];
- for (l, r) in &mut *possible_join_keys {
+ for (l, r) in possible_join_keys.iter() {
let key_pair = find_valid_equijoin_key_pair(
l,
r,
- left_input.schema().clone(),
- right_input.schema().clone(),
+ left_input.schema(),
+ right_input.schema(),
)?;
// Save join keys
@@ -215,6 +226,7 @@ fn find_inner_join(
}
}
+ // Found one or more matching join keys
if !join_keys.is_empty() {
all_join_keys.extend(join_keys.clone());
let right_input = rights.remove(i);
@@ -236,6 +248,9 @@ fn find_inner_join(
}));
}
}
+
// No right plan had any matching join keys, so cross join with the first
// right plan.
let right = rights.remove(0);
let join_schema = Arc::new(build_join_schema(
left_input.schema(),
diff --git a/datafusion/optimizer/src/extract_equijoin_predicate.rs
b/datafusion/optimizer/src/extract_equijoin_predicate.rs
index c47a86974c..237c003524 100644
--- a/datafusion/optimizer/src/extract_equijoin_predicate.rs
+++ b/datafusion/optimizer/src/extract_equijoin_predicate.rs
@@ -24,7 +24,6 @@ use datafusion_common::{internal_err, DFSchema};
use datafusion_expr::utils::split_conjunction_owned;
use datafusion_expr::utils::{can_hash, find_valid_equijoin_key_pair};
use datafusion_expr::{BinaryExpr, Expr, ExprSchemable, Join, LogicalPlan,
Operator};
-use std::sync::Arc;
// equijoin predicate
type EquijoinPredicate = (Expr, Expr);
@@ -122,8 +121,8 @@ impl OptimizerRule for ExtractEquijoinPredicate {
fn split_eq_and_noneq_join_predicate(
filter: Expr,
- left_schema: &Arc<DFSchema>,
- right_schema: &Arc<DFSchema>,
+ left_schema: &DFSchema,
+ right_schema: &DFSchema,
) -> Result<(Vec<EquijoinPredicate>, Option<Expr>)> {
let exprs = split_conjunction_owned(filter);
@@ -136,12 +135,8 @@ fn split_eq_and_noneq_join_predicate(
op: Operator::Eq,
ref right,
}) => {
- let join_key_pair = find_valid_equijoin_key_pair(
- left,
- right,
- left_schema.clone(),
- right_schema.clone(),
- )?;
+ let join_key_pair =
+ find_valid_equijoin_key_pair(left, right, left_schema,
right_schema)?;
if let Some((left_expr, right_expr)) = join_key_pair {
let left_expr_type = left_expr.get_type(left_schema)?;
@@ -172,6 +167,7 @@ mod tests {
use datafusion_expr::{
col, lit, logical_plan::builder::LogicalPlanBuilder, JoinType,
};
+ use std::sync::Arc;
fn assert_plan_eq(plan: LogicalPlan, expected: &str) -> Result<()> {
assert_optimized_plan_eq_display_indent(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]