This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 3c3cb87198 Improve `TreeNode` and `LogicalPlan` APIs to accept owned
closures, deprecate `transform_down_mut()` and `transform_up_mut()` (#10126)
3c3cb87198 is described below
commit 3c3cb87198c6a0238640a99d9636f7554aa38f88
Author: Peter Toth <[email protected]>
AuthorDate: Mon Apr 22 21:19:46 2024 +0200
Improve `TreeNode` and `LogicalPlan` APIs to accept owned closures,
deprecate `transform_down_mut()` and `transform_up_mut()` (#10126)
* Deprecate `TreeNode::transform_down_mut()` and
`TreeNode::transform_up_mut()` methods
* Refactor `TreeNode` and `LogicalPlan` apply, transform, transform_up,
transform_down and transform_down_up APIs to accept owned closures
---
datafusion-examples/examples/function_factory.rs | 2 +-
datafusion-examples/examples/rewrite_expr.rs | 6 +-
datafusion/common/src/tree_node.rs | 105 ++++++++++-------
datafusion/core/src/datasource/listing/helpers.rs | 2 +-
.../src/physical_optimizer/coalesce_batches.rs | 2 +-
.../combine_partial_final_agg.rs | 4 +-
.../src/physical_optimizer/convert_first_last.rs | 2 +-
.../src/physical_optimizer/enforce_distribution.rs | 12 +-
.../core/src/physical_optimizer/enforce_sorting.rs | 18 +--
.../core/src/physical_optimizer/join_selection.rs | 8 +-
.../limited_distinct_aggregation.rs | 6 +-
.../src/physical_optimizer/output_requirements.rs | 2 +-
.../src/physical_optimizer/pipeline_checker.rs | 2 +-
.../src/physical_optimizer/projection_pushdown.rs | 8 +-
datafusion/core/src/physical_optimizer/pruning.rs | 2 +-
.../replace_with_order_preserving_variants.rs | 2 +-
.../core/src/physical_optimizer/test_utils.rs | 2 +-
.../src/physical_optimizer/topk_aggregation.rs | 6 +-
.../user_defined/user_defined_scalar_functions.rs | 2 +-
datafusion/expr/src/expr.rs | 2 +-
datafusion/expr/src/expr_rewriter/mod.rs | 14 +--
datafusion/expr/src/expr_rewriter/order_by.rs | 2 +-
datafusion/expr/src/logical_plan/plan.rs | 14 +--
datafusion/expr/src/logical_plan/tree_node.rs | 129 ++++++++++++---------
datafusion/expr/src/utils.rs | 10 +-
.../optimizer/src/analyzer/count_wildcard_rule.rs | 5 +-
.../optimizer/src/analyzer/function_rewrite.rs | 4 +-
.../optimizer/src/analyzer/inline_table_scan.rs | 4 +-
datafusion/optimizer/src/analyzer/mod.rs | 4 +-
datafusion/optimizer/src/analyzer/subquery.rs | 2 +-
datafusion/optimizer/src/decorrelate.rs | 6 +-
datafusion/optimizer/src/optimize_projections.rs | 2 +-
datafusion/optimizer/src/plan_signature.rs | 2 +-
datafusion/optimizer/src/push_down_filter.rs | 6 +-
.../optimizer/src/scalar_subquery_to_join.rs | 4 +-
datafusion/optimizer/src/utils.rs | 2 +-
datafusion/physical-expr/src/equivalence/class.rs | 4 +-
datafusion/physical-expr/src/equivalence/mod.rs | 2 +-
.../physical-expr/src/equivalence/projection.rs | 2 +-
.../physical-expr/src/equivalence/properties.rs | 2 +-
datafusion/physical-expr/src/expressions/case.rs | 4 +-
datafusion/physical-expr/src/utils/mod.rs | 8 +-
.../physical-plan/src/joins/stream_join_utils.rs | 2 +-
datafusion/physical-plan/src/joins/utils.rs | 2 +-
datafusion/physical-plan/src/recursive_query.rs | 4 +-
datafusion/sql/src/cte.rs | 2 +-
datafusion/sql/src/select.rs | 2 +-
datafusion/sql/src/unparser/utils.rs | 2 +-
datafusion/sql/src/utils.rs | 6 +-
49 files changed, 238 insertions(+), 209 deletions(-)
diff --git a/datafusion-examples/examples/function_factory.rs
b/datafusion-examples/examples/function_factory.rs
index a7c8558c6d..3973e50474 100644
--- a/datafusion-examples/examples/function_factory.rs
+++ b/datafusion-examples/examples/function_factory.rs
@@ -164,7 +164,7 @@ impl ScalarUDFImpl for ScalarFunctionWrapper {
impl ScalarFunctionWrapper {
// replaces placeholders such as $1 with actual arguments (args[0]
fn replacement(expr: &Expr, args: &[Expr]) -> Result<Expr> {
- let result = expr.clone().transform(&|e| {
+ let result = expr.clone().transform(|e| {
let r = match e {
Expr::Placeholder(placeholder) => {
let placeholder_position =
diff --git a/datafusion-examples/examples/rewrite_expr.rs
b/datafusion-examples/examples/rewrite_expr.rs
index dcebbb55fb..9b94a71a50 100644
--- a/datafusion-examples/examples/rewrite_expr.rs
+++ b/datafusion-examples/examples/rewrite_expr.rs
@@ -91,7 +91,7 @@ impl AnalyzerRule for MyAnalyzerRule {
impl MyAnalyzerRule {
fn analyze_plan(plan: LogicalPlan) -> Result<LogicalPlan> {
- plan.transform(&|plan| {
+ plan.transform(|plan| {
Ok(match plan {
LogicalPlan::Filter(filter) => {
let predicate =
Self::analyze_expr(filter.predicate.clone())?;
@@ -107,7 +107,7 @@ impl MyAnalyzerRule {
}
fn analyze_expr(expr: Expr) -> Result<Expr> {
- expr.transform(&|expr| {
+ expr.transform(|expr| {
// closure is invoked for all sub expressions
Ok(match expr {
Expr::Literal(ScalarValue::Int64(i)) => {
@@ -163,7 +163,7 @@ impl OptimizerRule for MyOptimizerRule {
/// use rewrite_expr to modify the expression tree.
fn my_rewrite(expr: Expr) -> Result<Expr> {
- expr.transform(&|expr| {
+ expr.transform(|expr| {
// closure is invoked for all sub expressions
Ok(match expr {
Expr::Between(Between {
diff --git a/datafusion/common/src/tree_node.rs
b/datafusion/common/src/tree_node.rs
index dff22d4959..f41d264d35 100644
--- a/datafusion/common/src/tree_node.rs
+++ b/datafusion/common/src/tree_node.rs
@@ -31,18 +31,6 @@ macro_rules! handle_transform_recursion {
}};
}
-macro_rules! handle_transform_recursion_down {
- ($F_DOWN:expr, $F_CHILD:expr) => {{
- $F_DOWN?.transform_children(|n| n.map_children($F_CHILD))
- }};
-}
-
-macro_rules! handle_transform_recursion_up {
- ($SELF:expr, $F_CHILD:expr, $F_UP:expr) => {{
- $SELF.map_children($F_CHILD)?.transform_parent(|n| $F_UP(n))
- }};
-}
-
/// Defines a visitable and rewriteable tree node. This trait is implemented
/// for plans ([`ExecutionPlan`] and [`LogicalPlan`]) as well as expression
/// trees ([`PhysicalExpr`], [`Expr`]) in DataFusion.
@@ -137,17 +125,24 @@ pub trait TreeNode: Sized {
/// or run a check on the tree.
fn apply<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
&self,
- f: &mut F,
+ mut f: F,
) -> Result<TreeNodeRecursion> {
- f(self)?.visit_children(|| self.apply_children(|c| c.apply(f)))
+ fn apply_impl<N: TreeNode, F: FnMut(&N) -> Result<TreeNodeRecursion>>(
+ node: &N,
+ f: &mut F,
+ ) -> Result<TreeNodeRecursion> {
+ f(node)?.visit_children(|| node.apply_children(|c| apply_impl(c,
f)))
+ }
+
+ apply_impl(self, &mut f)
}
/// Convenience utility for writing optimizer rules: Recursively apply the
/// given function `f` to the tree in a bottom-up (post-order) fashion.
When
/// `f` does not apply to a given node, it is left unchanged.
- fn transform<F: Fn(Self) -> Result<Transformed<Self>>>(
+ fn transform<F: FnMut(Self) -> Result<Transformed<Self>>>(
self,
- f: &F,
+ f: F,
) -> Result<Transformed<Self>> {
self.transform_up(f)
}
@@ -155,43 +150,60 @@ pub trait TreeNode: Sized {
/// Convenience utility for writing optimizer rules: Recursively apply the
/// given function `f` to a node and then to its children (pre-order
traversal).
/// When `f` does not apply to a given node, it is left unchanged.
- fn transform_down<F: Fn(Self) -> Result<Transformed<Self>>>(
+ fn transform_down<F: FnMut(Self) -> Result<Transformed<Self>>>(
self,
- f: &F,
+ mut f: F,
) -> Result<Transformed<Self>> {
- handle_transform_recursion_down!(f(self), |c| c.transform_down(f))
+ fn transform_down_impl<N: TreeNode, F: FnMut(N) ->
Result<Transformed<N>>>(
+ node: N,
+ f: &mut F,
+ ) -> Result<Transformed<N>> {
+ f(node)?.transform_children(|n| n.map_children(|c|
transform_down_impl(c, f)))
+ }
+
+ transform_down_impl(self, &mut f)
}
/// Convenience utility for writing optimizer rules: Recursively apply the
/// given mutable function `f` to a node and then to its children
(pre-order
/// traversal). When `f` does not apply to a given node, it is left
unchanged.
+ #[deprecated(since = "38.0.0", note = "Use `transform_down` instead")]
fn transform_down_mut<F: FnMut(Self) -> Result<Transformed<Self>>>(
self,
f: &mut F,
) -> Result<Transformed<Self>> {
- handle_transform_recursion_down!(f(self), |c| c.transform_down_mut(f))
+ self.transform_down(f)
}
/// Convenience utility for writing optimizer rules: Recursively apply the
/// given function `f` to all children of a node, and then to the node
itself
/// (post-order traversal). When `f` does not apply to a given node, it is
/// left unchanged.
- fn transform_up<F: Fn(Self) -> Result<Transformed<Self>>>(
+ fn transform_up<F: FnMut(Self) -> Result<Transformed<Self>>>(
self,
- f: &F,
+ mut f: F,
) -> Result<Transformed<Self>> {
- handle_transform_recursion_up!(self, |c| c.transform_up(f), f)
+ fn transform_up_impl<N: TreeNode, F: FnMut(N) ->
Result<Transformed<N>>>(
+ node: N,
+ f: &mut F,
+ ) -> Result<Transformed<N>> {
+ node.map_children(|c| transform_up_impl(c, f))?
+ .transform_parent(f)
+ }
+
+ transform_up_impl(self, &mut f)
}
/// Convenience utility for writing optimizer rules: Recursively apply the
/// given mutable function `f` to all children of a node, and then to the
/// node itself (post-order traversal). When `f` does not apply to a given
/// node, it is left unchanged.
+ #[deprecated(since = "38.0.0", note = "Use `transform_up` instead")]
fn transform_up_mut<F: FnMut(Self) -> Result<Transformed<Self>>>(
self,
f: &mut F,
) -> Result<Transformed<Self>> {
- handle_transform_recursion_up!(self, |c| c.transform_up_mut(f), f)
+ self.transform_up(f)
}
/// Transforms the tree using `f_down` while traversing the tree top-down
@@ -200,8 +212,8 @@ pub trait TreeNode: Sized {
///
/// Use this method if you want to start the `f_up` process right where
`f_down` jumps.
/// This can make the whole process faster by reducing the number of
`f_up` steps.
- /// If you don't need this, it's just like using `transform_down_mut`
followed by
- /// `transform_up_mut` on the same tree.
+ /// If you don't need this, it's just like using `transform_down` followed
by
+ /// `transform_up` on the same tree.
///
/// Consider the following tree structure:
/// ```text
@@ -288,14 +300,26 @@ pub trait TreeNode: Sized {
FU: FnMut(Self) -> Result<Transformed<Self>>,
>(
self,
- f_down: &mut FD,
- f_up: &mut FU,
+ mut f_down: FD,
+ mut f_up: FU,
) -> Result<Transformed<Self>> {
- handle_transform_recursion!(
- f_down(self),
- |c| c.transform_down_up(f_down, f_up),
- f_up
- )
+ fn transform_down_up_impl<
+ N: TreeNode,
+ FD: FnMut(N) -> Result<Transformed<N>>,
+ FU: FnMut(N) -> Result<Transformed<N>>,
+ >(
+ node: N,
+ f_down: &mut FD,
+ f_up: &mut FU,
+ ) -> Result<Transformed<N>> {
+ handle_transform_recursion!(
+ f_down(node),
+ |c| transform_down_up_impl(c, f_down, f_up),
+ f_up
+ )
+ }
+
+ transform_down_up_impl(self, &mut f_down, &mut f_up)
}
/// Returns true if `f` returns true for node in the tree.
@@ -303,7 +327,7 @@ pub trait TreeNode: Sized {
/// Stops recursion as soon as a matching node is found
fn exists<F: FnMut(&Self) -> bool>(&self, mut f: F) -> bool {
let mut found = false;
- self.apply(&mut |n| {
+ self.apply(|n| {
Ok(if f(n) {
found = true;
TreeNodeRecursion::Stop
@@ -439,9 +463,7 @@ impl TreeNodeRecursion {
/// This struct is used by tree transformation APIs such as
/// - [`TreeNode::rewrite`],
/// - [`TreeNode::transform_down`],
-/// - [`TreeNode::transform_down_mut`],
/// - [`TreeNode::transform_up`],
-/// - [`TreeNode::transform_up_mut`],
/// - [`TreeNode::transform_down_up`]
///
/// to control the transformation and return the transformed result.
@@ -1362,7 +1384,7 @@ mod tests {
fn $NAME() -> Result<()> {
let tree = test_tree();
let mut visits = vec![];
- tree.apply(&mut |node| {
+ tree.apply(|node| {
visits.push(format!("f_down({})", node.data));
$F(node)
})?;
@@ -1451,10 +1473,7 @@ mod tests {
#[test]
fn $NAME() -> Result<()> {
let tree = test_tree();
- assert_eq!(
- tree.transform_down_up(&mut $F_DOWN, &mut $F_UP,)?,
- $EXPECTED_TREE
- );
+ assert_eq!(tree.transform_down_up($F_DOWN, $F_UP,)?,
$EXPECTED_TREE);
Ok(())
}
@@ -1466,7 +1485,7 @@ mod tests {
#[test]
fn $NAME() -> Result<()> {
let tree = test_tree();
- assert_eq!(tree.transform_down_mut(&mut $F)?, $EXPECTED_TREE);
+ assert_eq!(tree.transform_down($F)?, $EXPECTED_TREE);
Ok(())
}
@@ -1478,7 +1497,7 @@ mod tests {
#[test]
fn $NAME() -> Result<()> {
let tree = test_tree();
- assert_eq!(tree.transform_up_mut(&mut $F)?, $EXPECTED_TREE);
+ assert_eq!(tree.transform_up($F)?, $EXPECTED_TREE);
Ok(())
}
diff --git a/datafusion/core/src/datasource/listing/helpers.rs
b/datafusion/core/src/datasource/listing/helpers.rs
index b415ce9d91..637a0daf13 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -50,7 +50,7 @@ use object_store::{ObjectMeta, ObjectStore};
/// was performed
pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool {
let mut is_applicable = true;
- expr.apply(&mut |expr| {
+ expr.apply(|expr| {
match expr {
Expr::Column(Column { ref name, .. }) => {
is_applicable &= col_names.contains(name);
diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs
b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
index 1e234eaae1..42b7463600 100644
--- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs
+++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
@@ -54,7 +54,7 @@ impl PhysicalOptimizerRule for CoalesceBatches {
}
let target_batch_size = config.execution.batch_size;
- plan.transform_up(&|plan| {
+ plan.transform_up(|plan| {
let plan_any = plan.as_any();
// The goal here is to detect operators that could produce small
batches and only
// wrap those ones with a CoalesceBatchesExec operator. An
alternate approach here
diff --git
a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
index 1cba8f0258..3d8f89d569 100644
--- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
+++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
@@ -51,7 +51,7 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate {
plan: Arc<dyn ExecutionPlan>,
_config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
- plan.transform_down(&|plan| {
+ plan.transform_down(|plan| {
let transformed =
plan.as_any()
.downcast_ref::<AggregateExec>()
@@ -179,7 +179,7 @@ fn normalize_group_exprs(group_exprs: GroupExprsRef) ->
GroupExprs {
fn discard_column_index(group_expr: Arc<dyn PhysicalExpr>) -> Arc<dyn
PhysicalExpr> {
group_expr
.clone()
- .transform(&|expr| {
+ .transform(|expr| {
let normalized_form: Option<Arc<dyn PhysicalExpr>> =
match expr.as_any().downcast_ref::<Column>() {
Some(column) => Some(Arc::new(Column::new(column.name(),
0))),
diff --git a/datafusion/core/src/physical_optimizer/convert_first_last.rs
b/datafusion/core/src/physical_optimizer/convert_first_last.rs
index 4102313d31..14860eecf1 100644
--- a/datafusion/core/src/physical_optimizer/convert_first_last.rs
+++ b/datafusion/core/src/physical_optimizer/convert_first_last.rs
@@ -60,7 +60,7 @@ impl PhysicalOptimizerRule for OptimizeAggregateOrder {
plan: Arc<dyn ExecutionPlan>,
_config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
- plan.transform_up(&get_common_requirement_of_aggregate_input)
+ plan.transform_up(get_common_requirement_of_aggregate_input)
.data()
}
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 3cddf73c8e..eacc842c34 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -197,12 +197,12 @@ impl PhysicalOptimizerRule for EnforceDistribution {
// Run a top-down process to adjust input key ordering recursively
let plan_requirements = PlanWithKeyRequirements::new_default(plan);
let adjusted = plan_requirements
- .transform_down(&adjust_input_keys_ordering)
+ .transform_down(adjust_input_keys_ordering)
.data()?;
adjusted.plan
} else {
// Run a bottom-up process
- plan.transform_up(&|plan| {
+ plan.transform_up(|plan| {
Ok(Transformed::yes(reorder_join_keys_to_inputs(plan)?))
})
.data()?
@@ -211,7 +211,7 @@ impl PhysicalOptimizerRule for EnforceDistribution {
let distribution_context = DistributionContext::new_default(adjusted);
// Distribution enforcement needs to be applied bottom-up.
let distribution_context = distribution_context
- .transform_up(&|distribution_context| {
+ .transform_up(|distribution_context| {
ensure_distribution(distribution_context, config)
})
.data()?;
@@ -1768,14 +1768,14 @@ pub(crate) mod tests {
let plan_requirements =
PlanWithKeyRequirements::new_default($PLAN.clone());
let adjusted = plan_requirements
- .transform_down(&adjust_input_keys_ordering)
+ .transform_down(adjust_input_keys_ordering)
.data()
.and_then(check_integrity)?;
// TODO: End state payloads will be checked here.
adjusted.plan
} else {
// Run reorder_join_keys_to_inputs rule
- $PLAN.clone().transform_up(&|plan| {
+ $PLAN.clone().transform_up(|plan| {
Ok(Transformed::yes(reorder_join_keys_to_inputs(plan)?))
})
.data()?
@@ -1783,7 +1783,7 @@ pub(crate) mod tests {
// Then run ensure_distribution rule
DistributionContext::new_default(adjusted)
- .transform_up(&|distribution_context| {
+ .transform_up(|distribution_context| {
ensure_distribution(distribution_context, &config)
})
.data()
diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
index 5bf21c3dfa..2dced0de6a 100644
--- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -160,12 +160,12 @@ impl PhysicalOptimizerRule for EnforceSorting {
let plan_requirements = PlanWithCorrespondingSort::new_default(plan);
// Execute a bottom-up traversal to enforce sorting requirements,
// remove unnecessary sorts, and optimize sort-sensitive operators:
- let adjusted = plan_requirements.transform_up(&ensure_sorting)?.data;
+ let adjusted = plan_requirements.transform_up(ensure_sorting)?.data;
let new_plan = if config.optimizer.repartition_sorts {
let plan_with_coalesce_partitions =
PlanWithCorrespondingCoalescePartitions::new_default(adjusted.plan);
let parallel = plan_with_coalesce_partitions
- .transform_up(¶llelize_sorts)
+ .transform_up(parallelize_sorts)
.data()?;
parallel.plan
} else {
@@ -174,7 +174,7 @@ impl PhysicalOptimizerRule for EnforceSorting {
let plan_with_pipeline_fixer =
OrderPreservationContext::new_default(new_plan);
let updated_plan = plan_with_pipeline_fixer
- .transform_up(&|plan_with_pipeline_fixer| {
+ .transform_up(|plan_with_pipeline_fixer| {
replace_with_order_preserving_variants(
plan_with_pipeline_fixer,
false,
@@ -188,11 +188,11 @@ impl PhysicalOptimizerRule for EnforceSorting {
// missed by the bottom-up traversal:
let mut sort_pushdown = SortPushDown::new_default(updated_plan.plan);
assign_initial_requirements(&mut sort_pushdown);
- let adjusted = sort_pushdown.transform_down(&pushdown_sorts)?.data;
+ let adjusted = sort_pushdown.transform_down(pushdown_sorts)?.data;
adjusted
.plan
- .transform_up(&|plan|
Ok(Transformed::yes(replace_with_partial_sort(plan)?)))
+ .transform_up(|plan|
Ok(Transformed::yes(replace_with_partial_sort(plan)?)))
.data()
}
@@ -681,7 +681,7 @@ mod tests {
{
let plan_requirements =
PlanWithCorrespondingSort::new_default($PLAN.clone());
let adjusted = plan_requirements
- .transform_up(&ensure_sorting)
+ .transform_up(ensure_sorting)
.data()
.and_then(check_integrity)?;
// TODO: End state payloads will be checked here.
@@ -690,7 +690,7 @@ mod tests {
let plan_with_coalesce_partitions =
PlanWithCorrespondingCoalescePartitions::new_default(adjusted.plan);
let parallel = plan_with_coalesce_partitions
- .transform_up(¶llelize_sorts)
+ .transform_up(parallelize_sorts)
.data()
.and_then(check_integrity)?;
// TODO: End state payloads will be checked here.
@@ -701,7 +701,7 @@ mod tests {
let plan_with_pipeline_fixer =
OrderPreservationContext::new_default(new_plan);
let updated_plan = plan_with_pipeline_fixer
- .transform_up(&|plan_with_pipeline_fixer| {
+ .transform_up(|plan_with_pipeline_fixer| {
replace_with_order_preserving_variants(
plan_with_pipeline_fixer,
false,
@@ -716,7 +716,7 @@ mod tests {
let mut sort_pushdown =
SortPushDown::new_default(updated_plan.plan);
assign_initial_requirements(&mut sort_pushdown);
sort_pushdown
- .transform_down(&pushdown_sorts)
+ .transform_down(pushdown_sorts)
.data()
.and_then(check_integrity)?;
// TODO: End state payloads will be checked here.
diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs
b/datafusion/core/src/physical_optimizer/join_selection.rs
index a8b308d3de..b20f041366 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -295,7 +295,7 @@ impl PhysicalOptimizerRule for JoinSelection {
Box::new(hash_join_swap_subrule),
];
let new_plan = plan
- .transform_up(&|p| apply_subrules(p, &subrules, config))
+ .transform_up(|p| apply_subrules(p, &subrules, config))
.data()?;
// Next, we apply another subrule that tries to optimize joins using
any
// statistics their inputs might have.
@@ -312,7 +312,7 @@ impl PhysicalOptimizerRule for JoinSelection {
let collect_threshold_byte_size =
config.hash_join_single_partition_threshold;
let collect_threshold_num_rows =
config.hash_join_single_partition_threshold_rows;
new_plan
- .transform_up(&|plan| {
+ .transform_up(|plan| {
statistical_join_selection_subrule(
plan,
collect_threshold_byte_size,
@@ -891,13 +891,13 @@ mod tests_statistical {
Box::new(hash_join_swap_subrule),
];
let new_plan = plan
- .transform_up(&|p| apply_subrules(p, &subrules,
&ConfigOptions::new()))
+ .transform_up(|p| apply_subrules(p, &subrules,
&ConfigOptions::new()))
.data()?;
// TODO: End state payloads will be checked here.
let config = ConfigOptions::new().optimizer;
let collect_left_threshold =
config.hash_join_single_partition_threshold;
let collect_threshold_num_rows =
config.hash_join_single_partition_threshold_rows;
- let _ = new_plan.transform_up(&|plan| {
+ let _ = new_plan.transform_up(|plan| {
statistical_join_selection_subrule(
plan,
collect_left_threshold,
diff --git
a/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs
b/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs
index 9509d4e4c8..dbdcfed2ae 100644
--- a/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs
+++ b/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs
@@ -107,7 +107,7 @@ impl LimitedDistinctAggregation {
let mut found_match_aggr = false;
let mut rewrite_applicable = true;
- let mut closure = |plan: Arc<dyn ExecutionPlan>| {
+ let closure = |plan: Arc<dyn ExecutionPlan>| {
if !rewrite_applicable {
return Ok(Transformed::no(plan));
}
@@ -138,7 +138,7 @@ impl LimitedDistinctAggregation {
rewrite_applicable = false;
Ok(Transformed::no(plan))
};
- let child = child.clone().transform_down_mut(&mut
closure).data().ok()?;
+ let child = child.clone().transform_down(closure).data().ok()?;
if is_global_limit {
return Some(Arc::new(GlobalLimitExec::new(
child,
@@ -163,7 +163,7 @@ impl PhysicalOptimizerRule for LimitedDistinctAggregation {
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
if config.optimizer.enable_distinct_aggregation_soft_limit {
- plan.transform_down(&|plan| {
+ plan.transform_down(|plan| {
Ok(
if let Some(plan) =
LimitedDistinctAggregation::transform_limit(plan.clone())
diff --git a/datafusion/core/src/physical_optimizer/output_requirements.rs
b/datafusion/core/src/physical_optimizer/output_requirements.rs
index 829d523c99..5bf86e88d6 100644
--- a/datafusion/core/src/physical_optimizer/output_requirements.rs
+++ b/datafusion/core/src/physical_optimizer/output_requirements.rs
@@ -198,7 +198,7 @@ impl PhysicalOptimizerRule for OutputRequirements {
match self.mode {
RuleMode::Add => require_top_ordering(plan),
RuleMode::Remove => plan
- .transform_up(&|plan| {
+ .transform_up(|plan| {
if let Some(sort_req) =
plan.as_any().downcast_ref::<OutputRequirementExec>()
{
diff --git a/datafusion/core/src/physical_optimizer/pipeline_checker.rs
b/datafusion/core/src/physical_optimizer/pipeline_checker.rs
index 1dc8bc5042..5c6a0ab8ea 100644
--- a/datafusion/core/src/physical_optimizer/pipeline_checker.rs
+++ b/datafusion/core/src/physical_optimizer/pipeline_checker.rs
@@ -50,7 +50,7 @@ impl PhysicalOptimizerRule for PipelineChecker {
plan: Arc<dyn ExecutionPlan>,
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
- plan.transform_up(&|p| check_finiteness_requirements(p,
&config.optimizer))
+ plan.transform_up(|p| check_finiteness_requirements(p,
&config.optimizer))
.data()
}
diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs
b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
index a5f5a28fb2..337c566e8f 100644
--- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
@@ -75,7 +75,7 @@ impl PhysicalOptimizerRule for ProjectionPushdown {
plan: Arc<dyn ExecutionPlan>,
_config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
- plan.transform_down(&remove_unnecessary_projections).data()
+ plan.transform_down(remove_unnecessary_projections).data()
}
fn name(&self) -> &str {
@@ -273,7 +273,7 @@ fn try_unifying_projections(
// Collect the column references usage in the outer projection.
projection.expr().iter().for_each(|(expr, _)| {
- expr.apply(&mut |expr| {
+ expr.apply(|expr| {
Ok({
if let Some(column) = expr.as_any().downcast_ref::<Column>() {
*column_ref_map.entry(column.clone()).or_default() += 1;
@@ -977,7 +977,7 @@ fn update_expr(
let new_expr = expr
.clone()
- .transform_up_mut(&mut |expr: Arc<dyn PhysicalExpr>| {
+ .transform_up(|expr: Arc<dyn PhysicalExpr>| {
if state == RewriteState::RewrittenInvalid {
return Ok(Transformed::no(expr));
}
@@ -1120,7 +1120,7 @@ fn new_columns_for_join_on(
// Rewrite all columns in `on`
(*on)
.clone()
- .transform(&|expr| {
+ .transform(|expr| {
if let Some(column) =
expr.as_any().downcast_ref::<Column>() {
// Find the column in the projection expressions
let new_column = projection_exprs
diff --git a/datafusion/core/src/physical_optimizer/pruning.rs
b/datafusion/core/src/physical_optimizer/pruning.rs
index 74bbe1f95b..c65235f5fd 100644
--- a/datafusion/core/src/physical_optimizer/pruning.rs
+++ b/datafusion/core/src/physical_optimizer/pruning.rs
@@ -1175,7 +1175,7 @@ fn rewrite_column_expr(
column_old: &phys_expr::Column,
column_new: &phys_expr::Column,
) -> Result<Arc<dyn PhysicalExpr>> {
- e.transform(&|expr| {
+ e.transform(|expr| {
if let Some(column) =
expr.as_any().downcast_ref::<phys_expr::Column>() {
if column == column_old {
return Ok(Transformed::yes(Arc::new(column_new.clone())));
diff --git
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index ad19215fbf..b438e40ece 100644
---
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -395,7 +395,7 @@ mod tests {
// Run the rule top-down
let config =
SessionConfig::new().with_prefer_existing_sort($PREFER_EXISTING_SORT);
let plan_with_pipeline_fixer =
OrderPreservationContext::new_default(physical_plan);
- let parallel =
plan_with_pipeline_fixer.transform_up(&|plan_with_pipeline_fixer|
replace_with_order_preserving_variants(plan_with_pipeline_fixer, false, false,
config.options())).data().and_then(check_integrity)?;
+ let parallel =
plan_with_pipeline_fixer.transform_up(|plan_with_pipeline_fixer|
replace_with_order_preserving_variants(plan_with_pipeline_fixer, false, false,
config.options())).data().and_then(check_integrity)?;
let optimized_physical_plan = parallel.plan;
// Get string representation of the plan
diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs
b/datafusion/core/src/physical_optimizer/test_utils.rs
index 2e6e3af5df..7bc1eeb7c4 100644
--- a/datafusion/core/src/physical_optimizer/test_utils.rs
+++ b/datafusion/core/src/physical_optimizer/test_utils.rs
@@ -380,7 +380,7 @@ pub fn sort_exec(
/// replaced with direct plan equality checks.
pub fn check_integrity<T: Clone>(context: PlanContext<T>) ->
Result<PlanContext<T>> {
context
- .transform_up(&|node| {
+ .transform_up(|node| {
let children_plans = node.plan.children();
assert_eq!(node.children.len(), children_plans.len());
for (child_plan, child_node) in
diff --git a/datafusion/core/src/physical_optimizer/topk_aggregation.rs
b/datafusion/core/src/physical_optimizer/topk_aggregation.rs
index c47e5e25d1..95f7067cbe 100644
--- a/datafusion/core/src/physical_optimizer/topk_aggregation.rs
+++ b/datafusion/core/src/physical_optimizer/topk_aggregation.rs
@@ -102,7 +102,7 @@ impl TopKAggregation {
};
let mut cardinality_preserved = true;
- let mut closure = |plan: Arc<dyn ExecutionPlan>| {
+ let closure = |plan: Arc<dyn ExecutionPlan>| {
if !cardinality_preserved {
return Ok(Transformed::no(plan));
}
@@ -120,7 +120,7 @@ impl TopKAggregation {
}
Ok(Transformed::no(plan))
};
- let child = child.clone().transform_down_mut(&mut
closure).data().ok()?;
+ let child = child.clone().transform_down(closure).data().ok()?;
let sort = SortExec::new(sort.expr().to_vec(), child)
.with_fetch(sort.fetch())
.with_preserve_partitioning(sort.preserve_partitioning());
@@ -141,7 +141,7 @@ impl PhysicalOptimizerRule for TopKAggregation {
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
if config.optimizer.enable_topk_aggregation {
- plan.transform_down(&|plan| {
+ plan.transform_down(|plan| {
Ok(
if let Some(plan) =
TopKAggregation::transform_sort(plan.clone()) {
Transformed::yes(plan)
diff --git
a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs
b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs
index 86be887198..e31a108162 100644
--- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs
+++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs
@@ -817,7 +817,7 @@ impl ScalarUDFImpl for ScalarFunctionWrapper {
impl ScalarFunctionWrapper {
// replaces placeholders with actual arguments
fn replacement(expr: &Expr, args: &[Expr]) -> Result<Expr> {
- let result = expr.clone().transform(&|e| {
+ let result = expr.clone().transform(|e| {
let r = match e {
Expr::Placeholder(placeholder) => {
let placeholder_position =
diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index 08d495c3be..b2357e77b1 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -1240,7 +1240,7 @@ impl Expr {
/// For example, gicen an expression like `<int32> = $0` will infer `$0` to
/// have type `int32`.
pub fn infer_placeholder_types(self, schema: &DFSchema) -> Result<Expr> {
- self.transform(&|mut expr| {
+ self.transform(|mut expr| {
// Default to assuming the arguments are the same type
if let Expr::BinaryExpr(BinaryExpr { left, op: _, right }) = &mut
expr {
rewrite_placeholder(left.as_mut(), right.as_ref(), schema)?;
diff --git a/datafusion/expr/src/expr_rewriter/mod.rs
b/datafusion/expr/src/expr_rewriter/mod.rs
index c11619fc0e..f5779df812 100644
--- a/datafusion/expr/src/expr_rewriter/mod.rs
+++ b/datafusion/expr/src/expr_rewriter/mod.rs
@@ -62,7 +62,7 @@ pub trait FunctionRewrite {
/// Recursively call [`Column::normalize_with_schemas`] on all [`Column`]
expressions
/// in the `expr` expression tree.
pub fn normalize_col(expr: Expr, plan: &LogicalPlan) -> Result<Expr> {
- expr.transform(&|expr| {
+ expr.transform(|expr| {
Ok({
if let Expr::Column(c) = expr {
let col = LogicalPlanBuilder::normalize(plan, c)?;
@@ -91,7 +91,7 @@ pub fn normalize_col_with_schemas_and_ambiguity_check(
return Ok(Expr::Unnest(Unnest { expr: Box::new(e) }));
}
- expr.transform(&|expr| {
+ expr.transform(|expr| {
Ok({
if let Expr::Column(c) = expr {
let col =
@@ -119,7 +119,7 @@ pub fn normalize_cols(
/// Recursively replace all [`Column`] expressions in a given expression tree
with
/// `Column` expressions provided by the hash map argument.
pub fn replace_col(expr: Expr, replace_map: &HashMap<&Column, &Column>) ->
Result<Expr> {
- expr.transform(&|expr| {
+ expr.transform(|expr| {
Ok({
if let Expr::Column(c) = &expr {
match replace_map.get(c) {
@@ -140,7 +140,7 @@ pub fn replace_col(expr: Expr, replace_map:
&HashMap<&Column, &Column>) -> Resul
/// For example, if there were expressions like `foo.bar` this would
/// rewrite it to just `bar`.
pub fn unnormalize_col(expr: Expr) -> Expr {
- expr.transform(&|expr| {
+ expr.transform(|expr| {
Ok({
if let Expr::Column(c) = expr {
let col = Column {
@@ -190,7 +190,7 @@ pub fn unnormalize_cols(exprs: impl IntoIterator<Item =
Expr>) -> Vec<Expr> {
/// Recursively remove all the ['OuterReferenceColumn'] and return the inside
Column
/// in the expression tree.
pub fn strip_outer_reference(expr: Expr) -> Expr {
- expr.transform(&|expr| {
+ expr.transform(|expr| {
Ok({
if let Expr::OuterReferenceColumn(_, col) = expr {
Transformed::yes(Expr::Column(col))
@@ -336,7 +336,7 @@ mod test {
// rewrites "foo" --> "bar"
let rewritten = col("state")
.eq(lit("foo"))
- .transform(&transformer)
+ .transform(transformer)
.data()
.unwrap();
assert_eq!(rewritten, col("state").eq(lit("bar")));
@@ -344,7 +344,7 @@ mod test {
// doesn't rewrite
let rewritten = col("state")
.eq(lit("baz"))
- .transform(&transformer)
+ .transform(transformer)
.data()
.unwrap();
assert_eq!(rewritten, col("state").eq(lit("baz")));
diff --git a/datafusion/expr/src/expr_rewriter/order_by.rs
b/datafusion/expr/src/expr_rewriter/order_by.rs
index 2fb522b979..eb38fee7ca 100644
--- a/datafusion/expr/src/expr_rewriter/order_by.rs
+++ b/datafusion/expr/src/expr_rewriter/order_by.rs
@@ -84,7 +84,7 @@ fn rewrite_in_terms_of_projection(
) -> Result<Expr> {
// assumption is that each item in exprs, such as "b + c" is
// available as an output column named "b + c"
- expr.transform(&|expr| {
+ expr.transform(|expr| {
// search for unnormalized names first such as "c1" (such as aliases)
if let Some(found) = proj_exprs.iter().find(|a| (**a) == expr) {
let col = Expr::Column(
diff --git a/datafusion/expr/src/logical_plan/plan.rs
b/datafusion/expr/src/logical_plan/plan.rs
index dbff504601..97f5e22287 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -358,7 +358,7 @@ impl LogicalPlan {
pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>,
DataFusionError> {
let mut using_columns: Vec<HashSet<Column>> = vec![];
- self.apply_with_subqueries(&mut |plan| {
+ self.apply_with_subqueries(|plan| {
if let LogicalPlan::Join(Join {
join_constraint: JoinConstraint::Using,
on,
@@ -554,7 +554,7 @@ impl LogicalPlan {
// AND lineitem.l_quantity < Decimal128(Some(2400),15,2)
let predicate = predicate
- .transform_down(&|expr| {
+ .transform_down(|expr| {
match expr {
Expr::Exists { .. }
| Expr::ScalarSubquery(_)
@@ -1017,10 +1017,10 @@ impl LogicalPlan {
self,
param_values: &ParamValues,
) -> Result<LogicalPlan> {
- self.transform_up_with_subqueries(&|plan| {
+ self.transform_up_with_subqueries(|plan| {
let schema = plan.schema().clone();
plan.map_expressions(|e| {
- e.infer_placeholder_types(&schema)?.transform_up(&|e| {
+ e.infer_placeholder_types(&schema)?.transform_up(|e| {
if let Expr::Placeholder(Placeholder { id, .. }) = e {
let value =
param_values.get_placeholders_with_values(&id)?;
Ok(Transformed::yes(Expr::Literal(value)))
@@ -1039,9 +1039,9 @@ impl LogicalPlan {
) -> Result<HashMap<String, Option<DataType>>, DataFusionError> {
let mut param_types: HashMap<String, Option<DataType>> =
HashMap::new();
- self.apply_with_subqueries(&mut |plan| {
+ self.apply_with_subqueries(|plan| {
plan.apply_expressions(|expr| {
- expr.apply(&mut |expr| {
+ expr.apply(|expr| {
if let Expr::Placeholder(Placeholder { id, data_type }) =
expr {
let prev = param_types.get(id);
match (prev, data_type) {
@@ -3170,7 +3170,7 @@ digraph {
// after transformation, because plan is not the same anymore,
// the parent plan is built again with call to
LogicalPlan::with_new_inputs -> with_new_exprs
let plan = plan
- .transform(&|plan| match plan {
+ .transform(|plan| match plan {
LogicalPlan::TableScan(table) => {
let filter = Filter::try_new(
external_filter.clone(),
diff --git a/datafusion/expr/src/logical_plan/tree_node.rs
b/datafusion/expr/src/logical_plan/tree_node.rs
index 48f047c070..f5db5a2704 100644
--- a/datafusion/expr/src/logical_plan/tree_node.rs
+++ b/datafusion/expr/src/logical_plan/tree_node.rs
@@ -431,23 +431,6 @@ macro_rules! handle_transform_recursion {
}};
}
-macro_rules! handle_transform_recursion_down {
- ($F_DOWN:expr, $F_CHILD:expr) => {{
- $F_DOWN?
- .transform_children(|n| n.map_subqueries($F_CHILD))?
- .transform_sibling(|n| n.map_children($F_CHILD))
- }};
-}
-
-macro_rules! handle_transform_recursion_up {
- ($SELF:expr, $F_CHILD:expr, $F_UP:expr) => {{
- $SELF
- .map_subqueries($F_CHILD)?
- .transform_sibling(|n| n.map_children($F_CHILD))?
- .transform_parent(|n| $F_UP(n))
- }};
-}
-
impl LogicalPlan {
/// Calls `f` on all expressions in the current `LogicalPlan` node.
///
@@ -787,19 +770,32 @@ impl LogicalPlan {
/// ...)`.
pub fn apply_with_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
&self,
- f: &mut F,
+ mut f: F,
) -> Result<TreeNodeRecursion> {
- f(self)?
- .visit_children(|| self.apply_subqueries(|c|
c.apply_with_subqueries(f)))?
- .visit_sibling(|| self.apply_children(|c|
c.apply_with_subqueries(f)))
+ fn apply_with_subqueries_impl<
+ F: FnMut(&LogicalPlan) -> Result<TreeNodeRecursion>,
+ >(
+ node: &LogicalPlan,
+ f: &mut F,
+ ) -> Result<TreeNodeRecursion> {
+ f(node)?
+ .visit_children(|| {
+ node.apply_subqueries(|c| apply_with_subqueries_impl(c, f))
+ })?
+ .visit_sibling(|| {
+ node.apply_children(|c| apply_with_subqueries_impl(c, f))
+ })
+ }
+
+ apply_with_subqueries_impl(self, &mut f)
}
/// Similarly to [`Self::transform`], rewrites this node and its inputs
using `f`,
/// including subqueries that may appear in expressions such as `IN (SELECT
/// ...)`.
- pub fn transform_with_subqueries<F: Fn(Self) -> Result<Transformed<Self>>>(
+ pub fn transform_with_subqueries<F: FnMut(Self) ->
Result<Transformed<Self>>>(
self,
- f: &F,
+ f: F,
) -> Result<Transformed<Self>> {
self.transform_up_with_subqueries(f)
}
@@ -807,43 +803,49 @@ impl LogicalPlan {
/// Similarly to [`Self::transform_down`], rewrites this node and its
inputs using `f`,
/// including subqueries that may appear in expressions such as `IN (SELECT
/// ...)`.
- pub fn transform_down_with_subqueries<F: Fn(Self) ->
Result<Transformed<Self>>>(
+ pub fn transform_down_with_subqueries<F: FnMut(Self) ->
Result<Transformed<Self>>>(
self,
- f: &F,
+ mut f: F,
) -> Result<Transformed<Self>> {
- handle_transform_recursion_down!(f(self), |c|
c.transform_down_with_subqueries(f))
- }
+ fn transform_down_with_subqueries_impl<
+ F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
+ >(
+ node: LogicalPlan,
+ f: &mut F,
+ ) -> Result<Transformed<LogicalPlan>> {
+ f(node)?
+ .transform_children(|n| {
+ n.map_subqueries(|c|
transform_down_with_subqueries_impl(c, f))
+ })?
+ .transform_sibling(|n| {
+ n.map_children(|c| transform_down_with_subqueries_impl(c,
f))
+ })
+ }
- /// Similarly to [`Self::transform_down_mut`], rewrites this node and its
inputs using `f`,
- /// including subqueries that may appear in expressions such as `IN (SELECT
- /// ...)`.
- pub fn transform_down_mut_with_subqueries<
- F: FnMut(Self) -> Result<Transformed<Self>>,
- >(
- self,
- f: &mut F,
- ) -> Result<Transformed<Self>> {
- handle_transform_recursion_down!(f(self), |c| c
- .transform_down_mut_with_subqueries(f))
+ transform_down_with_subqueries_impl(self, &mut f)
}
/// Similarly to [`Self::transform_up`], rewrites this node and its inputs
using `f`,
/// including subqueries that may appear in expressions such as `IN (SELECT
/// ...)`.
- pub fn transform_up_with_subqueries<F: Fn(Self) ->
Result<Transformed<Self>>>(
+ pub fn transform_up_with_subqueries<F: FnMut(Self) ->
Result<Transformed<Self>>>(
self,
- f: &F,
+ mut f: F,
) -> Result<Transformed<Self>> {
- handle_transform_recursion_up!(self, |c|
c.transform_up_with_subqueries(f), f)
- }
+ fn transform_up_with_subqueries_impl<
+ F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
+ >(
+ node: LogicalPlan,
+ f: &mut F,
+ ) -> Result<Transformed<LogicalPlan>> {
+ node.map_subqueries(|c| transform_up_with_subqueries_impl(c, f))?
+ .transform_sibling(|n| {
+ n.map_children(|c| transform_up_with_subqueries_impl(c, f))
+ })?
+ .transform_parent(f)
+ }
- pub fn transform_up_mut_with_subqueries<
- F: FnMut(Self) -> Result<Transformed<Self>>,
- >(
- self,
- f: &mut F,
- ) -> Result<Transformed<Self>> {
- handle_transform_recursion_up!(self, |c|
c.transform_up_mut_with_subqueries(f), f)
+ transform_up_with_subqueries_impl(self, &mut f)
}
/// Similarly to [`Self::transform_down`], rewrites this node and its
inputs using `f`,
@@ -854,14 +856,25 @@ impl LogicalPlan {
FU: FnMut(Self) -> Result<Transformed<Self>>,
>(
self,
- f_down: &mut FD,
- f_up: &mut FU,
+ mut f_down: FD,
+ mut f_up: FU,
) -> Result<Transformed<Self>> {
- handle_transform_recursion!(
- f_down(self),
- |c| c.transform_down_up_with_subqueries(f_down, f_up),
- f_up
- )
+ fn transform_down_up_with_subqueries_impl<
+ FD: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
+ FU: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
+ >(
+ node: LogicalPlan,
+ f_down: &mut FD,
+ f_up: &mut FU,
+ ) -> Result<Transformed<LogicalPlan>> {
+ handle_transform_recursion!(
+ f_down(node),
+ |c| transform_down_up_with_subqueries_impl(c, f_down, f_up),
+ f_up
+ )
+ }
+
+ transform_down_up_with_subqueries_impl(self, &mut f_down, &mut f_up)
}
/// Similarly to [`Self::apply`], calls `f` on this node and its inputs
@@ -872,7 +885,7 @@ impl LogicalPlan {
mut f: F,
) -> Result<TreeNodeRecursion> {
self.apply_expressions(|expr| {
- expr.apply(&mut |expr| match expr {
+ expr.apply(|expr| match expr {
Expr::Exists(Exists { subquery, .. })
| Expr::InSubquery(InSubquery { subquery, .. })
| Expr::ScalarSubquery(subquery) => {
@@ -895,7 +908,7 @@ impl LogicalPlan {
mut f: F,
) -> Result<Transformed<Self>> {
self.map_expressions(|expr| {
- expr.transform_down_mut(&mut |expr| match expr {
+ expr.transform_down(|expr| match expr {
Expr::Exists(Exists { subquery, negated }) => {
f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s {
LogicalPlan::Subquery(subquery) => {
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 8c6b98f179..8da93c244c 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -264,7 +264,7 @@ pub fn grouping_set_to_exprlist(group_expr: &[Expr]) ->
Result<Vec<Expr>> {
/// Recursively walk an expression tree, collecting the unique set of columns
/// referenced in the expression
pub fn expr_to_columns(expr: &Expr, accum: &mut HashSet<Column>) -> Result<()>
{
- expr.apply(&mut |expr| {
+ expr.apply(|expr| {
match expr {
Expr::Column(qc) => {
accum.insert(qc.clone());
@@ -661,7 +661,7 @@ where
F: Fn(&Expr) -> bool,
{
let mut exprs = vec![];
- expr.apply(&mut |expr| {
+ expr.apply(|expr| {
if test_fn(expr) {
if !(exprs.contains(expr)) {
exprs.push(expr.clone())
@@ -683,7 +683,7 @@ where
F: FnMut(&Expr) -> Result<(), E>,
{
let mut err = Ok(());
- expr.apply(&mut |expr| {
+ expr.apply(|expr| {
if let Err(e) = f(expr) {
// save the error for later (it may not be a DataFusionError
err = Err(e);
@@ -839,7 +839,7 @@ pub fn find_column_exprs(exprs: &[Expr]) -> Vec<Expr> {
pub(crate) fn find_columns_referenced_by_expr(e: &Expr) -> Vec<Column> {
let mut exprs = vec![];
- e.apply(&mut |expr| {
+ e.apply(|expr| {
if let Expr::Column(c) = expr {
exprs.push(c.clone())
}
@@ -868,7 +868,7 @@ pub(crate) fn find_column_indexes_referenced_by_expr(
schema: &DFSchemaRef,
) -> Vec<usize> {
let mut indexes = vec![];
- e.apply(&mut |expr| {
+ e.apply(|expr| {
match expr {
Expr::Column(qc) => {
if let Ok(idx) = schema.index_of_column(qc) {
diff --git a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
index c5e60ee319..835c041fc3 100644
--- a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
+++ b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
@@ -41,8 +41,7 @@ impl CountWildcardRule {
impl AnalyzerRule for CountWildcardRule {
fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) ->
Result<LogicalPlan> {
- plan.transform_down_with_subqueries(&analyze_internal)
- .data()
+ plan.transform_down_with_subqueries(analyze_internal).data()
}
fn name(&self) -> &str {
@@ -78,7 +77,7 @@ fn analyze_internal(plan: LogicalPlan) ->
Result<Transformed<LogicalPlan>> {
let name_preserver = NamePreserver::new(&plan);
plan.map_expressions(|expr| {
let original_name = name_preserver.save(&expr)?;
- let transformed_expr = expr.transform_up(&|expr| match expr {
+ let transformed_expr = expr.transform_up(|expr| match expr {
Expr::WindowFunction(mut window_function)
if is_count_star_window_aggregate(&window_function) =>
{
diff --git a/datafusion/optimizer/src/analyzer/function_rewrite.rs
b/datafusion/optimizer/src/analyzer/function_rewrite.rs
index 4dd3222a32..098c934bf7 100644
--- a/datafusion/optimizer/src/analyzer/function_rewrite.rs
+++ b/datafusion/optimizer/src/analyzer/function_rewrite.rs
@@ -64,7 +64,7 @@ impl ApplyFunctionRewrites {
let original_name = name_preserver.save(&expr)?;
// recursively transform the expression, applying the rewrites at
each step
- let transformed_expr = expr.transform_up(&|expr| {
+ let transformed_expr = expr.transform_up(|expr| {
let mut result = Transformed::no(expr);
for rewriter in self.function_rewrites.iter() {
result = result.transform_data(|expr| {
@@ -85,7 +85,7 @@ impl AnalyzerRule for ApplyFunctionRewrites {
}
fn analyze(&self, plan: LogicalPlan, options: &ConfigOptions) ->
Result<LogicalPlan> {
- plan.transform_up_with_subqueries(&|plan| self.rewrite_plan(plan,
options))
+ plan.transform_up_with_subqueries(|plan| self.rewrite_plan(plan,
options))
.map(|res| res.data)
}
}
diff --git a/datafusion/optimizer/src/analyzer/inline_table_scan.rs
b/datafusion/optimizer/src/analyzer/inline_table_scan.rs
index cc5f870a9c..db1ce18e86 100644
--- a/datafusion/optimizer/src/analyzer/inline_table_scan.rs
+++ b/datafusion/optimizer/src/analyzer/inline_table_scan.rs
@@ -38,7 +38,7 @@ impl InlineTableScan {
impl AnalyzerRule for InlineTableScan {
fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) ->
Result<LogicalPlan> {
- plan.transform_up(&analyze_internal).data()
+ plan.transform_up(analyze_internal).data()
}
fn name(&self) -> &str {
@@ -49,7 +49,7 @@ impl AnalyzerRule for InlineTableScan {
fn analyze_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
// rewrite any subqueries in the plan first
let transformed_plan =
- plan.map_subqueries(|plan| plan.transform_up(&analyze_internal))?;
+ plan.map_subqueries(|plan| plan.transform_up(analyze_internal))?;
let transformed_plan = transformed_plan.transform_data(|plan| {
match plan {
diff --git a/datafusion/optimizer/src/analyzer/mod.rs
b/datafusion/optimizer/src/analyzer/mod.rs
index d0b83d2429..fb0eb14da6 100644
--- a/datafusion/optimizer/src/analyzer/mod.rs
+++ b/datafusion/optimizer/src/analyzer/mod.rs
@@ -155,10 +155,10 @@ impl Analyzer {
/// Do necessary check and fail the invalid plan
fn check_plan(plan: &LogicalPlan) -> Result<()> {
- plan.apply_with_subqueries(&mut |plan: &LogicalPlan| {
+ plan.apply_with_subqueries(|plan: &LogicalPlan| {
plan.apply_expressions(|expr| {
// recursively look for subqueries
- expr.apply(&mut |expr| {
+ expr.apply(|expr| {
match expr {
Expr::Exists(Exists { subquery, .. })
| Expr::InSubquery(InSubquery { subquery, .. })
diff --git a/datafusion/optimizer/src/analyzer/subquery.rs
b/datafusion/optimizer/src/analyzer/subquery.rs
index 002885266e..b46516017a 100644
--- a/datafusion/optimizer/src/analyzer/subquery.rs
+++ b/datafusion/optimizer/src/analyzer/subquery.rs
@@ -276,7 +276,7 @@ fn strip_inner_query(inner_plan: &LogicalPlan) ->
&LogicalPlan {
fn get_correlated_expressions(inner_plan: &LogicalPlan) -> Result<Vec<Expr>> {
let mut exprs = vec![];
- inner_plan.apply_with_subqueries(&mut |plan| {
+ inner_plan.apply_with_subqueries(|plan| {
if let LogicalPlan::Filter(Filter { predicate, .. }) = plan {
let (correlated, _): (Vec<_>, Vec<_>) =
split_conjunction(predicate)
.into_iter()
diff --git a/datafusion/optimizer/src/decorrelate.rs
b/datafusion/optimizer/src/decorrelate.rs
index 7eda45fb56..a6abec9efd 100644
--- a/datafusion/optimizer/src/decorrelate.rs
+++ b/datafusion/optimizer/src/decorrelate.rs
@@ -381,7 +381,7 @@ fn agg_exprs_evaluation_result_on_empty_batch(
for e in agg_expr.iter() {
let result_expr = e
.clone()
- .transform_up(&|expr| {
+ .transform_up(|expr| {
let new_expr = match expr {
Expr::AggregateFunction(expr::AggregateFunction {
func_def, ..
@@ -429,7 +429,7 @@ fn proj_exprs_evaluation_result_on_empty_batch(
for expr in proj_expr.iter() {
let result_expr = expr
.clone()
- .transform_up(&|expr| {
+ .transform_up(|expr| {
if let Expr::Column(Column { name, .. }) = &expr {
if let Some(result_expr) =
input_expr_result_map_for_count_bug.get(name)
@@ -468,7 +468,7 @@ fn filter_exprs_evaluation_result_on_empty_batch(
) -> Result<Option<Expr>> {
let result_expr = filter_expr
.clone()
- .transform_up(&|expr| {
+ .transform_up(|expr| {
if let Expr::Column(Column { name, .. }) = &expr {
if let Some(result_expr) =
input_expr_result_map_for_count_bug.get(name) {
Ok(Transformed::yes(result_expr.clone()))
diff --git a/datafusion/optimizer/src/optimize_projections.rs
b/datafusion/optimizer/src/optimize_projections.rs
index c49095c4a3..70ffd8f244 100644
--- a/datafusion/optimizer/src/optimize_projections.rs
+++ b/datafusion/optimizer/src/optimize_projections.rs
@@ -613,7 +613,7 @@ fn rewrite_expr(expr: &Expr, input: &Projection) ->
Result<Option<Expr>> {
/// columns are collected.
fn outer_columns(expr: &Expr, columns: &mut HashSet<Column>) {
// inspect_expr_pre doesn't handle subquery references, so find them
explicitly
- expr.apply(&mut |expr| {
+ expr.apply(|expr| {
match expr {
Expr::OuterReferenceColumn(_, col) => {
columns.insert(col.clone());
diff --git a/datafusion/optimizer/src/plan_signature.rs
b/datafusion/optimizer/src/plan_signature.rs
index a8e323ff42..d642e2c26e 100644
--- a/datafusion/optimizer/src/plan_signature.rs
+++ b/datafusion/optimizer/src/plan_signature.rs
@@ -73,7 +73,7 @@ impl LogicalPlanSignature {
/// Get total number of [`LogicalPlan`]s in the plan.
fn get_node_number(plan: &LogicalPlan) -> NonZeroUsize {
let mut node_number = 0;
- plan.apply_with_subqueries(&mut |_plan| {
+ plan.apply_with_subqueries(|_plan| {
node_number += 1;
Ok(TreeNodeRecursion::Continue)
})
diff --git a/datafusion/optimizer/src/push_down_filter.rs
b/datafusion/optimizer/src/push_down_filter.rs
index 2b123e3559..950932f479 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -218,7 +218,7 @@ fn can_pushdown_join_predicate(predicate: &Expr, schema:
&DFSchema) -> Result<bo
// Determine whether the predicate can evaluate as the join conditions
fn can_evaluate_as_join_condition(predicate: &Expr) -> Result<bool> {
let mut is_evaluate = true;
- predicate.apply(&mut |expr| match expr {
+ predicate.apply(|expr| match expr {
Expr::Column(_)
| Expr::Literal(_)
| Expr::Placeholder(_)
@@ -993,7 +993,7 @@ pub fn replace_cols_by_name(
e: Expr,
replace_map: &HashMap<String, Expr>,
) -> Result<Expr> {
- e.transform_up(&|expr| {
+ e.transform_up(|expr| {
Ok(if let Expr::Column(c) = &expr {
match replace_map.get(&c.flat_name()) {
Some(new_c) => Transformed::yes(new_c.clone()),
@@ -1009,7 +1009,7 @@ pub fn replace_cols_by_name(
/// check whether the expression uses the columns in `check_map`.
fn contain(e: &Expr, check_map: &HashMap<String, Expr>) -> bool {
let mut is_contain = false;
- e.apply(&mut |expr| {
+ e.apply(|expr| {
Ok(if let Expr::Column(c) = &expr {
match check_map.get(&c.flat_name()) {
Some(_) => {
diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs
b/datafusion/optimizer/src/scalar_subquery_to_join.rs
index a8999f9c1d..f9f602297f 100644
--- a/datafusion/optimizer/src/scalar_subquery_to_join.rs
+++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs
@@ -95,7 +95,7 @@ impl OptimizerRule for ScalarSubqueryToJoin {
if !expr_check_map.is_empty() {
rewrite_expr = rewrite_expr
.clone()
- .transform_up(&|expr| {
+ .transform_up(|expr| {
if let Expr::Column(col) = &expr {
if let Some(map_expr) =
expr_check_map.get(&col.name)
@@ -152,7 +152,7 @@ impl OptimizerRule for ScalarSubqueryToJoin {
{
let new_expr = rewrite_expr
.clone()
- .transform_up(&|expr| {
+ .transform_up(|expr| {
if let Expr::Column(col) = &expr {
if let Some(map_expr) =
expr_check_map.get(&col.name)
diff --git a/datafusion/optimizer/src/utils.rs
b/datafusion/optimizer/src/utils.rs
index 0e89e2452a..aad89be7db 100644
--- a/datafusion/optimizer/src/utils.rs
+++ b/datafusion/optimizer/src/utils.rs
@@ -100,7 +100,7 @@ pub fn log_plan(description: &str, plan: &LogicalPlan) {
/// check whether the expression is volatile predicates
pub(crate) fn is_volatile_expression(e: &Expr) -> Result<bool> {
let mut is_volatile_expr = false;
- e.apply(&mut |expr| {
+ e.apply(|expr| {
Ok(if is_volatile(expr)? {
is_volatile_expr = true;
TreeNodeRecursion::Stop
diff --git a/datafusion/physical-expr/src/equivalence/class.rs
b/datafusion/physical-expr/src/equivalence/class.rs
index 6d7d8bf3cc..8a3f8b8280 100644
--- a/datafusion/physical-expr/src/equivalence/class.rs
+++ b/datafusion/physical-expr/src/equivalence/class.rs
@@ -262,7 +262,7 @@ impl EquivalenceGroup {
/// class it matches with (if any).
pub fn normalize_expr(&self, expr: Arc<dyn PhysicalExpr>) -> Arc<dyn
PhysicalExpr> {
expr.clone()
- .transform(&|expr| {
+ .transform(|expr| {
for cls in self.iter() {
if cls.contains(&expr) {
return
Ok(Transformed::yes(cls.canonical_expr().unwrap()));
@@ -452,7 +452,7 @@ impl EquivalenceGroup {
// Rewrite rhs to point to the right side of the join:
let new_rhs = rhs
.clone()
- .transform(&|expr| {
+ .transform(|expr| {
if let Some(column) =
expr.as_any().downcast_ref::<Column>()
{
diff --git a/datafusion/physical-expr/src/equivalence/mod.rs
b/datafusion/physical-expr/src/equivalence/mod.rs
index fd8123c45b..f78d69d672 100644
--- a/datafusion/physical-expr/src/equivalence/mod.rs
+++ b/datafusion/physical-expr/src/equivalence/mod.rs
@@ -97,7 +97,7 @@ pub fn add_offset_to_expr(
expr: Arc<dyn PhysicalExpr>,
offset: usize,
) -> Arc<dyn PhysicalExpr> {
- expr.transform_down(&|e| match e.as_any().downcast_ref::<Column>() {
+ expr.transform_down(|e| match e.as_any().downcast_ref::<Column>() {
Some(col) => Ok(Transformed::yes(Arc::new(Column::new(
col.name(),
offset + col.index(),
diff --git a/datafusion/physical-expr/src/equivalence/projection.rs
b/datafusion/physical-expr/src/equivalence/projection.rs
index 92772e4623..8c747ab8a2 100644
--- a/datafusion/physical-expr/src/equivalence/projection.rs
+++ b/datafusion/physical-expr/src/equivalence/projection.rs
@@ -59,7 +59,7 @@ impl ProjectionMapping {
let target_expr = Arc::new(Column::new(name, expr_idx)) as _;
expression
.clone()
- .transform_down(&|e| match
e.as_any().downcast_ref::<Column>() {
+ .transform_down(|e| match
e.as_any().downcast_ref::<Column>() {
Some(col) => {
// Sometimes, an expression and its name in the
input_schema
// doesn't match. This can cause problems, so we
make sure
diff --git a/datafusion/physical-expr/src/equivalence/properties.rs
b/datafusion/physical-expr/src/equivalence/properties.rs
index c8a087db20..1036779c14 100644
--- a/datafusion/physical-expr/src/equivalence/properties.rs
+++ b/datafusion/physical-expr/src/equivalence/properties.rs
@@ -857,7 +857,7 @@ impl EquivalenceProperties {
/// the given expression.
pub fn get_expr_ordering(&self, expr: Arc<dyn PhysicalExpr>) ->
ExprOrdering {
ExprOrdering::new_default(expr.clone())
- .transform_up(&|expr| Ok(update_ordering(expr, self)))
+ .transform_up(|expr| Ok(update_ordering(expr, self)))
.data()
// Guaranteed to always return `Ok`.
.unwrap()
diff --git a/datafusion/physical-expr/src/expressions/case.rs
b/datafusion/physical-expr/src/expressions/case.rs
index 609349509b..e376d3e7bb 100644
--- a/datafusion/physical-expr/src/expressions/case.rs
+++ b/datafusion/physical-expr/src/expressions/case.rs
@@ -958,7 +958,7 @@ mod tests {
let expr2 = expr
.clone()
- .transform(&|e| {
+ .transform(|e| {
let transformed =
match
e.as_any().downcast_ref::<crate::expressions::Literal>() {
Some(lit_value) => match lit_value.value() {
@@ -980,7 +980,7 @@ mod tests {
let expr3 = expr
.clone()
- .transform_down(&|e| {
+ .transform_down(|e| {
let transformed =
match
e.as_any().downcast_ref::<crate::expressions::Literal>() {
Some(lit_value) => match lit_value.value() {
diff --git a/datafusion/physical-expr/src/utils/mod.rs
b/datafusion/physical-expr/src/utils/mod.rs
index a0d6436586..2232f6de44 100644
--- a/datafusion/physical-expr/src/utils/mod.rs
+++ b/datafusion/physical-expr/src/utils/mod.rs
@@ -194,9 +194,7 @@ where
constructor,
};
// Use the builder to transform the expression tree node into a DAG.
- let root = init
- .transform_up_mut(&mut |node| builder.mutate(node))
- .data()?;
+ let root = init.transform_up(|node| builder.mutate(node)).data()?;
// Return a tuple containing the root node index and the DAG.
Ok((root.data.unwrap(), builder.graph))
}
@@ -204,7 +202,7 @@ where
/// Recursively extract referenced [`Column`]s within a [`PhysicalExpr`].
pub fn collect_columns(expr: &Arc<dyn PhysicalExpr>) -> HashSet<Column> {
let mut columns = HashSet::<Column>::new();
- expr.apply(&mut |expr| {
+ expr.apply(|expr| {
if let Some(column) = expr.as_any().downcast_ref::<Column>() {
if !columns.iter().any(|c| c.eq(column)) {
columns.insert(column.clone());
@@ -224,7 +222,7 @@ pub fn reassign_predicate_columns(
schema: &SchemaRef,
ignore_not_found: bool,
) -> Result<Arc<dyn PhysicalExpr>> {
- pred.transform_down(&|expr| {
+ pred.transform_down(|expr| {
let expr_any = expr.as_any();
if let Some(column) = expr_any.downcast_ref::<Column>() {
diff --git a/datafusion/physical-plan/src/joins/stream_join_utils.rs
b/datafusion/physical-plan/src/joins/stream_join_utils.rs
index 9824c723d9..ef3fda3737 100644
--- a/datafusion/physical-plan/src/joins/stream_join_utils.rs
+++ b/datafusion/physical-plan/src/joins/stream_join_utils.rs
@@ -285,7 +285,7 @@ pub fn convert_sort_expr_with_filter_schema(
// Since we are sure that one to one column mapping includes all
columns, we convert
// the sort expression into a filter expression.
let converted_filter_expr = expr
- .transform_up(&|p| {
+ .transform_up(|p| {
convert_filter_columns(p.as_ref(),
&column_map).map(|transformed| {
match transformed {
Some(transformed) => Transformed::yes(transformed),
diff --git a/datafusion/physical-plan/src/joins/utils.rs
b/datafusion/physical-plan/src/joins/utils.rs
index a3d20b97d1..acf9ed4d7e 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -478,7 +478,7 @@ fn replace_on_columns_of_right_ordering(
let new_expr = item
.expr
.clone()
- .transform(&|e| {
+ .transform(|e| {
if e.eq(right_col) {
Ok(Transformed::yes(left_col.clone()))
} else {
diff --git a/datafusion/physical-plan/src/recursive_query.rs
b/datafusion/physical-plan/src/recursive_query.rs
index ba7d1a5454..ed897d78f0 100644
--- a/datafusion/physical-plan/src/recursive_query.rs
+++ b/datafusion/physical-plan/src/recursive_query.rs
@@ -325,7 +325,7 @@ fn assign_work_table(
work_table: Arc<WorkTable>,
) -> Result<Arc<dyn ExecutionPlan>> {
let mut work_table_refs = 0;
- plan.transform_down_mut(&mut |plan| {
+ plan.transform_down(|plan| {
if let Some(exec) = plan.as_any().downcast_ref::<WorkTableExec>() {
if work_table_refs > 0 {
not_impl_err!(
@@ -353,7 +353,7 @@ fn assign_work_table(
/// However, if the data of the left table is derived from the work table, it
will become outdated
/// as the work table changes. When the next iteration executes this plan
again, we must clear the left table.
fn reset_plan_states(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn
ExecutionPlan>> {
- plan.transform_up(&|plan| {
+ plan.transform_up(|plan| {
// WorkTableExec's states have already been updated correctly.
if plan.as_any().is::<WorkTableExec>() {
Ok(Transformed::no(plan))
diff --git a/datafusion/sql/src/cte.rs b/datafusion/sql/src/cte.rs
index 5b1f81e820..4f7b9bb6d1 100644
--- a/datafusion/sql/src/cte.rs
+++ b/datafusion/sql/src/cte.rs
@@ -197,7 +197,7 @@ fn has_work_table_reference(
work_table_source: &Arc<dyn TableSource>,
) -> bool {
let mut has_reference = false;
- plan.apply(&mut |node| {
+ plan.apply(|node| {
if let LogicalPlan::TableScan(scan) = node {
if Arc::ptr_eq(&scan.source, work_table_source) {
has_reference = true;
diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs
index 78c01e8e28..fdcef0ef6a 100644
--- a/datafusion/sql/src/select.rs
+++ b/datafusion/sql/src/select.rs
@@ -293,7 +293,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
data: transformed_expr,
transformed,
tnr: _,
- } = expr.transform_up_mut(&mut |expr: Expr| {
+ } = expr.transform_up(|expr: Expr| {
if let Expr::Unnest(Unnest { expr: ref arg }) = expr {
let column_name = expr.display_name()?;
unnest_columns.push(column_name.clone());
diff --git a/datafusion/sql/src/unparser/utils.rs
b/datafusion/sql/src/unparser/utils.rs
index 9d098c4945..c1b02c330f 100644
--- a/datafusion/sql/src/unparser/utils.rs
+++ b/datafusion/sql/src/unparser/utils.rs
@@ -60,7 +60,7 @@ pub(crate) fn find_agg_node_within_select(
/// into an actual aggregate expression COUNT(*) as identified in the
aggregate node.
pub(crate) fn unproject_agg_exprs(expr: &Expr, agg: &Aggregate) ->
Result<Expr> {
expr.clone()
- .transform(&|sub_expr| {
+ .transform(|sub_expr| {
if let Expr::Column(c) = sub_expr {
// find the column in the agg schmea
if let Ok(n) = agg.schema.index_of_column(&c) {
diff --git a/datafusion/sql/src/utils.rs b/datafusion/sql/src/utils.rs
index 206c933a4f..2c50d3af1f 100644
--- a/datafusion/sql/src/utils.rs
+++ b/datafusion/sql/src/utils.rs
@@ -34,7 +34,7 @@ use sqlparser::ast::Ident;
/// Make a best-effort attempt at resolving all columns in the expression tree
pub(crate) fn resolve_columns(expr: &Expr, plan: &LogicalPlan) -> Result<Expr>
{
expr.clone()
- .transform_up(&|nested_expr| {
+ .transform_up(|nested_expr| {
match nested_expr {
Expr::Column(col) => {
let (qualifier, field) =
@@ -72,7 +72,7 @@ pub(crate) fn rebase_expr(
plan: &LogicalPlan,
) -> Result<Expr> {
expr.clone()
- .transform_down(&|nested_expr| {
+ .transform_down(|nested_expr| {
if base_exprs.contains(&nested_expr) {
Ok(Transformed::yes(expr_as_column_expr(&nested_expr, plan)?))
} else {
@@ -178,7 +178,7 @@ pub(crate) fn resolve_aliases_to_exprs(
aliases: &HashMap<String, Expr>,
) -> Result<Expr> {
expr.clone()
- .transform_up(&|nested_expr| match nested_expr {
+ .transform_up(|nested_expr| match nested_expr {
Expr::Column(c) if c.relation.is_none() => {
if let Some(aliased_expr) = aliases.get(&c.name) {
Ok(Transformed::yes(aliased_expr.clone()))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]