This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new e3487eee13 Stop copying LogicalPlan and Exprs in `DecorrelatePredicateSubquery` (#10318)
e3487eee13 is described below

commit e3487eee1365a37b8651bf5a393ae8d2cc16e653
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed May 1 17:41:59 2024 -0400

    Stop copying LogicalPlan and Exprs in `DecorrelatePredicateSubquery` (#10318)
    
    * Fix build with missing `use`
    
    * Avoid clones in `DecorrelatePredicateSubquery`
---
 .../src/decorrelate_predicate_subquery.rs          | 168 ++++++++++++---------
 1 file changed, 97 insertions(+), 71 deletions(-)

diff --git a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs
index 2e72632170..b2650ac933 100644
--- a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs
+++ b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs
@@ -26,17 +26,18 @@ use crate::utils::replace_qualified_name;
 use crate::{OptimizerConfig, OptimizerRule};
 
 use datafusion_common::alias::AliasGenerator;
-use datafusion_common::tree_node::{TransformedResult, TreeNode};
-use datafusion_common::{plan_err, Result};
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion_common::{internal_err, plan_err, Result};
 use datafusion_expr::expr::{Exists, InSubquery};
 use datafusion_expr::expr_rewriter::create_col_from_scalar_expr;
 use datafusion_expr::logical_plan::{JoinType, Subquery};
-use datafusion_expr::utils::{conjunction, split_conjunction};
+use datafusion_expr::utils::{conjunction, split_conjunction, 
split_conjunction_owned};
 use datafusion_expr::{
     exists, in_subquery, not_exists, not_in_subquery, BinaryExpr, Expr, Filter,
     LogicalPlan, LogicalPlanBuilder, Operator,
 };
 
+use datafusion_expr::logical_plan::tree_node::unwrap_arc;
 use log::debug;
 
 /// Optimizer rule for rewriting predicate(IN/EXISTS) subquery to left 
semi/anti joins
@@ -49,6 +50,16 @@ impl DecorrelatePredicateSubquery {
         Self::default()
     }
 
+    fn rewrite_subquery(
+        &self,
+        mut subquery: Subquery,
+        config: &dyn OptimizerConfig,
+    ) -> Result<Subquery> {
+        subquery.subquery =
+            Arc::new(self.rewrite(unwrap_arc(subquery.subquery), 
config)?.data);
+        Ok(subquery)
+    }
+
     /// Finds expressions that have the predicate subqueries (and recurses 
when found)
     ///
     /// # Arguments
@@ -59,40 +70,32 @@ impl DecorrelatePredicateSubquery {
     /// Returns a tuple (subqueries, non-subquery expressions)
     fn extract_subquery_exprs(
         &self,
-        predicate: &Expr,
+        predicate: Expr,
         config: &dyn OptimizerConfig,
     ) -> Result<(Vec<SubqueryInfo>, Vec<Expr>)> {
-        let filters = split_conjunction(predicate); // TODO: add ExistenceJoin 
to support disjunctions
+        let filters = split_conjunction_owned(predicate); // TODO: add 
ExistenceJoin to support disjunctions
 
         let mut subqueries = vec![];
         let mut others = vec![];
-        for it in filters.iter() {
+        for it in filters.into_iter() {
             match it {
                 Expr::InSubquery(InSubquery {
                     expr,
                     subquery,
                     negated,
                 }) => {
-                    let subquery_plan = self
-                        .try_optimize(&subquery.subquery, config)?
-                        .map(Arc::new)
-                        .unwrap_or_else(|| subquery.subquery.clone());
-                    let new_subquery = subquery.with_plan(subquery_plan);
+                    let new_subquery = self.rewrite_subquery(subquery, 
config)?;
                     subqueries.push(SubqueryInfo::new_with_in_expr(
                         new_subquery,
-                        (**expr).clone(),
-                        *negated,
+                        *expr,
+                        negated,
                     ));
                 }
                 Expr::Exists(Exists { subquery, negated }) => {
-                    let subquery_plan = self
-                        .try_optimize(&subquery.subquery, config)?
-                        .map(Arc::new)
-                        .unwrap_or_else(|| subquery.subquery.clone());
-                    let new_subquery = subquery.with_plan(subquery_plan);
-                    subqueries.push(SubqueryInfo::new(new_subquery, *negated));
+                    let new_subquery = self.rewrite_subquery(subquery, 
config)?;
+                    subqueries.push(SubqueryInfo::new(new_subquery, negated));
                 }
-                _ => others.push((*it).clone()),
+                expr => others.push(expr),
             }
         }
 
@@ -103,62 +106,85 @@ impl DecorrelatePredicateSubquery {
 impl OptimizerRule for DecorrelatePredicateSubquery {
     fn try_optimize(
         &self,
-        plan: &LogicalPlan,
-        config: &dyn OptimizerConfig,
+        _plan: &LogicalPlan,
+        _config: &dyn OptimizerConfig,
     ) -> Result<Option<LogicalPlan>> {
-        match plan {
-            LogicalPlan::Filter(filter) => {
-                let (subqueries, mut other_exprs) =
-                    self.extract_subquery_exprs(&filter.predicate, config)?;
-                if subqueries.is_empty() {
-                    // regular filter, no subquery exists clause here
-                    return Ok(None);
-                }
+        internal_err!("Should have called 
DecorrelatePredicateSubquery::rewrite")
+    }
 
-                // iterate through all exists clauses in predicate, turning 
each into a join
-                let mut cur_input = filter.input.as_ref().clone();
-                for subquery in subqueries {
-                    if let Some(plan) =
-                        build_join(&subquery, &cur_input, 
config.alias_generator())?
-                    {
-                        cur_input = plan;
-                    } else {
-                        // If the subquery can not be converted to a Join, 
reconstruct the subquery expression and add it to the Filter
-                        let sub_query_expr = match subquery {
-                            SubqueryInfo {
-                                query,
-                                where_in_expr: Some(expr),
-                                negated: false,
-                            } => in_subquery(expr, query.subquery.clone()),
-                            SubqueryInfo {
-                                query,
-                                where_in_expr: Some(expr),
-                                negated: true,
-                            } => not_in_subquery(expr, query.subquery.clone()),
-                            SubqueryInfo {
-                                query,
-                                where_in_expr: None,
-                                negated: false,
-                            } => exists(query.subquery.clone()),
-                            SubqueryInfo {
-                                query,
-                                where_in_expr: None,
-                                negated: true,
-                            } => not_exists(query.subquery.clone()),
-                        };
-                        other_exprs.push(sub_query_expr);
-                    }
-                }
+    fn supports_rewrite(&self) -> bool {
+        true
+    }
 
-                let expr = conjunction(other_exprs);
-                if let Some(expr) = expr {
-                    let new_filter = Filter::try_new(expr, 
Arc::new(cur_input))?;
-                    cur_input = LogicalPlan::Filter(new_filter);
-                }
-                Ok(Some(cur_input))
+    fn rewrite(
+        &self,
+        plan: LogicalPlan,
+        config: &dyn OptimizerConfig,
+    ) -> Result<Transformed<LogicalPlan>> {
+        let LogicalPlan::Filter(filter) = plan else {
+            return Ok(Transformed::no(plan));
+        };
+
+        // if there are no subqueries in the predicate, return the original 
plan
+        let has_subqueries = split_conjunction(&filter.predicate)
+            .iter()
+            .any(|expr| matches!(expr, Expr::InSubquery(_) | Expr::Exists(_)));
+        if !has_subqueries {
+            return Ok(Transformed::no(LogicalPlan::Filter(filter)));
+        }
+
+        let Filter {
+            predicate, input, ..
+        } = filter;
+        let (subqueries, mut other_exprs) =
+            self.extract_subquery_exprs(predicate, config)?;
+        if subqueries.is_empty() {
+            return internal_err!(
+                "can not find expected subqueries in 
DecorrelatePredicateSubquery"
+            );
+        }
+
+        // iterate through all exists clauses in predicate, turning each into 
a join
+        let mut cur_input = unwrap_arc(input);
+        for subquery in subqueries {
+            if let Some(plan) =
+                build_join(&subquery, &cur_input, config.alias_generator())?
+            {
+                cur_input = plan;
+            } else {
+                // If the subquery can not be converted to a Join, reconstruct 
the subquery expression and add it to the Filter
+                let sub_query_expr = match subquery {
+                    SubqueryInfo {
+                        query,
+                        where_in_expr: Some(expr),
+                        negated: false,
+                    } => in_subquery(expr, query.subquery),
+                    SubqueryInfo {
+                        query,
+                        where_in_expr: Some(expr),
+                        negated: true,
+                    } => not_in_subquery(expr, query.subquery),
+                    SubqueryInfo {
+                        query,
+                        where_in_expr: None,
+                        negated: false,
+                    } => exists(query.subquery),
+                    SubqueryInfo {
+                        query,
+                        where_in_expr: None,
+                        negated: true,
+                    } => not_exists(query.subquery),
+                };
+                other_exprs.push(sub_query_expr);
             }
-            _ => Ok(None),
         }
+
+        let expr = conjunction(other_exprs);
+        if let Some(expr) = expr {
+            let new_filter = Filter::try_new(expr, Arc::new(cur_input))?;
+            cur_input = LogicalPlan::Filter(new_filter);
+        }
+        Ok(Transformed::yes(cur_input))
     }
 
     fn name(&self) -> &str {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to