This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new ff5eb8f4f2 refactor `EliminateDuplicatedExpr` optimizer pass to avoid clone (#10218)
ff5eb8f4f2 is described below
commit ff5eb8f4f25d337c179afcfdc8870b8397125c25
Author: Lordworms <[email protected]>
AuthorDate: Fri Apr 26 06:45:05 2024 -0500
refactor `EliminateDuplicatedExpr` optimizer pass to avoid clone (#10218)
* refactor eliminate duplicated expr to avoid clone
adding dep
* fix bugs
* change lock
* format toml
* refactor
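The core of the clone removal: instead of tracking duplicates in a
separate HashSet (which required cloning every expression), the rule now
consumes the expression Vec and collects it into an indexmap::IndexSet,
which deduplicates while preserving first-occurrence order. A minimal
standalone sketch of that pattern, assuming indexmap as a dependency
(the helper name is illustrative, not part of the patch):

    use indexmap::IndexSet;

    // Deduplicate while keeping the first occurrence of each element in
    // its original position. `into_iter()` consumes the Vec, so nothing
    // is cloned; `IndexSet` gives O(1) duplicate checks plus stable order.
    fn dedup_preserving_order<T: std::hash::Hash + Eq>(items: Vec<T>) -> Vec<T> {
        items
            .into_iter()
            .collect::<IndexSet<_>>()
            .into_iter()
            .collect()
    }

    fn main() {
        let deduped = dedup_preserving_order(vec!["b", "a", "b", "c", "a"]);
        assert_eq!(deduped, vec!["b", "a", "c"]);
    }

A plain HashSet would not do here: sort keys and GROUP BY expressions are
order-sensitive, so the dedup must keep insertion order.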
---
datafusion-cli/Cargo.lock | 1 +
datafusion/optimizer/Cargo.toml | 2 +-
.../optimizer/src/eliminate_duplicated_expr.rs | 140 ++++++++++++---------
3 files changed, 85 insertions(+), 58 deletions(-)
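The patch also moves the rule off try_optimize (which borrowed the plan
and returned Option<LogicalPlan>, forcing clones on the rewrite path)
onto the newer rewrite API, which takes the plan by value and reports
whether anything changed via Transformed. A sketch of the yes/no
semantics, using an illustrative mock rather than the real
datafusion_common::tree_node::Transformed:

    // Mock of the Transformed wrapper: the rebuilt value plus a flag
    // telling the optimizer whether the rule actually changed anything.
    struct Transformed<T> {
        data: T,
        transformed: bool,
    }

    impl<T> Transformed<T> {
        fn yes(data: T) -> Self { Self { data, transformed: true } }
        fn no(data: T) -> Self { Self { data, transformed: false } }
    }

    fn main() {
        // As in the patch: pick the constructor first, then apply it to
        // the rebuilt plan, so the plan is moved exactly once.
        let (len_before, len_after) = (3, 2);
        let ctor = if len_before != len_after { Transformed::yes } else { Transformed::no };
        let result = ctor("deduped plan");
        assert!(result.transformed);
        assert_eq!(result.data, "deduped plan");
    }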
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index ba3e68e401..5263b064ff 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -1323,6 +1323,7 @@ dependencies = [
"datafusion-expr",
"datafusion-physical-expr",
"hashbrown 0.14.3",
+ "indexmap 2.2.6",
"itertools",
"log",
"regex-syntax",
diff --git a/datafusion/optimizer/Cargo.toml b/datafusion/optimizer/Cargo.toml
index b1a6953501..45ece35c23 100644
--- a/datafusion/optimizer/Cargo.toml
+++ b/datafusion/optimizer/Cargo.toml
@@ -47,10 +47,10 @@ datafusion-common = { workspace = true, default-features = true }
datafusion-expr = { workspace = true }
datafusion-physical-expr = { workspace = true }
hashbrown = { version = "0.14", features = ["raw"] }
+indexmap = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
regex-syntax = "0.8.0"
-
[dev-dependencies]
ctor = { workspace = true }
datafusion-sql = { workspace = true }
diff --git a/datafusion/optimizer/src/eliminate_duplicated_expr.rs b/datafusion/optimizer/src/eliminate_duplicated_expr.rs
index ee44a328f8..3dbfc750e8 100644
--- a/datafusion/optimizer/src/eliminate_duplicated_expr.rs
+++ b/datafusion/optimizer/src/eliminate_duplicated_expr.rs
@@ -19,12 +19,12 @@
use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};
-use datafusion_common::Result;
-use datafusion_expr::expr::Sort as ExprSort;
+use datafusion_common::tree_node::Transformed;
+use datafusion_common::{internal_err, Result};
use datafusion_expr::logical_plan::LogicalPlan;
use datafusion_expr::{Aggregate, Expr, Sort};
-use hashbrown::HashSet;
-
+use indexmap::IndexSet;
+use std::hash::{Hash, Hasher};
/// Optimization rule that eliminates duplicated expr.
#[derive(Default)]
pub struct EliminateDuplicatedExpr;
@@ -35,78 +35,104 @@ impl EliminateDuplicatedExpr {
Self {}
}
}
-
+// use this structure to avoid initial clone
+#[derive(Eq, Clone, Debug)]
+struct SortExprWrapper {
+ expr: Expr,
+}
+impl PartialEq for SortExprWrapper {
+ fn eq(&self, other: &Self) -> bool {
+ match (&self.expr, &other.expr) {
+ (Expr::Sort(own_sort), Expr::Sort(other_sort)) => {
+ own_sort.expr == other_sort.expr
+ }
+ _ => self.expr == other.expr,
+ }
+ }
+}
+impl Hash for SortExprWrapper {
+ fn hash<H: Hasher>(&self, state: &mut H) {
+ match &self.expr {
+ Expr::Sort(sort) => {
+ sort.expr.hash(state);
+ }
+ _ => {
+ self.expr.hash(state);
+ }
+ }
+ }
+}
impl OptimizerRule for EliminateDuplicatedExpr {
fn try_optimize(
&self,
- plan: &LogicalPlan,
+ _plan: &LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
+ internal_err!("Should have called EliminateDuplicatedExpr::rewrite")
+ }
+
+ fn apply_order(&self) -> Option<ApplyOrder> {
+ Some(ApplyOrder::TopDown)
+ }
+
+ fn supports_rewrite(&self) -> bool {
+ true
+ }
+
+ fn rewrite(
+ &self,
+ plan: LogicalPlan,
+ _config: &dyn OptimizerConfig,
+ ) -> Result<Transformed<LogicalPlan>> {
match plan {
LogicalPlan::Sort(sort) => {
- let normalized_sort_keys = sort
+ let len = sort.expr.len();
+ let unique_exprs: Vec<_> = sort
.expr
- .iter()
- .map(|e| match e {
- Expr::Sort(ExprSort { expr, .. }) => {
- Expr::Sort(ExprSort::new(expr.clone(), true, false))
- }
- _ => e.clone(),
- })
- .collect::<Vec<_>>();
+ .into_iter()
+ .map(|e| SortExprWrapper { expr: e })
+ .collect::<IndexSet<_>>()
+ .into_iter()
+ .map(|wrapper| wrapper.expr)
+ .collect();
- // dedup sort.expr and keep order
- let mut dedup_expr = Vec::new();
- let mut dedup_set = HashSet::new();
- sort.expr.iter().zip(normalized_sort_keys.iter()).for_each(
- |(expr, normalized_expr)| {
- if !dedup_set.contains(normalized_expr) {
- dedup_expr.push(expr);
- dedup_set.insert(normalized_expr);
- }
- },
- );
- if dedup_expr.len() == sort.expr.len() {
- Ok(None)
+ let transformed = if len != unique_exprs.len() {
+ Transformed::yes
} else {
- Ok(Some(LogicalPlan::Sort(Sort {
- expr: dedup_expr.into_iter().cloned().collect::<Vec<_>>(),
- input: sort.input.clone(),
- fetch: sort.fetch,
- })))
- }
+ Transformed::no
+ };
+
+ Ok(transformed(LogicalPlan::Sort(Sort {
+ expr: unique_exprs,
+ input: sort.input,
+ fetch: sort.fetch,
+ })))
}
LogicalPlan::Aggregate(agg) => {
- // dedup agg.groupby and keep order
- let mut dedup_expr = Vec::new();
- let mut dedup_set = HashSet::new();
- agg.group_expr.iter().for_each(|expr| {
- if !dedup_set.contains(expr) {
- dedup_expr.push(expr.clone());
- dedup_set.insert(expr);
- }
- });
- if dedup_expr.len() == agg.group_expr.len() {
- Ok(None)
+ let len = agg.group_expr.len();
+
+ let unique_exprs: Vec<Expr> = agg
+ .group_expr
+ .into_iter()
+ .collect::<IndexSet<_>>()
+ .into_iter()
+ .collect();
+
+ let transformed = if len != unique_exprs.len() {
+ Transformed::yes
} else {
- Ok(Some(LogicalPlan::Aggregate(Aggregate::try_new(
- agg.input.clone(),
- dedup_expr,
- agg.aggr_expr.clone(),
- )?)))
- }
+ Transformed::no
+ };
+
+ Aggregate::try_new(agg.input, unique_exprs, agg.aggr_expr)
+ .map(|f| transformed(LogicalPlan::Aggregate(f)))
}
- _ => Ok(None),
+ _ => Ok(Transformed::no(plan)),
}
}
-
fn name(&self) -> &str {
"eliminate_duplicated_expr"
}
-
- fn apply_order(&self) -> Option<ApplyOrder> {
- Some(ApplyOrder::TopDown)
- }
}
#[cfg(test)]
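A note on why SortExprWrapper above compares and hashes only the inner
expression: a second ORDER BY key over the same expression is redundant
whatever its ASC/NULLS options, because the first key already fixes the
ordering for that expression. The old code encoded this by cloning every
key into a normalized form; the wrapper gets the same effect with a
custom PartialEq/Hash and no clones. A self-contained sketch of the
pattern (SortKey and its fields are illustrative stand-ins, not
DataFusion types):

    use indexmap::IndexSet;
    use std::hash::{Hash, Hasher};

    #[derive(Debug, Eq, Clone)]
    struct SortKey {
        expr: String, // stands in for the sort expression
        asc: bool,    // stands in for the sort options, ignored for dedup
    }

    impl PartialEq for SortKey {
        fn eq(&self, other: &Self) -> bool {
            self.expr == other.expr // options deliberately not compared
        }
    }

    impl Hash for SortKey {
        // Hash must agree with eq, so it hashes the same fields eq compares.
        fn hash<H: Hasher>(&self, state: &mut H) {
            self.expr.hash(state);
        }
    }

    fn main() {
        let keys = vec![
            SortKey { expr: "a".into(), asc: true },
            SortKey { expr: "a".into(), asc: false }, // duplicate of the first
            SortKey { expr: "b".into(), asc: true },
        ];
        let deduped: Vec<_> =
            keys.into_iter().collect::<IndexSet<_>>().into_iter().collect();
        assert_eq!(deduped.len(), 2);
        assert!(deduped[0].asc); // the kept "a" keeps its original options
    }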
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]