This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new d9ebd2b83b implement rewrite for ExtractEquijoinPredicate and avoid clone in filter (#10165)
d9ebd2b83b is described below
commit d9ebd2b83b2a5cff60012a75521aadff0b7817da
Author: Lordworms <[email protected]>
AuthorDate: Mon Apr 22 18:45:35 2024 -0500
implement rewrite for ExtractEquijoinPredicate and avoid clone in filter (#10165)
* implement rewrite for ExtractEquijoinPredicate and avoid clone in filter
* fix clippy
* optimize code
---
.../optimizer/src/extract_equijoin_predicate.rs | 113 +++++++++++----------
1 file changed, 61 insertions(+), 52 deletions(-)
diff --git a/datafusion/optimizer/src/extract_equijoin_predicate.rs
b/datafusion/optimizer/src/extract_equijoin_predicate.rs
index 60b9ba3031..c47a86974c 100644
--- a/datafusion/optimizer/src/extract_equijoin_predicate.rs
+++ b/datafusion/optimizer/src/extract_equijoin_predicate.rs
@@ -18,12 +18,13 @@
//! [`ExtractEquijoinPredicate`] identifies equality join (equijoin) predicates
use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};
-use datafusion_common::DFSchema;
+use datafusion_common::tree_node::Transformed;
use datafusion_common::Result;
-use datafusion_expr::utils::{can_hash, find_valid_equijoin_key_pair,
split_conjunction};
+use datafusion_common::{internal_err, DFSchema};
+use datafusion_expr::utils::split_conjunction_owned;
+use datafusion_expr::utils::{can_hash, find_valid_equijoin_key_pair};
use datafusion_expr::{BinaryExpr, Expr, ExprSchemable, Join, LogicalPlan,
Operator};
use std::sync::Arc;
-
// equijoin predicate
type EquijoinPredicate = (Expr, Expr);
@@ -51,15 +52,34 @@ impl ExtractEquijoinPredicate {
impl OptimizerRule for ExtractEquijoinPredicate {
fn try_optimize(
&self,
- plan: &LogicalPlan,
+ _plan: &LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
+ internal_err!("Should have called ExtractEquijoinPredicate::rewrite")
+ }
+ fn supports_rewrite(&self) -> bool {
+ true
+ }
+
+ fn name(&self) -> &str {
+ "extract_equijoin_predicate"
+ }
+
+ fn apply_order(&self) -> Option<ApplyOrder> {
+ Some(ApplyOrder::BottomUp)
+ }
+
+ fn rewrite(
+ &self,
+ plan: LogicalPlan,
+ _config: &dyn OptimizerConfig,
+ ) -> Result<Transformed<LogicalPlan>> {
match plan {
LogicalPlan::Join(Join {
left,
right,
- on,
- filter,
+ mut on,
+ filter: Some(expr),
join_type,
join_constraint,
schema,
@@ -67,66 +87,55 @@ impl OptimizerRule for ExtractEquijoinPredicate {
}) => {
let left_schema = left.schema();
let right_schema = right.schema();
-
- filter.as_ref().map_or(Result::Ok(None), |expr| {
- let (equijoin_predicates, non_equijoin_expr) =
- split_eq_and_noneq_join_predicate(
- expr,
- left_schema,
- right_schema,
- )?;
-
- let optimized_plan =
(!equijoin_predicates.is_empty()).then(|| {
- let mut new_on = on.clone();
- new_on.extend(equijoin_predicates);
-
- LogicalPlan::Join(Join {
- left: left.clone(),
- right: right.clone(),
- on: new_on,
- filter: non_equijoin_expr,
- join_type: *join_type,
- join_constraint: *join_constraint,
- schema: schema.clone(),
- null_equals_null: *null_equals_null,
- })
- });
-
- Ok(optimized_plan)
- })
+ let (equijoin_predicates, non_equijoin_expr) =
+ split_eq_and_noneq_join_predicate(expr, left_schema,
right_schema)?;
+
+ if !equijoin_predicates.is_empty() {
+ on.extend(equijoin_predicates);
+ Ok(Transformed::yes(LogicalPlan::Join(Join {
+ left,
+ right,
+ on,
+ filter: non_equijoin_expr,
+ join_type,
+ join_constraint,
+ schema,
+ null_equals_null,
+ })))
+ } else {
+ Ok(Transformed::no(LogicalPlan::Join(Join {
+ left,
+ right,
+ on,
+ filter: non_equijoin_expr,
+ join_type,
+ join_constraint,
+ schema,
+ null_equals_null,
+ })))
+ }
}
- _ => Ok(None),
+ _ => Ok(Transformed::no(plan)),
}
}
-
- fn name(&self) -> &str {
- "extract_equijoin_predicate"
- }
-
- fn apply_order(&self) -> Option<ApplyOrder> {
- Some(ApplyOrder::BottomUp)
- }
}
fn split_eq_and_noneq_join_predicate(
- filter: &Expr,
+ filter: Expr,
left_schema: &Arc<DFSchema>,
right_schema: &Arc<DFSchema>,
) -> Result<(Vec<EquijoinPredicate>, Option<Expr>)> {
- let exprs = split_conjunction(filter);
+ let exprs = split_conjunction_owned(filter);
let mut accum_join_keys: Vec<(Expr, Expr)> = vec![];
let mut accum_filters: Vec<Expr> = vec![];
for expr in exprs {
match expr {
Expr::BinaryExpr(BinaryExpr {
- left,
+ ref left,
op: Operator::Eq,
- right,
+ ref right,
}) => {
- let left = left.as_ref();
- let right = right.as_ref();
-
let join_key_pair = find_valid_equijoin_key_pair(
left,
right,
@@ -141,13 +150,13 @@ fn split_eq_and_noneq_join_predicate(
if can_hash(&left_expr_type) && can_hash(&right_expr_type)
{
accum_join_keys.push((left_expr, right_expr));
} else {
- accum_filters.push(expr.clone());
+ accum_filters.push(expr);
}
} else {
- accum_filters.push(expr.clone());
+ accum_filters.push(expr);
}
}
- _ => accum_filters.push(expr.clone()),
+ _ => accum_filters.push(expr),
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]