This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new ff5eb8f4f2 refactor `EliminateDuplicatedExpr` optimizer pass to avoid clone (#10218)
ff5eb8f4f2 is described below

commit ff5eb8f4f25d337c179afcfdc8870b8397125c25
Author: Lordworms <[email protected]>
AuthorDate: Fri Apr 26 06:45:05 2024 -0500

    refactor `EliminateDuplicatedExpr` optimizer pass to avoid clone (#10218)
    
    * refactor eliminate duplicated expr to avoid clone
    
    adding dep
    
    * fix bugs
    
    * change lock
    
    * format toml
    
    * refactor
---
 datafusion-cli/Cargo.lock                          |   1 +
 datafusion/optimizer/Cargo.toml                    |   2 +-
 .../optimizer/src/eliminate_duplicated_expr.rs     | 140 ++++++++++++---------
 3 files changed, 85 insertions(+), 58 deletions(-)

diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index ba3e68e401..5263b064ff 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -1323,6 +1323,7 @@ dependencies = [
  "datafusion-expr",
  "datafusion-physical-expr",
  "hashbrown 0.14.3",
+ "indexmap 2.2.6",
  "itertools",
  "log",
  "regex-syntax",
diff --git a/datafusion/optimizer/Cargo.toml b/datafusion/optimizer/Cargo.toml
index b1a6953501..45ece35c23 100644
--- a/datafusion/optimizer/Cargo.toml
+++ b/datafusion/optimizer/Cargo.toml
@@ -47,10 +47,10 @@ datafusion-common = { workspace = true, default-features = true }
 datafusion-expr = { workspace = true }
 datafusion-physical-expr = { workspace = true }
 hashbrown = { version = "0.14", features = ["raw"] }
+indexmap = { workspace = true }
 itertools = { workspace = true }
 log = { workspace = true }
 regex-syntax = "0.8.0"
-
 [dev-dependencies]
 ctor = { workspace = true }
 datafusion-sql = { workspace = true }
diff --git a/datafusion/optimizer/src/eliminate_duplicated_expr.rs b/datafusion/optimizer/src/eliminate_duplicated_expr.rs
index ee44a328f8..3dbfc750e8 100644
--- a/datafusion/optimizer/src/eliminate_duplicated_expr.rs
+++ b/datafusion/optimizer/src/eliminate_duplicated_expr.rs
@@ -19,12 +19,12 @@
 
 use crate::optimizer::ApplyOrder;
 use crate::{OptimizerConfig, OptimizerRule};
-use datafusion_common::Result;
-use datafusion_expr::expr::Sort as ExprSort;
+use datafusion_common::tree_node::Transformed;
+use datafusion_common::{internal_err, Result};
 use datafusion_expr::logical_plan::LogicalPlan;
 use datafusion_expr::{Aggregate, Expr, Sort};
-use hashbrown::HashSet;
-
+use indexmap::IndexSet;
+use std::hash::{Hash, Hasher};
 /// Optimization rule that eliminate duplicated expr.
 #[derive(Default)]
 pub struct EliminateDuplicatedExpr;
@@ -35,78 +35,104 @@ impl EliminateDuplicatedExpr {
         Self {}
     }
 }
-
+// use this structure to avoid initial clone
+#[derive(Eq, Clone, Debug)]
+struct SortExprWrapper {
+    expr: Expr,
+}
+impl PartialEq for SortExprWrapper {
+    fn eq(&self, other: &Self) -> bool {
+        match (&self.expr, &other.expr) {
+            (Expr::Sort(own_sort), Expr::Sort(other_sort)) => {
+                own_sort.expr == other_sort.expr
+            }
+            _ => self.expr == other.expr,
+        }
+    }
+}
+impl Hash for SortExprWrapper {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        match &self.expr {
+            Expr::Sort(sort) => {
+                sort.expr.hash(state);
+            }
+            _ => {
+                self.expr.hash(state);
+            }
+        }
+    }
+}
 impl OptimizerRule for EliminateDuplicatedExpr {
     fn try_optimize(
         &self,
-        plan: &LogicalPlan,
+        _plan: &LogicalPlan,
         _config: &dyn OptimizerConfig,
     ) -> Result<Option<LogicalPlan>> {
+        internal_err!("Should have called EliminateDuplicatedExpr::rewrite")
+    }
+
+    fn apply_order(&self) -> Option<ApplyOrder> {
+        Some(ApplyOrder::TopDown)
+    }
+
+    fn supports_rewrite(&self) -> bool {
+        true
+    }
+
+    fn rewrite(
+        &self,
+        plan: LogicalPlan,
+        _config: &dyn OptimizerConfig,
+    ) -> Result<Transformed<LogicalPlan>> {
         match plan {
             LogicalPlan::Sort(sort) => {
-                let normalized_sort_keys = sort
+                let len = sort.expr.len();
+                let unique_exprs: Vec<_> = sort
                     .expr
-                    .iter()
-                    .map(|e| match e {
-                        Expr::Sort(ExprSort { expr, .. }) => {
-                            Expr::Sort(ExprSort::new(expr.clone(), true, false))
-                        }
-                        _ => e.clone(),
-                    })
-                    .collect::<Vec<_>>();
+                    .into_iter()
+                    .map(|e| SortExprWrapper { expr: e })
+                    .collect::<IndexSet<_>>()
+                    .into_iter()
+                    .map(|wrapper| wrapper.expr)
+                    .collect();
 
-                // dedup sort.expr and keep order
-                let mut dedup_expr = Vec::new();
-                let mut dedup_set = HashSet::new();
-                sort.expr.iter().zip(normalized_sort_keys.iter()).for_each(
-                    |(expr, normalized_expr)| {
-                        if !dedup_set.contains(normalized_expr) {
-                            dedup_expr.push(expr);
-                            dedup_set.insert(normalized_expr);
-                        }
-                    },
-                );
-                if dedup_expr.len() == sort.expr.len() {
-                    Ok(None)
+                let transformed = if len != unique_exprs.len() {
+                    Transformed::yes
                 } else {
-                    Ok(Some(LogicalPlan::Sort(Sort {
-                        expr: dedup_expr.into_iter().cloned().collect::<Vec<_>>(),
-                        input: sort.input.clone(),
-                        fetch: sort.fetch,
-                    })))
-                }
+                    Transformed::no
+                };
+
+                Ok(transformed(LogicalPlan::Sort(Sort {
+                    expr: unique_exprs,
+                    input: sort.input,
+                    fetch: sort.fetch,
+                })))
             }
             LogicalPlan::Aggregate(agg) => {
-                // dedup agg.groupby and keep order
-                let mut dedup_expr = Vec::new();
-                let mut dedup_set = HashSet::new();
-                agg.group_expr.iter().for_each(|expr| {
-                    if !dedup_set.contains(expr) {
-                        dedup_expr.push(expr.clone());
-                        dedup_set.insert(expr);
-                    }
-                });
-                if dedup_expr.len() == agg.group_expr.len() {
-                    Ok(None)
+                let len = agg.group_expr.len();
+
+                let unique_exprs: Vec<Expr> = agg
+                    .group_expr
+                    .into_iter()
+                    .collect::<IndexSet<_>>()
+                    .into_iter()
+                    .collect();
+
+                let transformed = if len != unique_exprs.len() {
+                    Transformed::yes
                 } else {
-                    Ok(Some(LogicalPlan::Aggregate(Aggregate::try_new(
-                        agg.input.clone(),
-                        dedup_expr,
-                        agg.aggr_expr.clone(),
-                    )?)))
-                }
+                    Transformed::no
+                };
+
+                Aggregate::try_new(agg.input, unique_exprs, agg.aggr_expr)
+                    .map(|f| transformed(LogicalPlan::Aggregate(f)))
             }
-            _ => Ok(None),
+            _ => Ok(Transformed::no(plan)),
         }
     }
-
     fn name(&self) -> &str {
         "eliminate_duplicated_expr"
     }
-
-    fn apply_order(&self) -> Option<ApplyOrder> {
-        Some(ApplyOrder::TopDown)
-    }
 }
 
 #[cfg(test)]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to