alamb commented on code in PR #10473: URL: https://github.com/apache/datafusion/pull/10473#discussion_r1650073589
########## datafusion/expr/src/expr.rs: ########## @@ -1461,6 +1462,176 @@ impl Expr { | Expr::Placeholder(..) => false, } } + + /// This method hashes the direct content of an expression node without recursing into + /// its children. This is useful because in `CommonSubexprEliminate` we can build up + /// the deep hash of a node and its descendants during the bottom-up phase of the + /// first traversal and so avoid computing the hash of the node and then the hash of + /// its descendants separately. + /// + /// As it is pretty easy to forget changing this method when `Expr` changes the + /// implementation doesn't use wildcard patterns (`..`, `_`) to catch changes + /// compile time. + pub fn hash_node<H: Hasher>(&self, hasher: &mut H) { + mem::discriminant(self).hash(hasher); + match self { + Expr::Alias(Alias { + expr: _expr, + relation, + name, + }) => { + relation.hash(hasher); + name.hash(hasher); + } + Expr::Column(column) => { + column.hash(hasher); + } + Expr::ScalarVariable(data_type, name) => { + data_type.hash(hasher); + name.hash(hasher); + } + Expr::Literal(scalar_value) => { + scalar_value.hash(hasher); + } + Expr::BinaryExpr(BinaryExpr { + left: _left, + op, + right: _right, + }) => { + op.hash(hasher); + } + Expr::Like(Like { + negated, + expr: _expr, + pattern: _pattern, + escape_char, + case_insensitive, + }) + | Expr::SimilarTo(Like { + negated, + expr: _expr, + pattern: _pattern, + escape_char, + case_insensitive, + }) => { + negated.hash(hasher); + escape_char.hash(hasher); + case_insensitive.hash(hasher); + } + Expr::Not(_expr) + | Expr::IsNotNull(_expr) + | Expr::IsNull(_expr) + | Expr::IsTrue(_expr) + | Expr::IsFalse(_expr) + | Expr::IsUnknown(_expr) + | Expr::IsNotTrue(_expr) + | Expr::IsNotFalse(_expr) + | Expr::IsNotUnknown(_expr) + | Expr::Negative(_expr) => {} + Expr::Between(Between { + expr: _expr, + negated, + low: _low, + high: _high, + }) => { + negated.hash(hasher); + } + Expr::Case(Case { + expr: _expr, + when_then_expr: 
_when_then_expr, + else_expr: _else_expr, + }) => {} + Expr::Cast(Cast { + expr: _expr, + data_type, + }) + | Expr::TryCast(TryCast { + expr: _expr, + data_type, + }) => { + data_type.hash(hasher); + } + Expr::Sort(Sort { + expr: _expr, + asc, + nulls_first, + }) => { + asc.hash(hasher); + nulls_first.hash(hasher); + } + Expr::ScalarFunction(ScalarFunction { func, args: _args }) => { + func.hash(hasher); + } + Expr::AggregateFunction(AggregateFunction { + func_def, + args: _args, + distinct, + filter: _filter, + order_by: _order_by, + null_treatment, + }) => { + func_def.hash(hasher); + distinct.hash(hasher); + null_treatment.hash(hasher); + } + Expr::WindowFunction(WindowFunction { + fun, + args: _args, + partition_by: _partition_by, + order_by: _order_by, + window_frame, + null_treatment, + }) => { + fun.hash(hasher); + window_frame.hash(hasher); + null_treatment.hash(hasher); + } + Expr::InList(InList { + expr: _expr, + list: _list, + negated, + }) => { + negated.hash(hasher); + } + Expr::Exists(Exists { subquery, negated }) => { + subquery.hash(hasher); + negated.hash(hasher); + } + Expr::InSubquery(InSubquery { + expr: _expr, + subquery, + negated, + }) => { + subquery.hash(hasher); + negated.hash(hasher); + } + Expr::ScalarSubquery(subquery) => { + subquery.hash(hasher); + } + Expr::Wildcard { qualifier } => { + qualifier.hash(hasher); + } + Expr::GroupingSet(grouping_set) => match grouping_set { + GroupingSet::Rollup(_exprs) => { Review Comment: maybe this could use `mem::discriminant(grouping_set)` as well 🤔 ########## datafusion/optimizer/src/common_subexpr_eliminate.rs: ########## @@ -351,56 +468,108 @@ impl CommonSubexprEliminate { schema: orig_schema, .. 
} = aggregate; - let mut expr_stats = ExprStats::new(); - // track transformed information let mut transformed = false; - // rewrite inputs - let group_arrays = to_arrays(&group_expr, &mut expr_stats, ExprMask::Normal)?; - let aggr_arrays = to_arrays(&aggr_expr, &mut expr_stats, ExprMask::Normal)?; - let name_perserver = NamePreserver::new_for_projection(); let saved_names = aggr_expr .iter() .map(|expr| name_perserver.save(expr)) .collect::<Result<Vec<_>>>()?; - // rewrite both group exprs and aggr_expr - let rewritten = self.rewrite_expr( - vec![group_expr, aggr_expr], - &[&group_arrays, &aggr_arrays], - unwrap_arc(input), - &expr_stats, - config, - )?; - transformed |= rewritten.transformed; - let (mut new_expr, new_input) = rewritten.data; - - // note the reversed pop order. - let new_aggr_expr = pop_expr(&mut new_expr)?; - let new_group_expr = pop_expr(&mut new_expr)?; + let mut expr_stats = ExprStats::new(); + // rewrite inputs + let (group_found_common, group_arrays) = + self.to_arrays(&group_expr, &mut expr_stats, ExprMask::Normal)?; + let (aggr_found_common, aggr_arrays) = + self.to_arrays(&aggr_expr, &mut expr_stats, ExprMask::Normal)?; + let (new_aggr_expr, new_group_expr, new_input) = + if group_found_common || aggr_found_common { + // rewrite both group exprs and aggr_expr + let rewritten = self.rewrite_expr( + vec![group_expr.clone(), aggr_expr.clone()], Review Comment: Given this only happens when we are actually doing a CSE rewrite (and not on all plans) I think it is fine ########## datafusion/optimizer/src/common_subexpr_eliminate.rs: ########## @@ -43,18 +45,35 @@ const CSE_PREFIX: &str = "__common_expr"; /// Identifier that represents a subexpression tree. /// -/// Note that the current implementation contains: -/// - the `Display` of an expression (a `String`) and -/// - the identifiers of the childrens of the expression -/// concatenated. 
-/// /// An identifier should (ideally) be able to "hash", "accumulate", "equal" and "have no /// collision (as low as possible)" -/// -/// Since an identifier is likely to be copied many times, it is better that an identifier -/// is small or "copy". otherwise some kinds of reference count is needed. String -/// description here is not such a good choose. -type Identifier = String; +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +struct Identifier<'n> { + hash: u64, Review Comment: Can we document what `hash` is here? Specifically, I think it is *NOT* `expr.hash()` but a hash value built up incrementally during the walk ########## datafusion/expr/src/expr.rs: ########## @@ -1461,6 +1462,176 @@ impl Expr { | Expr::Placeholder(..) => false, } } + + /// This method hashes the direct content of an expression node without recursing into + /// its children. This is useful because in `CommonSubexprEliminate` we can build up + /// the deep hash of a node and its descendants during the bottom-up phase of the + /// first traversal and so avoid computing the hash of the node and then the hash of + /// its descendants separately. + /// + /// As it is pretty easy to forget changing this method when `Expr` changes the + /// implementation doesn't use wildcard patterns (`..`, `_`) to catch changes + /// compile time. Review Comment: Can we please also note whether this method returns the same value as `expr.hash()` for expressions with no children? I don't think it needs to, but it would be good to explicitly document the intent ```suggestion /// Hashes the direct content of an `Expr` without recursing into /// its children. /// /// This method is useful to incrementally compute Hashes, such as in `CommonSubexprEliminate` /// which builds a /// deep hash of a node and its descendants during the bottom-up phase of the /// first traversal and so avoid computing the hash of the node and then the hash of /// its descendants separately. 
/// /// As it is pretty easy to forget changing this method when `Expr` changes the /// implementation doesn't use wildcard patterns (`..`, `_`) to catch changes /// compile time. ``` ########## datafusion/optimizer/src/common_subexpr_eliminate.rs: ########## @@ -118,21 +137,86 @@ type CommonExprs = IndexMap<Identifier, (Expr, String)>; /// ProjectionExec(exprs=[extract (day from new_col), extract (year from new_col)]) <-- reuse here /// ProjectionExec(exprs=[to_date(c1) as new_col]) <-- compute to_date once /// ``` -pub struct CommonSubexprEliminate {} +pub struct CommonSubexprEliminate { + random_state: RandomState, +} impl CommonSubexprEliminate { + pub fn new() -> Self { + Self { + random_state: RandomState::new(), + } + } + + /// Returns the identifier list for each element in `exprs` and a flag to indicate if + /// rewrite phase of CSE make sense. + /// + /// Returns and array with 1 element for each input expr in `exprs` + /// + /// Each element is itself the result of [`CommonSubexprEliminate::expr_to_identifier`] for that expr + /// (e.g. the identifiers for each node in the tree) + fn to_arrays<'n>( + &self, + exprs: &'n [Expr], + expr_stats: &mut ExprStats<'n>, + expr_mask: ExprMask, + ) -> Result<(bool, Vec<IdArray<'n>>)> { + let mut found_common = false; + exprs + .iter() + .map(|e| { + let mut id_array = vec![]; + self.expr_to_identifier(e, expr_stats, &mut id_array, expr_mask) + .map(|fc| { + found_common |= fc; + + id_array + }) + }) + .collect::<Result<Vec<_>>>() + .map(|id_arrays| (found_common, id_arrays)) + } + + /// Go through an expression tree and generate identifier for every node in this tree. + fn expr_to_identifier<'n>( + &self, + expr: &'n Expr, + expr_stats: &mut ExprStats<'n>, + id_array: &mut IdArray<'n>, + expr_mask: ExprMask, + ) -> Result<bool> { + // Don't consider volatile expressions for CSE. 
Review Comment: 👍 ########## datafusion/optimizer/src/common_subexpr_eliminate.rs: ########## @@ -118,21 +137,86 @@ type CommonExprs = IndexMap<Identifier, (Expr, String)>; /// ProjectionExec(exprs=[extract (day from new_col), extract (year from new_col)]) <-- reuse here /// ProjectionExec(exprs=[to_date(c1) as new_col]) <-- compute to_date once /// ``` -pub struct CommonSubexprEliminate {} +pub struct CommonSubexprEliminate { + random_state: RandomState, +} impl CommonSubexprEliminate { + pub fn new() -> Self { + Self { + random_state: RandomState::new(), + } + } + + /// Returns the identifier list for each element in `exprs` and a flag to indicate if + /// rewrite phase of CSE make sense. + /// + /// Returns and array with 1 element for each input expr in `exprs` + /// + /// Each element is itself the result of [`CommonSubexprEliminate::expr_to_identifier`] for that expr + /// (e.g. the identifiers for each node in the tree) + fn to_arrays<'n>( + &self, + exprs: &'n [Expr], + expr_stats: &mut ExprStats<'n>, + expr_mask: ExprMask, + ) -> Result<(bool, Vec<IdArray<'n>>)> { + let mut found_common = false; + exprs + .iter() + .map(|e| { + let mut id_array = vec![]; + self.expr_to_identifier(e, expr_stats, &mut id_array, expr_mask) + .map(|fc| { + found_common |= fc; + + id_array + }) + }) + .collect::<Result<Vec<_>>>() + .map(|id_arrays| (found_common, id_arrays)) + } + + /// Go through an expression tree and generate identifier for every node in this tree. Review Comment: ```suggestion /// Add an identifier to `id_array` for every subexpression in this tree. ``` ########## datafusion/optimizer/src/common_subexpr_eliminate.rs: ########## @@ -524,41 +667,24 @@ impl CommonSubexprEliminate { /// ``` /// /// where, it is referred once by each `WindowAggr` (total of 2) in the plan. 
-struct ConsecutiveWindowExprs { Review Comment: That makes sense -- I put it all in a struct initially to try and encapsulate the logic -- I think your changes look good to me ########## datafusion/proto-common/Cargo.toml: ########## @@ -26,7 +26,7 @@ homepage = { workspace = true } repository = { workspace = true } license = { workspace = true } authors = { workspace = true } -rust-version = "1.75" +rust-version = "1.76" Review Comment: By my reading of the MSRV policy https://github.com/apache/datafusion/blob/d8bcff5db22f47e5da778b8012bca9e16df35540/README.md#L100-L103 `1.75` was released [`2023-12-28`](https://github.com/rust-lang/rust/blob/master/RELEASES.md#version-1750-2023-12-28) meaning we need to keep the MSRV at 1.75 until 2024-06-28 (6 days from now) However, since we aren't going to make a release until around July 11 https://github.com/apache/datafusion/issues/11077 this is probably ok ########## datafusion/optimizer/src/common_subexpr_eliminate.rs: ########## @@ -275,68 +367,93 @@ impl CommonSubexprEliminate { config: &dyn OptimizerConfig, ) -> Result<Transformed<LogicalPlan>> { // collect all window expressions from any number of LogicalPlanWindow - let ConsecutiveWindowExprs { - window_exprs, - arrays_per_window, - expr_stats, - plan, - } = ConsecutiveWindowExprs::try_new(window)?; + let (mut window_exprs, mut window_schemas, mut plan) = + get_consecutive_window_exprs(window); - let arrays_per_window = arrays_per_window + let mut found_common = false; + let mut expr_stats = ExprStats::new(); + let arrays_per_window = window_exprs .iter() - .map(|arrays| arrays.as_slice()) - .collect::<Vec<_>>(); + .map(|window_expr| { + self.to_arrays(window_expr, &mut expr_stats, ExprMask::Normal) + .map(|(fc, id_arrays)| { + found_common |= fc; - // save the original names - let name_preserver = NamePreserver::new(&plan); - let mut saved_names = window_exprs - .iter() - .map(|exprs| { - exprs - .iter() - .map(|expr| name_preserver.save(expr)) - 
.collect::<Result<Vec<_>>>() + id_arrays + }) }) .collect::<Result<Vec<_>>>()?; - assert_eq!(window_exprs.len(), arrays_per_window.len()); - let num_window_exprs = window_exprs.len(); - let rewritten_window_exprs = self.rewrite_expr( - window_exprs, - &arrays_per_window, - plan, - &expr_stats, - config, - )?; - let transformed = rewritten_window_exprs.transformed; + if found_common { + // save the original names + let name_preserver = NamePreserver::new(&plan); + let mut saved_names = window_exprs + .iter() + .map(|exprs| { + exprs + .iter() + .map(|expr| name_preserver.save(expr)) + .collect::<Result<Vec<_>>>() + }) + .collect::<Result<Vec<_>>>()?; - let (mut new_expr, new_input) = rewritten_window_exprs.data; + assert_eq!(window_exprs.len(), arrays_per_window.len()); + let num_window_exprs = window_exprs.len(); + let rewritten_window_exprs = self.rewrite_expr( + window_exprs.clone(), Review Comment: ```suggestion // Must clone as Identifiers use references to original expressions // so we have to keep the original expressions intact. window_exprs.clone(), ``` in terms of the cost of the extra clone, I think the performance results speak for themselves ########## datafusion/optimizer/src/common_subexpr_eliminate.rs: ########## @@ -351,56 +460,108 @@ impl CommonSubexprEliminate { schema: orig_schema, .. 
} = aggregate; - let mut expr_stats = ExprStats::new(); - // track transformed information let mut transformed = false; - // rewrite inputs - let group_arrays = to_arrays(&group_expr, &mut expr_stats, ExprMask::Normal)?; - let aggr_arrays = to_arrays(&aggr_expr, &mut expr_stats, ExprMask::Normal)?; - let name_perserver = NamePreserver::new_for_projection(); let saved_names = aggr_expr .iter() .map(|expr| name_perserver.save(expr)) .collect::<Result<Vec<_>>>()?; - // rewrite both group exprs and aggr_expr - let rewritten = self.rewrite_expr( - vec![group_expr, aggr_expr], - &[&group_arrays, &aggr_arrays], - unwrap_arc(input), - &expr_stats, - config, - )?; - transformed |= rewritten.transformed; - let (mut new_expr, new_input) = rewritten.data; - - // note the reversed pop order. - let new_aggr_expr = pop_expr(&mut new_expr)?; - let new_group_expr = pop_expr(&mut new_expr)?; + let mut expr_stats = ExprStats::new(); + // rewrite inputs + let (group_found_common, group_arrays) = + self.to_arrays(&group_expr, &mut expr_stats, ExprMask::Normal)?; + let (aggr_found_common, aggr_arrays) = + self.to_arrays(&aggr_expr, &mut expr_stats, ExprMask::Normal)?; + let (new_aggr_expr, new_group_expr, new_input) = + if group_found_common || aggr_found_common { + // rewrite both group exprs and aggr_expr + let rewritten = self.rewrite_expr( + vec![group_expr.clone(), aggr_expr.clone()], + vec![group_arrays, aggr_arrays], + unwrap_arc(input), + &expr_stats, + config, + )?; + assert!(rewritten.transformed); + transformed |= rewritten.transformed; + let (mut new_expr, new_input) = rewritten.data; + + // note the reversed pop order. 
+ let new_aggr_expr = pop_expr(&mut new_expr)?; + let new_group_expr = pop_expr(&mut new_expr)?; + + (new_aggr_expr, new_group_expr, Arc::new(new_input)) + } else { + (aggr_expr, group_expr, input) + }; // create potential projection on top let mut expr_stats = ExprStats::new(); - let new_input_schema = Arc::clone(new_input.schema()); - let aggr_arrays = to_arrays( + let (aggr_found_common, aggr_arrays) = self.to_arrays( &new_aggr_expr, &mut expr_stats, ExprMask::NormalAndAggregates, )?; - let mut common_exprs = IndexMap::new(); - let mut rewritten_exprs = self.rewrite_exprs_list( - vec![new_aggr_expr.clone()], - &[&aggr_arrays], - &expr_stats, - &mut common_exprs, - &config.alias_generator(), - )?; - transformed |= rewritten_exprs.transformed; - let rewritten = pop_expr(&mut rewritten_exprs.data)?; + if aggr_found_common { + let mut common_exprs = CommonExprs::new(); + let mut rewritten_exprs = self.rewrite_exprs_list( + vec![new_aggr_expr.clone()], + vec![aggr_arrays], + &expr_stats, + &mut common_exprs, + &config.alias_generator(), + )?; + assert!(rewritten_exprs.transformed); + let rewritten = pop_expr(&mut rewritten_exprs.data)?; + + assert!(!common_exprs.is_empty()); + let mut agg_exprs = common_exprs + .into_values() + .map(|(expr, expr_alias)| expr.alias(expr_alias)) + .collect::<Vec<_>>(); + + let new_input_schema = Arc::clone(new_input.schema()); + let mut proj_exprs = vec![]; + for expr in &new_group_expr { + extract_expressions(expr, &new_input_schema, &mut proj_exprs)? + } + for (expr_rewritten, expr_orig) in rewritten.into_iter().zip(new_aggr_expr) { + if expr_rewritten == expr_orig { + if let Expr::Alias(Alias { expr, name, .. 
}) = expr_rewritten { + agg_exprs.push(expr.alias(&name)); + proj_exprs.push(Expr::Column(Column::from_name(name))); + } else { + let expr_alias = config.alias_generator().next(CSE_PREFIX); + let (qualifier, field) = + expr_rewritten.to_field(&new_input_schema)?; + let out_name = qualified_name(qualifier.as_ref(), field.name()); + + agg_exprs.push(expr_rewritten.alias(&expr_alias)); + proj_exprs.push( + Expr::Column(Column::from_name(expr_alias)).alias(out_name), + ); + } + } else { + proj_exprs.push(expr_rewritten); + } + } + + let agg = LogicalPlan::Aggregate(Aggregate::try_new( + new_input, + new_group_expr, + agg_exprs, + )?); - if common_exprs.is_empty() { + Projection::try_new(proj_exprs, Arc::new(agg)) + .map(LogicalPlan::Projection) + .map(Transformed::yes) + } else { + // TODO: How exactly can the name or the schema change in this case? Review Comment: Is this something you plan to do in this PR? Or is it for follow up work (I can file a new ticket if it is follow on work) ########## datafusion/optimizer/src/common_subexpr_eliminate.rs: ########## @@ -43,18 +45,35 @@ const CSE_PREFIX: &str = "__common_expr"; /// Identifier that represents a subexpression tree. /// -/// Note that the current implementation contains: -/// - the `Display` of an expression (a `String`) and -/// - the identifiers of the childrens of the expression -/// concatenated. -/// /// An identifier should (ideally) be able to "hash", "accumulate", "equal" and "have no Review Comment: ```suggestion /// This identifier is designed to be efficient and "hash", "accumulate", "equal" and " ``` ########## datafusion/optimizer/src/common_subexpr_eliminate.rs: ########## @@ -351,56 +468,108 @@ impl CommonSubexprEliminate { schema: orig_schema, .. 
} = aggregate; - let mut expr_stats = ExprStats::new(); - // track transformed information let mut transformed = false; - // rewrite inputs - let group_arrays = to_arrays(&group_expr, &mut expr_stats, ExprMask::Normal)?; - let aggr_arrays = to_arrays(&aggr_expr, &mut expr_stats, ExprMask::Normal)?; - let name_perserver = NamePreserver::new_for_projection(); let saved_names = aggr_expr .iter() .map(|expr| name_perserver.save(expr)) .collect::<Result<Vec<_>>>()?; - // rewrite both group exprs and aggr_expr - let rewritten = self.rewrite_expr( - vec![group_expr, aggr_expr], - &[&group_arrays, &aggr_arrays], - unwrap_arc(input), - &expr_stats, - config, - )?; - transformed |= rewritten.transformed; - let (mut new_expr, new_input) = rewritten.data; - - // note the reversed pop order. - let new_aggr_expr = pop_expr(&mut new_expr)?; - let new_group_expr = pop_expr(&mut new_expr)?; + let mut expr_stats = ExprStats::new(); + // rewrite inputs + let (group_found_common, group_arrays) = + self.to_arrays(&group_expr, &mut expr_stats, ExprMask::Normal)?; + let (aggr_found_common, aggr_arrays) = + self.to_arrays(&aggr_expr, &mut expr_stats, ExprMask::Normal)?; + let (new_aggr_expr, new_group_expr, new_input) = + if group_found_common || aggr_found_common { + // rewrite both group exprs and aggr_expr + let rewritten = self.rewrite_expr( + vec![group_expr.clone(), aggr_expr.clone()], + vec![group_arrays, aggr_arrays], + unwrap_arc(input), + &expr_stats, + config, + )?; + assert!(rewritten.transformed); + transformed |= rewritten.transformed; + let (mut new_expr, new_input) = rewritten.data; + + // note the reversed pop order. 
+ let new_aggr_expr = pop_expr(&mut new_expr)?; + let new_group_expr = pop_expr(&mut new_expr)?; + + (new_aggr_expr, new_group_expr, Arc::new(new_input)) + } else { + (aggr_expr, group_expr, input) + }; // create potential projection on top let mut expr_stats = ExprStats::new(); - let new_input_schema = Arc::clone(new_input.schema()); - let aggr_arrays = to_arrays( + let (aggr_found_common, aggr_arrays) = self.to_arrays( &new_aggr_expr, &mut expr_stats, ExprMask::NormalAndAggregates, )?; - let mut common_exprs = IndexMap::new(); - let mut rewritten_exprs = self.rewrite_exprs_list( - vec![new_aggr_expr.clone()], - &[&aggr_arrays], - &expr_stats, - &mut common_exprs, - &config.alias_generator(), - )?; - transformed |= rewritten_exprs.transformed; - let rewritten = pop_expr(&mut rewritten_exprs.data)?; + if aggr_found_common { + let mut common_exprs = CommonExprs::new(); + let mut rewritten_exprs = self.rewrite_exprs_list( + vec![new_aggr_expr.clone()], Review Comment: maybe we can add a comment ########## datafusion/optimizer/src/common_subexpr_eliminate.rs: ########## @@ -507,7 +642,7 @@ impl CommonSubexprEliminate { /// ``` /// /// Returns: -/// * `window_exprs`: `[a, b, c, d]` +/// * `window_exprs`: `[[a, b, c], [d]]` Review Comment: 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org