This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 3c3cb87198 Improve `TreeNode` and `LogicalPlan` APIs to accept owned 
closures, deprecate `transform_down_mut()` and `transform_up_mut()` (#10126)
3c3cb87198 is described below

commit 3c3cb87198c6a0238640a99d9636f7554aa38f88
Author: Peter Toth <[email protected]>
AuthorDate: Mon Apr 22 21:19:46 2024 +0200

    Improve `TreeNode` and `LogicalPlan` APIs to accept owned closures, 
deprecate `transform_down_mut()` and `transform_up_mut()` (#10126)
    
    * Deprecate `TreeNode::transform_down_mut()` and 
`TreeNode::transform_up_mut()` methods
    
    * Refactor `TreeNode` and `LogicalPlan` apply, transform, transform_up, 
transform_down and transform_down_up APIs to accept owned closures
---
 datafusion-examples/examples/function_factory.rs   |   2 +-
 datafusion-examples/examples/rewrite_expr.rs       |   6 +-
 datafusion/common/src/tree_node.rs                 | 105 ++++++++++-------
 datafusion/core/src/datasource/listing/helpers.rs  |   2 +-
 .../src/physical_optimizer/coalesce_batches.rs     |   2 +-
 .../combine_partial_final_agg.rs                   |   4 +-
 .../src/physical_optimizer/convert_first_last.rs   |   2 +-
 .../src/physical_optimizer/enforce_distribution.rs |  12 +-
 .../core/src/physical_optimizer/enforce_sorting.rs |  18 +--
 .../core/src/physical_optimizer/join_selection.rs  |   8 +-
 .../limited_distinct_aggregation.rs                |   6 +-
 .../src/physical_optimizer/output_requirements.rs  |   2 +-
 .../src/physical_optimizer/pipeline_checker.rs     |   2 +-
 .../src/physical_optimizer/projection_pushdown.rs  |   8 +-
 datafusion/core/src/physical_optimizer/pruning.rs  |   2 +-
 .../replace_with_order_preserving_variants.rs      |   2 +-
 .../core/src/physical_optimizer/test_utils.rs      |   2 +-
 .../src/physical_optimizer/topk_aggregation.rs     |   6 +-
 .../user_defined/user_defined_scalar_functions.rs  |   2 +-
 datafusion/expr/src/expr.rs                        |   2 +-
 datafusion/expr/src/expr_rewriter/mod.rs           |  14 +--
 datafusion/expr/src/expr_rewriter/order_by.rs      |   2 +-
 datafusion/expr/src/logical_plan/plan.rs           |  14 +--
 datafusion/expr/src/logical_plan/tree_node.rs      | 129 ++++++++++++---------
 datafusion/expr/src/utils.rs                       |  10 +-
 .../optimizer/src/analyzer/count_wildcard_rule.rs  |   5 +-
 .../optimizer/src/analyzer/function_rewrite.rs     |   4 +-
 .../optimizer/src/analyzer/inline_table_scan.rs    |   4 +-
 datafusion/optimizer/src/analyzer/mod.rs           |   4 +-
 datafusion/optimizer/src/analyzer/subquery.rs      |   2 +-
 datafusion/optimizer/src/decorrelate.rs            |   6 +-
 datafusion/optimizer/src/optimize_projections.rs   |   2 +-
 datafusion/optimizer/src/plan_signature.rs         |   2 +-
 datafusion/optimizer/src/push_down_filter.rs       |   6 +-
 .../optimizer/src/scalar_subquery_to_join.rs       |   4 +-
 datafusion/optimizer/src/utils.rs                  |   2 +-
 datafusion/physical-expr/src/equivalence/class.rs  |   4 +-
 datafusion/physical-expr/src/equivalence/mod.rs    |   2 +-
 .../physical-expr/src/equivalence/projection.rs    |   2 +-
 .../physical-expr/src/equivalence/properties.rs    |   2 +-
 datafusion/physical-expr/src/expressions/case.rs   |   4 +-
 datafusion/physical-expr/src/utils/mod.rs          |   8 +-
 .../physical-plan/src/joins/stream_join_utils.rs   |   2 +-
 datafusion/physical-plan/src/joins/utils.rs        |   2 +-
 datafusion/physical-plan/src/recursive_query.rs    |   4 +-
 datafusion/sql/src/cte.rs                          |   2 +-
 datafusion/sql/src/select.rs                       |   2 +-
 datafusion/sql/src/unparser/utils.rs               |   2 +-
 datafusion/sql/src/utils.rs                        |   6 +-
 49 files changed, 238 insertions(+), 209 deletions(-)

diff --git a/datafusion-examples/examples/function_factory.rs 
b/datafusion-examples/examples/function_factory.rs
index a7c8558c6d..3973e50474 100644
--- a/datafusion-examples/examples/function_factory.rs
+++ b/datafusion-examples/examples/function_factory.rs
@@ -164,7 +164,7 @@ impl ScalarUDFImpl for ScalarFunctionWrapper {
 impl ScalarFunctionWrapper {
     // replaces placeholders such as $1 with actual arguments (args[0]
     fn replacement(expr: &Expr, args: &[Expr]) -> Result<Expr> {
-        let result = expr.clone().transform(&|e| {
+        let result = expr.clone().transform(|e| {
             let r = match e {
                 Expr::Placeholder(placeholder) => {
                     let placeholder_position =
diff --git a/datafusion-examples/examples/rewrite_expr.rs 
b/datafusion-examples/examples/rewrite_expr.rs
index dcebbb55fb..9b94a71a50 100644
--- a/datafusion-examples/examples/rewrite_expr.rs
+++ b/datafusion-examples/examples/rewrite_expr.rs
@@ -91,7 +91,7 @@ impl AnalyzerRule for MyAnalyzerRule {
 
 impl MyAnalyzerRule {
     fn analyze_plan(plan: LogicalPlan) -> Result<LogicalPlan> {
-        plan.transform(&|plan| {
+        plan.transform(|plan| {
             Ok(match plan {
                 LogicalPlan::Filter(filter) => {
                     let predicate = 
Self::analyze_expr(filter.predicate.clone())?;
@@ -107,7 +107,7 @@ impl MyAnalyzerRule {
     }
 
     fn analyze_expr(expr: Expr) -> Result<Expr> {
-        expr.transform(&|expr| {
+        expr.transform(|expr| {
             // closure is invoked for all sub expressions
             Ok(match expr {
                 Expr::Literal(ScalarValue::Int64(i)) => {
@@ -163,7 +163,7 @@ impl OptimizerRule for MyOptimizerRule {
 
 /// use rewrite_expr to modify the expression tree.
 fn my_rewrite(expr: Expr) -> Result<Expr> {
-    expr.transform(&|expr| {
+    expr.transform(|expr| {
         // closure is invoked for all sub expressions
         Ok(match expr {
             Expr::Between(Between {
diff --git a/datafusion/common/src/tree_node.rs 
b/datafusion/common/src/tree_node.rs
index dff22d4959..f41d264d35 100644
--- a/datafusion/common/src/tree_node.rs
+++ b/datafusion/common/src/tree_node.rs
@@ -31,18 +31,6 @@ macro_rules! handle_transform_recursion {
     }};
 }
 
-macro_rules! handle_transform_recursion_down {
-    ($F_DOWN:expr, $F_CHILD:expr) => {{
-        $F_DOWN?.transform_children(|n| n.map_children($F_CHILD))
-    }};
-}
-
-macro_rules! handle_transform_recursion_up {
-    ($SELF:expr, $F_CHILD:expr, $F_UP:expr) => {{
-        $SELF.map_children($F_CHILD)?.transform_parent(|n| $F_UP(n))
-    }};
-}
-
 /// Defines a visitable and rewriteable tree node. This trait is implemented
 /// for plans ([`ExecutionPlan`] and [`LogicalPlan`]) as well as expression
 /// trees ([`PhysicalExpr`], [`Expr`]) in DataFusion.
@@ -137,17 +125,24 @@ pub trait TreeNode: Sized {
     /// or run a check on the tree.
     fn apply<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
         &self,
-        f: &mut F,
+        mut f: F,
     ) -> Result<TreeNodeRecursion> {
-        f(self)?.visit_children(|| self.apply_children(|c| c.apply(f)))
+        fn apply_impl<N: TreeNode, F: FnMut(&N) -> Result<TreeNodeRecursion>>(
+            node: &N,
+            f: &mut F,
+        ) -> Result<TreeNodeRecursion> {
+            f(node)?.visit_children(|| node.apply_children(|c| apply_impl(c, 
f)))
+        }
+
+        apply_impl(self, &mut f)
     }
 
     /// Convenience utility for writing optimizer rules: Recursively apply the
     /// given function `f` to the tree in a bottom-up (post-order) fashion. 
When
     /// `f` does not apply to a given node, it is left unchanged.
-    fn transform<F: Fn(Self) -> Result<Transformed<Self>>>(
+    fn transform<F: FnMut(Self) -> Result<Transformed<Self>>>(
         self,
-        f: &F,
+        f: F,
     ) -> Result<Transformed<Self>> {
         self.transform_up(f)
     }
@@ -155,43 +150,60 @@ pub trait TreeNode: Sized {
     /// Convenience utility for writing optimizer rules: Recursively apply the
     /// given function `f` to a node and then to its children (pre-order 
traversal).
     /// When `f` does not apply to a given node, it is left unchanged.
-    fn transform_down<F: Fn(Self) -> Result<Transformed<Self>>>(
+    fn transform_down<F: FnMut(Self) -> Result<Transformed<Self>>>(
         self,
-        f: &F,
+        mut f: F,
     ) -> Result<Transformed<Self>> {
-        handle_transform_recursion_down!(f(self), |c| c.transform_down(f))
+        fn transform_down_impl<N: TreeNode, F: FnMut(N) -> 
Result<Transformed<N>>>(
+            node: N,
+            f: &mut F,
+        ) -> Result<Transformed<N>> {
+            f(node)?.transform_children(|n| n.map_children(|c| 
transform_down_impl(c, f)))
+        }
+
+        transform_down_impl(self, &mut f)
     }
 
     /// Convenience utility for writing optimizer rules: Recursively apply the
     /// given mutable function `f` to a node and then to its children 
(pre-order
     /// traversal). When `f` does not apply to a given node, it is left 
unchanged.
+    #[deprecated(since = "38.0.0", note = "Use `transform_down` instead")]
     fn transform_down_mut<F: FnMut(Self) -> Result<Transformed<Self>>>(
         self,
         f: &mut F,
     ) -> Result<Transformed<Self>> {
-        handle_transform_recursion_down!(f(self), |c| c.transform_down_mut(f))
+        self.transform_down(f)
     }
 
     /// Convenience utility for writing optimizer rules: Recursively apply the
     /// given function `f` to all children of a node, and then to the node 
itself
     /// (post-order traversal). When `f` does not apply to a given node, it is
     /// left unchanged.
-    fn transform_up<F: Fn(Self) -> Result<Transformed<Self>>>(
+    fn transform_up<F: FnMut(Self) -> Result<Transformed<Self>>>(
         self,
-        f: &F,
+        mut f: F,
     ) -> Result<Transformed<Self>> {
-        handle_transform_recursion_up!(self, |c| c.transform_up(f), f)
+        fn transform_up_impl<N: TreeNode, F: FnMut(N) -> 
Result<Transformed<N>>>(
+            node: N,
+            f: &mut F,
+        ) -> Result<Transformed<N>> {
+            node.map_children(|c| transform_up_impl(c, f))?
+                .transform_parent(f)
+        }
+
+        transform_up_impl(self, &mut f)
     }
 
     /// Convenience utility for writing optimizer rules: Recursively apply the
     /// given mutable function `f` to all children of a node, and then to the
     /// node itself (post-order traversal). When `f` does not apply to a given
     /// node, it is left unchanged.
+    #[deprecated(since = "38.0.0", note = "Use `transform_up` instead")]
     fn transform_up_mut<F: FnMut(Self) -> Result<Transformed<Self>>>(
         self,
         f: &mut F,
     ) -> Result<Transformed<Self>> {
-        handle_transform_recursion_up!(self, |c| c.transform_up_mut(f), f)
+        self.transform_up(f)
     }
 
     /// Transforms the tree using `f_down` while traversing the tree top-down
@@ -200,8 +212,8 @@ pub trait TreeNode: Sized {
     ///
     /// Use this method if you want to start the `f_up` process right where 
`f_down` jumps.
     /// This can make the whole process faster by reducing the number of 
`f_up` steps.
-    /// If you don't need this, it's just like using `transform_down_mut` 
followed by
-    /// `transform_up_mut` on the same tree.
+    /// If you don't need this, it's just like using `transform_down` followed 
by
+    /// `transform_up` on the same tree.
     ///
     /// Consider the following tree structure:
     /// ```text
@@ -288,14 +300,26 @@ pub trait TreeNode: Sized {
         FU: FnMut(Self) -> Result<Transformed<Self>>,
     >(
         self,
-        f_down: &mut FD,
-        f_up: &mut FU,
+        mut f_down: FD,
+        mut f_up: FU,
     ) -> Result<Transformed<Self>> {
-        handle_transform_recursion!(
-            f_down(self),
-            |c| c.transform_down_up(f_down, f_up),
-            f_up
-        )
+        fn transform_down_up_impl<
+            N: TreeNode,
+            FD: FnMut(N) -> Result<Transformed<N>>,
+            FU: FnMut(N) -> Result<Transformed<N>>,
+        >(
+            node: N,
+            f_down: &mut FD,
+            f_up: &mut FU,
+        ) -> Result<Transformed<N>> {
+            handle_transform_recursion!(
+                f_down(node),
+                |c| transform_down_up_impl(c, f_down, f_up),
+                f_up
+            )
+        }
+
+        transform_down_up_impl(self, &mut f_down, &mut f_up)
     }
 
     /// Returns true if `f` returns true for node in the tree.
@@ -303,7 +327,7 @@ pub trait TreeNode: Sized {
     /// Stops recursion as soon as a matching node is found
     fn exists<F: FnMut(&Self) -> bool>(&self, mut f: F) -> bool {
         let mut found = false;
-        self.apply(&mut |n| {
+        self.apply(|n| {
             Ok(if f(n) {
                 found = true;
                 TreeNodeRecursion::Stop
@@ -439,9 +463,7 @@ impl TreeNodeRecursion {
 /// This struct is used by tree transformation APIs such as
 /// - [`TreeNode::rewrite`],
 /// - [`TreeNode::transform_down`],
-/// - [`TreeNode::transform_down_mut`],
 /// - [`TreeNode::transform_up`],
-/// - [`TreeNode::transform_up_mut`],
 /// - [`TreeNode::transform_down_up`]
 ///
 /// to control the transformation and return the transformed result.
@@ -1362,7 +1384,7 @@ mod tests {
             fn $NAME() -> Result<()> {
                 let tree = test_tree();
                 let mut visits = vec![];
-                tree.apply(&mut |node| {
+                tree.apply(|node| {
                     visits.push(format!("f_down({})", node.data));
                     $F(node)
                 })?;
@@ -1451,10 +1473,7 @@ mod tests {
             #[test]
             fn $NAME() -> Result<()> {
                 let tree = test_tree();
-                assert_eq!(
-                    tree.transform_down_up(&mut $F_DOWN, &mut $F_UP,)?,
-                    $EXPECTED_TREE
-                );
+                assert_eq!(tree.transform_down_up($F_DOWN, $F_UP,)?, 
$EXPECTED_TREE);
 
                 Ok(())
             }
@@ -1466,7 +1485,7 @@ mod tests {
             #[test]
             fn $NAME() -> Result<()> {
                 let tree = test_tree();
-                assert_eq!(tree.transform_down_mut(&mut $F)?, $EXPECTED_TREE);
+                assert_eq!(tree.transform_down($F)?, $EXPECTED_TREE);
 
                 Ok(())
             }
@@ -1478,7 +1497,7 @@ mod tests {
             #[test]
             fn $NAME() -> Result<()> {
                 let tree = test_tree();
-                assert_eq!(tree.transform_up_mut(&mut $F)?, $EXPECTED_TREE);
+                assert_eq!(tree.transform_up($F)?, $EXPECTED_TREE);
 
                 Ok(())
             }
diff --git a/datafusion/core/src/datasource/listing/helpers.rs 
b/datafusion/core/src/datasource/listing/helpers.rs
index b415ce9d91..637a0daf13 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -50,7 +50,7 @@ use object_store::{ObjectMeta, ObjectStore};
 /// was performed
 pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool {
     let mut is_applicable = true;
-    expr.apply(&mut |expr| {
+    expr.apply(|expr| {
         match expr {
             Expr::Column(Column { ref name, .. }) => {
                 is_applicable &= col_names.contains(name);
diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs 
b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
index 1e234eaae1..42b7463600 100644
--- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs
+++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
@@ -54,7 +54,7 @@ impl PhysicalOptimizerRule for CoalesceBatches {
         }
 
         let target_batch_size = config.execution.batch_size;
-        plan.transform_up(&|plan| {
+        plan.transform_up(|plan| {
             let plan_any = plan.as_any();
             // The goal here is to detect operators that could produce small 
batches and only
             // wrap those ones with a CoalesceBatchesExec operator. An 
alternate approach here
diff --git 
a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs 
b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
index 1cba8f0258..3d8f89d569 100644
--- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
+++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
@@ -51,7 +51,7 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate {
         plan: Arc<dyn ExecutionPlan>,
         _config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        plan.transform_down(&|plan| {
+        plan.transform_down(|plan| {
             let transformed =
                 plan.as_any()
                     .downcast_ref::<AggregateExec>()
@@ -179,7 +179,7 @@ fn normalize_group_exprs(group_exprs: GroupExprsRef) -> 
GroupExprs {
 fn discard_column_index(group_expr: Arc<dyn PhysicalExpr>) -> Arc<dyn 
PhysicalExpr> {
     group_expr
         .clone()
-        .transform(&|expr| {
+        .transform(|expr| {
             let normalized_form: Option<Arc<dyn PhysicalExpr>> =
                 match expr.as_any().downcast_ref::<Column>() {
                     Some(column) => Some(Arc::new(Column::new(column.name(), 
0))),
diff --git a/datafusion/core/src/physical_optimizer/convert_first_last.rs 
b/datafusion/core/src/physical_optimizer/convert_first_last.rs
index 4102313d31..14860eecf1 100644
--- a/datafusion/core/src/physical_optimizer/convert_first_last.rs
+++ b/datafusion/core/src/physical_optimizer/convert_first_last.rs
@@ -60,7 +60,7 @@ impl PhysicalOptimizerRule for OptimizeAggregateOrder {
         plan: Arc<dyn ExecutionPlan>,
         _config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        plan.transform_up(&get_common_requirement_of_aggregate_input)
+        plan.transform_up(get_common_requirement_of_aggregate_input)
             .data()
     }
 
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs 
b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 3cddf73c8e..eacc842c34 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -197,12 +197,12 @@ impl PhysicalOptimizerRule for EnforceDistribution {
             // Run a top-down process to adjust input key ordering recursively
             let plan_requirements = PlanWithKeyRequirements::new_default(plan);
             let adjusted = plan_requirements
-                .transform_down(&adjust_input_keys_ordering)
+                .transform_down(adjust_input_keys_ordering)
                 .data()?;
             adjusted.plan
         } else {
             // Run a bottom-up process
-            plan.transform_up(&|plan| {
+            plan.transform_up(|plan| {
                 Ok(Transformed::yes(reorder_join_keys_to_inputs(plan)?))
             })
             .data()?
@@ -211,7 +211,7 @@ impl PhysicalOptimizerRule for EnforceDistribution {
         let distribution_context = DistributionContext::new_default(adjusted);
         // Distribution enforcement needs to be applied bottom-up.
         let distribution_context = distribution_context
-            .transform_up(&|distribution_context| {
+            .transform_up(|distribution_context| {
                 ensure_distribution(distribution_context, config)
             })
             .data()?;
@@ -1768,14 +1768,14 @@ pub(crate) mod tests {
                     let plan_requirements =
                         PlanWithKeyRequirements::new_default($PLAN.clone());
                     let adjusted = plan_requirements
-                        .transform_down(&adjust_input_keys_ordering)
+                        .transform_down(adjust_input_keys_ordering)
                         .data()
                         .and_then(check_integrity)?;
                     // TODO: End state payloads will be checked here.
                     adjusted.plan
                 } else {
                     // Run reorder_join_keys_to_inputs rule
-                    $PLAN.clone().transform_up(&|plan| {
+                    $PLAN.clone().transform_up(|plan| {
                         
Ok(Transformed::yes(reorder_join_keys_to_inputs(plan)?))
                     })
                     .data()?
@@ -1783,7 +1783,7 @@ pub(crate) mod tests {
 
                 // Then run ensure_distribution rule
                 DistributionContext::new_default(adjusted)
-                    .transform_up(&|distribution_context| {
+                    .transform_up(|distribution_context| {
                         ensure_distribution(distribution_context, &config)
                     })
                     .data()
diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs 
b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
index 5bf21c3dfa..2dced0de6a 100644
--- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -160,12 +160,12 @@ impl PhysicalOptimizerRule for EnforceSorting {
         let plan_requirements = PlanWithCorrespondingSort::new_default(plan);
         // Execute a bottom-up traversal to enforce sorting requirements,
         // remove unnecessary sorts, and optimize sort-sensitive operators:
-        let adjusted = plan_requirements.transform_up(&ensure_sorting)?.data;
+        let adjusted = plan_requirements.transform_up(ensure_sorting)?.data;
         let new_plan = if config.optimizer.repartition_sorts {
             let plan_with_coalesce_partitions =
                 
PlanWithCorrespondingCoalescePartitions::new_default(adjusted.plan);
             let parallel = plan_with_coalesce_partitions
-                .transform_up(&parallelize_sorts)
+                .transform_up(parallelize_sorts)
                 .data()?;
             parallel.plan
         } else {
@@ -174,7 +174,7 @@ impl PhysicalOptimizerRule for EnforceSorting {
 
         let plan_with_pipeline_fixer = 
OrderPreservationContext::new_default(new_plan);
         let updated_plan = plan_with_pipeline_fixer
-            .transform_up(&|plan_with_pipeline_fixer| {
+            .transform_up(|plan_with_pipeline_fixer| {
                 replace_with_order_preserving_variants(
                     plan_with_pipeline_fixer,
                     false,
@@ -188,11 +188,11 @@ impl PhysicalOptimizerRule for EnforceSorting {
         // missed by the bottom-up traversal:
         let mut sort_pushdown = SortPushDown::new_default(updated_plan.plan);
         assign_initial_requirements(&mut sort_pushdown);
-        let adjusted = sort_pushdown.transform_down(&pushdown_sorts)?.data;
+        let adjusted = sort_pushdown.transform_down(pushdown_sorts)?.data;
 
         adjusted
             .plan
-            .transform_up(&|plan| 
Ok(Transformed::yes(replace_with_partial_sort(plan)?)))
+            .transform_up(|plan| 
Ok(Transformed::yes(replace_with_partial_sort(plan)?)))
             .data()
     }
 
@@ -681,7 +681,7 @@ mod tests {
             {
                 let plan_requirements = 
PlanWithCorrespondingSort::new_default($PLAN.clone());
                 let adjusted = plan_requirements
-                    .transform_up(&ensure_sorting)
+                    .transform_up(ensure_sorting)
                     .data()
                     .and_then(check_integrity)?;
                 // TODO: End state payloads will be checked here.
@@ -690,7 +690,7 @@ mod tests {
                     let plan_with_coalesce_partitions =
                         
PlanWithCorrespondingCoalescePartitions::new_default(adjusted.plan);
                     let parallel = plan_with_coalesce_partitions
-                        .transform_up(&parallelize_sorts)
+                        .transform_up(parallelize_sorts)
                         .data()
                         .and_then(check_integrity)?;
                     // TODO: End state payloads will be checked here.
@@ -701,7 +701,7 @@ mod tests {
 
                 let plan_with_pipeline_fixer = 
OrderPreservationContext::new_default(new_plan);
                 let updated_plan = plan_with_pipeline_fixer
-                    .transform_up(&|plan_with_pipeline_fixer| {
+                    .transform_up(|plan_with_pipeline_fixer| {
                         replace_with_order_preserving_variants(
                             plan_with_pipeline_fixer,
                             false,
@@ -716,7 +716,7 @@ mod tests {
                 let mut sort_pushdown = 
SortPushDown::new_default(updated_plan.plan);
                 assign_initial_requirements(&mut sort_pushdown);
                 sort_pushdown
-                    .transform_down(&pushdown_sorts)
+                    .transform_down(pushdown_sorts)
                     .data()
                     .and_then(check_integrity)?;
                 // TODO: End state payloads will be checked here.
diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs 
b/datafusion/core/src/physical_optimizer/join_selection.rs
index a8b308d3de..b20f041366 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -295,7 +295,7 @@ impl PhysicalOptimizerRule for JoinSelection {
             Box::new(hash_join_swap_subrule),
         ];
         let new_plan = plan
-            .transform_up(&|p| apply_subrules(p, &subrules, config))
+            .transform_up(|p| apply_subrules(p, &subrules, config))
             .data()?;
         // Next, we apply another subrule that tries to optimize joins using 
any
         // statistics their inputs might have.
@@ -312,7 +312,7 @@ impl PhysicalOptimizerRule for JoinSelection {
         let collect_threshold_byte_size = 
config.hash_join_single_partition_threshold;
         let collect_threshold_num_rows = 
config.hash_join_single_partition_threshold_rows;
         new_plan
-            .transform_up(&|plan| {
+            .transform_up(|plan| {
                 statistical_join_selection_subrule(
                     plan,
                     collect_threshold_byte_size,
@@ -891,13 +891,13 @@ mod tests_statistical {
             Box::new(hash_join_swap_subrule),
         ];
         let new_plan = plan
-            .transform_up(&|p| apply_subrules(p, &subrules, 
&ConfigOptions::new()))
+            .transform_up(|p| apply_subrules(p, &subrules, 
&ConfigOptions::new()))
             .data()?;
         // TODO: End state payloads will be checked here.
         let config = ConfigOptions::new().optimizer;
         let collect_left_threshold = 
config.hash_join_single_partition_threshold;
         let collect_threshold_num_rows = 
config.hash_join_single_partition_threshold_rows;
-        let _ = new_plan.transform_up(&|plan| {
+        let _ = new_plan.transform_up(|plan| {
             statistical_join_selection_subrule(
                 plan,
                 collect_left_threshold,
diff --git 
a/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs 
b/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs
index 9509d4e4c8..dbdcfed2ae 100644
--- a/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs
+++ b/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs
@@ -107,7 +107,7 @@ impl LimitedDistinctAggregation {
         let mut found_match_aggr = false;
 
         let mut rewrite_applicable = true;
-        let mut closure = |plan: Arc<dyn ExecutionPlan>| {
+        let closure = |plan: Arc<dyn ExecutionPlan>| {
             if !rewrite_applicable {
                 return Ok(Transformed::no(plan));
             }
@@ -138,7 +138,7 @@ impl LimitedDistinctAggregation {
             rewrite_applicable = false;
             Ok(Transformed::no(plan))
         };
-        let child = child.clone().transform_down_mut(&mut 
closure).data().ok()?;
+        let child = child.clone().transform_down(closure).data().ok()?;
         if is_global_limit {
             return Some(Arc::new(GlobalLimitExec::new(
                 child,
@@ -163,7 +163,7 @@ impl PhysicalOptimizerRule for LimitedDistinctAggregation {
         config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         if config.optimizer.enable_distinct_aggregation_soft_limit {
-            plan.transform_down(&|plan| {
+            plan.transform_down(|plan| {
                 Ok(
                     if let Some(plan) =
                         
LimitedDistinctAggregation::transform_limit(plan.clone())
diff --git a/datafusion/core/src/physical_optimizer/output_requirements.rs 
b/datafusion/core/src/physical_optimizer/output_requirements.rs
index 829d523c99..5bf86e88d6 100644
--- a/datafusion/core/src/physical_optimizer/output_requirements.rs
+++ b/datafusion/core/src/physical_optimizer/output_requirements.rs
@@ -198,7 +198,7 @@ impl PhysicalOptimizerRule for OutputRequirements {
         match self.mode {
             RuleMode::Add => require_top_ordering(plan),
             RuleMode::Remove => plan
-                .transform_up(&|plan| {
+                .transform_up(|plan| {
                     if let Some(sort_req) =
                         plan.as_any().downcast_ref::<OutputRequirementExec>()
                     {
diff --git a/datafusion/core/src/physical_optimizer/pipeline_checker.rs 
b/datafusion/core/src/physical_optimizer/pipeline_checker.rs
index 1dc8bc5042..5c6a0ab8ea 100644
--- a/datafusion/core/src/physical_optimizer/pipeline_checker.rs
+++ b/datafusion/core/src/physical_optimizer/pipeline_checker.rs
@@ -50,7 +50,7 @@ impl PhysicalOptimizerRule for PipelineChecker {
         plan: Arc<dyn ExecutionPlan>,
         config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        plan.transform_up(&|p| check_finiteness_requirements(p, 
&config.optimizer))
+        plan.transform_up(|p| check_finiteness_requirements(p, 
&config.optimizer))
             .data()
     }
 
diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs 
b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
index a5f5a28fb2..337c566e8f 100644
--- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
@@ -75,7 +75,7 @@ impl PhysicalOptimizerRule for ProjectionPushdown {
         plan: Arc<dyn ExecutionPlan>,
         _config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        plan.transform_down(&remove_unnecessary_projections).data()
+        plan.transform_down(remove_unnecessary_projections).data()
     }
 
     fn name(&self) -> &str {
@@ -273,7 +273,7 @@ fn try_unifying_projections(
 
     // Collect the column references usage in the outer projection.
     projection.expr().iter().for_each(|(expr, _)| {
-        expr.apply(&mut |expr| {
+        expr.apply(|expr| {
             Ok({
                 if let Some(column) = expr.as_any().downcast_ref::<Column>() {
                     *column_ref_map.entry(column.clone()).or_default() += 1;
@@ -977,7 +977,7 @@ fn update_expr(
 
     let new_expr = expr
         .clone()
-        .transform_up_mut(&mut |expr: Arc<dyn PhysicalExpr>| {
+        .transform_up(|expr: Arc<dyn PhysicalExpr>| {
             if state == RewriteState::RewrittenInvalid {
                 return Ok(Transformed::no(expr));
             }
@@ -1120,7 +1120,7 @@ fn new_columns_for_join_on(
             // Rewrite all columns in `on`
             (*on)
                 .clone()
-                .transform(&|expr| {
+                .transform(|expr| {
                     if let Some(column) = 
expr.as_any().downcast_ref::<Column>() {
                         // Find the column in the projection expressions
                         let new_column = projection_exprs
diff --git a/datafusion/core/src/physical_optimizer/pruning.rs 
b/datafusion/core/src/physical_optimizer/pruning.rs
index 74bbe1f95b..c65235f5fd 100644
--- a/datafusion/core/src/physical_optimizer/pruning.rs
+++ b/datafusion/core/src/physical_optimizer/pruning.rs
@@ -1175,7 +1175,7 @@ fn rewrite_column_expr(
     column_old: &phys_expr::Column,
     column_new: &phys_expr::Column,
 ) -> Result<Arc<dyn PhysicalExpr>> {
-    e.transform(&|expr| {
+    e.transform(|expr| {
         if let Some(column) = 
expr.as_any().downcast_ref::<phys_expr::Column>() {
             if column == column_old {
                 return Ok(Transformed::yes(Arc::new(column_new.clone())));
diff --git 
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
 
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index ad19215fbf..b438e40ece 100644
--- 
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++ 
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -395,7 +395,7 @@ mod tests {
             // Run the rule top-down
             let config = 
SessionConfig::new().with_prefer_existing_sort($PREFER_EXISTING_SORT);
             let plan_with_pipeline_fixer = 
OrderPreservationContext::new_default(physical_plan);
-            let parallel = 
plan_with_pipeline_fixer.transform_up(&|plan_with_pipeline_fixer| 
replace_with_order_preserving_variants(plan_with_pipeline_fixer, false, false, 
config.options())).data().and_then(check_integrity)?;
+            let parallel = 
plan_with_pipeline_fixer.transform_up(|plan_with_pipeline_fixer| 
replace_with_order_preserving_variants(plan_with_pipeline_fixer, false, false, 
config.options())).data().and_then(check_integrity)?;
             let optimized_physical_plan = parallel.plan;
 
             // Get string representation of the plan
diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs 
b/datafusion/core/src/physical_optimizer/test_utils.rs
index 2e6e3af5df..7bc1eeb7c4 100644
--- a/datafusion/core/src/physical_optimizer/test_utils.rs
+++ b/datafusion/core/src/physical_optimizer/test_utils.rs
@@ -380,7 +380,7 @@ pub fn sort_exec(
 /// replaced with direct plan equality checks.
 pub fn check_integrity<T: Clone>(context: PlanContext<T>) -> 
Result<PlanContext<T>> {
     context
-        .transform_up(&|node| {
+        .transform_up(|node| {
             let children_plans = node.plan.children();
             assert_eq!(node.children.len(), children_plans.len());
             for (child_plan, child_node) in
diff --git a/datafusion/core/src/physical_optimizer/topk_aggregation.rs 
b/datafusion/core/src/physical_optimizer/topk_aggregation.rs
index c47e5e25d1..95f7067cbe 100644
--- a/datafusion/core/src/physical_optimizer/topk_aggregation.rs
+++ b/datafusion/core/src/physical_optimizer/topk_aggregation.rs
@@ -102,7 +102,7 @@ impl TopKAggregation {
         };
 
         let mut cardinality_preserved = true;
-        let mut closure = |plan: Arc<dyn ExecutionPlan>| {
+        let closure = |plan: Arc<dyn ExecutionPlan>| {
             if !cardinality_preserved {
                 return Ok(Transformed::no(plan));
             }
@@ -120,7 +120,7 @@ impl TopKAggregation {
             }
             Ok(Transformed::no(plan))
         };
-        let child = child.clone().transform_down_mut(&mut 
closure).data().ok()?;
+        let child = child.clone().transform_down(closure).data().ok()?;
         let sort = SortExec::new(sort.expr().to_vec(), child)
             .with_fetch(sort.fetch())
             .with_preserve_partitioning(sort.preserve_partitioning());
@@ -141,7 +141,7 @@ impl PhysicalOptimizerRule for TopKAggregation {
         config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         if config.optimizer.enable_topk_aggregation {
-            plan.transform_down(&|plan| {
+            plan.transform_down(|plan| {
                 Ok(
                     if let Some(plan) = 
TopKAggregation::transform_sort(plan.clone()) {
                         Transformed::yes(plan)
diff --git 
a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs 
b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs
index 86be887198..e31a108162 100644
--- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs
+++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs
@@ -817,7 +817,7 @@ impl ScalarUDFImpl for ScalarFunctionWrapper {
 impl ScalarFunctionWrapper {
     // replaces placeholders with actual arguments
     fn replacement(expr: &Expr, args: &[Expr]) -> Result<Expr> {
-        let result = expr.clone().transform(&|e| {
+        let result = expr.clone().transform(|e| {
             let r = match e {
                 Expr::Placeholder(placeholder) => {
                     let placeholder_position =
diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index 08d495c3be..b2357e77b1 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -1240,7 +1240,7 @@ impl Expr {
     /// For example, given an expression like `<int32> = $0` will infer `$0` to
     /// have type `int32`.
     pub fn infer_placeholder_types(self, schema: &DFSchema) -> Result<Expr> {
-        self.transform(&|mut expr| {
+        self.transform(|mut expr| {
             // Default to assuming the arguments are the same type
             if let Expr::BinaryExpr(BinaryExpr { left, op: _, right }) = &mut 
expr {
                 rewrite_placeholder(left.as_mut(), right.as_ref(), schema)?;
diff --git a/datafusion/expr/src/expr_rewriter/mod.rs 
b/datafusion/expr/src/expr_rewriter/mod.rs
index c11619fc0e..f5779df812 100644
--- a/datafusion/expr/src/expr_rewriter/mod.rs
+++ b/datafusion/expr/src/expr_rewriter/mod.rs
@@ -62,7 +62,7 @@ pub trait FunctionRewrite {
 /// Recursively call [`Column::normalize_with_schemas`] on all [`Column`] 
expressions
 /// in the `expr` expression tree.
 pub fn normalize_col(expr: Expr, plan: &LogicalPlan) -> Result<Expr> {
-    expr.transform(&|expr| {
+    expr.transform(|expr| {
         Ok({
             if let Expr::Column(c) = expr {
                 let col = LogicalPlanBuilder::normalize(plan, c)?;
@@ -91,7 +91,7 @@ pub fn normalize_col_with_schemas_and_ambiguity_check(
         return Ok(Expr::Unnest(Unnest { expr: Box::new(e) }));
     }
 
-    expr.transform(&|expr| {
+    expr.transform(|expr| {
         Ok({
             if let Expr::Column(c) = expr {
                 let col =
@@ -119,7 +119,7 @@ pub fn normalize_cols(
 /// Recursively replace all [`Column`] expressions in a given expression tree 
with
 /// `Column` expressions provided by the hash map argument.
 pub fn replace_col(expr: Expr, replace_map: &HashMap<&Column, &Column>) -> 
Result<Expr> {
-    expr.transform(&|expr| {
+    expr.transform(|expr| {
         Ok({
             if let Expr::Column(c) = &expr {
                 match replace_map.get(c) {
@@ -140,7 +140,7 @@ pub fn replace_col(expr: Expr, replace_map: 
&HashMap<&Column, &Column>) -> Resul
 /// For example, if there were expressions like `foo.bar` this would
 /// rewrite it to just `bar`.
 pub fn unnormalize_col(expr: Expr) -> Expr {
-    expr.transform(&|expr| {
+    expr.transform(|expr| {
         Ok({
             if let Expr::Column(c) = expr {
                 let col = Column {
@@ -190,7 +190,7 @@ pub fn unnormalize_cols(exprs: impl IntoIterator<Item = 
Expr>) -> Vec<Expr> {
 /// Recursively remove all the ['OuterReferenceColumn'] and return the inside 
Column
 /// in the expression tree.
 pub fn strip_outer_reference(expr: Expr) -> Expr {
-    expr.transform(&|expr| {
+    expr.transform(|expr| {
         Ok({
             if let Expr::OuterReferenceColumn(_, col) = expr {
                 Transformed::yes(Expr::Column(col))
@@ -336,7 +336,7 @@ mod test {
         // rewrites "foo" --> "bar"
         let rewritten = col("state")
             .eq(lit("foo"))
-            .transform(&transformer)
+            .transform(transformer)
             .data()
             .unwrap();
         assert_eq!(rewritten, col("state").eq(lit("bar")));
@@ -344,7 +344,7 @@ mod test {
         // doesn't rewrite
         let rewritten = col("state")
             .eq(lit("baz"))
-            .transform(&transformer)
+            .transform(transformer)
             .data()
             .unwrap();
         assert_eq!(rewritten, col("state").eq(lit("baz")));
diff --git a/datafusion/expr/src/expr_rewriter/order_by.rs 
b/datafusion/expr/src/expr_rewriter/order_by.rs
index 2fb522b979..eb38fee7ca 100644
--- a/datafusion/expr/src/expr_rewriter/order_by.rs
+++ b/datafusion/expr/src/expr_rewriter/order_by.rs
@@ -84,7 +84,7 @@ fn rewrite_in_terms_of_projection(
 ) -> Result<Expr> {
     // assumption is that each item in exprs, such as "b + c" is
     // available as an output column named "b + c"
-    expr.transform(&|expr| {
+    expr.transform(|expr| {
         // search for unnormalized names first such as "c1" (such as aliases)
         if let Some(found) = proj_exprs.iter().find(|a| (**a) == expr) {
             let col = Expr::Column(
diff --git a/datafusion/expr/src/logical_plan/plan.rs 
b/datafusion/expr/src/logical_plan/plan.rs
index dbff504601..97f5e22287 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -358,7 +358,7 @@ impl LogicalPlan {
     pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, 
DataFusionError> {
         let mut using_columns: Vec<HashSet<Column>> = vec![];
 
-        self.apply_with_subqueries(&mut |plan| {
+        self.apply_with_subqueries(|plan| {
             if let LogicalPlan::Join(Join {
                 join_constraint: JoinConstraint::Using,
                 on,
@@ -554,7 +554,7 @@ impl LogicalPlan {
                 // AND lineitem.l_quantity < Decimal128(Some(2400),15,2)
 
                 let predicate = predicate
-                    .transform_down(&|expr| {
+                    .transform_down(|expr| {
                         match expr {
                             Expr::Exists { .. }
                             | Expr::ScalarSubquery(_)
@@ -1017,10 +1017,10 @@ impl LogicalPlan {
         self,
         param_values: &ParamValues,
     ) -> Result<LogicalPlan> {
-        self.transform_up_with_subqueries(&|plan| {
+        self.transform_up_with_subqueries(|plan| {
             let schema = plan.schema().clone();
             plan.map_expressions(|e| {
-                e.infer_placeholder_types(&schema)?.transform_up(&|e| {
+                e.infer_placeholder_types(&schema)?.transform_up(|e| {
                     if let Expr::Placeholder(Placeholder { id, .. }) = e {
                         let value = 
param_values.get_placeholders_with_values(&id)?;
                         Ok(Transformed::yes(Expr::Literal(value)))
@@ -1039,9 +1039,9 @@ impl LogicalPlan {
     ) -> Result<HashMap<String, Option<DataType>>, DataFusionError> {
         let mut param_types: HashMap<String, Option<DataType>> = 
HashMap::new();
 
-        self.apply_with_subqueries(&mut |plan| {
+        self.apply_with_subqueries(|plan| {
             plan.apply_expressions(|expr| {
-                expr.apply(&mut |expr| {
+                expr.apply(|expr| {
                     if let Expr::Placeholder(Placeholder { id, data_type }) = 
expr {
                         let prev = param_types.get(id);
                         match (prev, data_type) {
@@ -3170,7 +3170,7 @@ digraph {
         // after transformation, because plan is not the same anymore,
         // the parent plan is built again with call to 
LogicalPlan::with_new_inputs -> with_new_exprs
         let plan = plan
-            .transform(&|plan| match plan {
+            .transform(|plan| match plan {
                 LogicalPlan::TableScan(table) => {
                     let filter = Filter::try_new(
                         external_filter.clone(),
diff --git a/datafusion/expr/src/logical_plan/tree_node.rs 
b/datafusion/expr/src/logical_plan/tree_node.rs
index 48f047c070..f5db5a2704 100644
--- a/datafusion/expr/src/logical_plan/tree_node.rs
+++ b/datafusion/expr/src/logical_plan/tree_node.rs
@@ -431,23 +431,6 @@ macro_rules! handle_transform_recursion {
     }};
 }
 
-macro_rules! handle_transform_recursion_down {
-    ($F_DOWN:expr, $F_CHILD:expr) => {{
-        $F_DOWN?
-            .transform_children(|n| n.map_subqueries($F_CHILD))?
-            .transform_sibling(|n| n.map_children($F_CHILD))
-    }};
-}
-
-macro_rules! handle_transform_recursion_up {
-    ($SELF:expr, $F_CHILD:expr, $F_UP:expr) => {{
-        $SELF
-            .map_subqueries($F_CHILD)?
-            .transform_sibling(|n| n.map_children($F_CHILD))?
-            .transform_parent(|n| $F_UP(n))
-    }};
-}
-
 impl LogicalPlan {
     /// Calls `f` on all expressions in the current `LogicalPlan` node.
     ///
@@ -787,19 +770,32 @@ impl LogicalPlan {
     /// ...)`.
     pub fn apply_with_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
         &self,
-        f: &mut F,
+        mut f: F,
     ) -> Result<TreeNodeRecursion> {
-        f(self)?
-            .visit_children(|| self.apply_subqueries(|c| 
c.apply_with_subqueries(f)))?
-            .visit_sibling(|| self.apply_children(|c| 
c.apply_with_subqueries(f)))
+        fn apply_with_subqueries_impl<
+            F: FnMut(&LogicalPlan) -> Result<TreeNodeRecursion>,
+        >(
+            node: &LogicalPlan,
+            f: &mut F,
+        ) -> Result<TreeNodeRecursion> {
+            f(node)?
+                .visit_children(|| {
+                    node.apply_subqueries(|c| apply_with_subqueries_impl(c, f))
+                })?
+                .visit_sibling(|| {
+                    node.apply_children(|c| apply_with_subqueries_impl(c, f))
+                })
+        }
+
+        apply_with_subqueries_impl(self, &mut f)
     }
 
     /// Similarly to [`Self::transform`], rewrites this node and its inputs 
using `f`,
     /// including subqueries that may appear in expressions such as `IN (SELECT
     /// ...)`.
-    pub fn transform_with_subqueries<F: Fn(Self) -> Result<Transformed<Self>>>(
+    pub fn transform_with_subqueries<F: FnMut(Self) -> 
Result<Transformed<Self>>>(
         self,
-        f: &F,
+        f: F,
     ) -> Result<Transformed<Self>> {
         self.transform_up_with_subqueries(f)
     }
@@ -807,43 +803,49 @@ impl LogicalPlan {
     /// Similarly to [`Self::transform_down`], rewrites this node and its 
inputs using `f`,
     /// including subqueries that may appear in expressions such as `IN (SELECT
     /// ...)`.
-    pub fn transform_down_with_subqueries<F: Fn(Self) -> 
Result<Transformed<Self>>>(
+    pub fn transform_down_with_subqueries<F: FnMut(Self) -> 
Result<Transformed<Self>>>(
         self,
-        f: &F,
+        mut f: F,
     ) -> Result<Transformed<Self>> {
-        handle_transform_recursion_down!(f(self), |c| 
c.transform_down_with_subqueries(f))
-    }
+        fn transform_down_with_subqueries_impl<
+            F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
+        >(
+            node: LogicalPlan,
+            f: &mut F,
+        ) -> Result<Transformed<LogicalPlan>> {
+            f(node)?
+                .transform_children(|n| {
+                    n.map_subqueries(|c| 
transform_down_with_subqueries_impl(c, f))
+                })?
+                .transform_sibling(|n| {
+                    n.map_children(|c| transform_down_with_subqueries_impl(c, 
f))
+                })
+        }
 
-    /// Similarly to [`Self::transform_down_mut`], rewrites this node and its 
inputs using `f`,
-    /// including subqueries that may appear in expressions such as `IN (SELECT
-    /// ...)`.
-    pub fn transform_down_mut_with_subqueries<
-        F: FnMut(Self) -> Result<Transformed<Self>>,
-    >(
-        self,
-        f: &mut F,
-    ) -> Result<Transformed<Self>> {
-        handle_transform_recursion_down!(f(self), |c| c
-            .transform_down_mut_with_subqueries(f))
+        transform_down_with_subqueries_impl(self, &mut f)
     }
 
     /// Similarly to [`Self::transform_up`], rewrites this node and its inputs 
using `f`,
     /// including subqueries that may appear in expressions such as `IN (SELECT
     /// ...)`.
-    pub fn transform_up_with_subqueries<F: Fn(Self) -> 
Result<Transformed<Self>>>(
+    pub fn transform_up_with_subqueries<F: FnMut(Self) -> 
Result<Transformed<Self>>>(
         self,
-        f: &F,
+        mut f: F,
     ) -> Result<Transformed<Self>> {
-        handle_transform_recursion_up!(self, |c| 
c.transform_up_with_subqueries(f), f)
-    }
+        fn transform_up_with_subqueries_impl<
+            F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
+        >(
+            node: LogicalPlan,
+            f: &mut F,
+        ) -> Result<Transformed<LogicalPlan>> {
+            node.map_subqueries(|c| transform_up_with_subqueries_impl(c, f))?
+                .transform_sibling(|n| {
+                    n.map_children(|c| transform_up_with_subqueries_impl(c, f))
+                })?
+                .transform_parent(f)
+        }
 
-    pub fn transform_up_mut_with_subqueries<
-        F: FnMut(Self) -> Result<Transformed<Self>>,
-    >(
-        self,
-        f: &mut F,
-    ) -> Result<Transformed<Self>> {
-        handle_transform_recursion_up!(self, |c| 
c.transform_up_mut_with_subqueries(f), f)
+        transform_up_with_subqueries_impl(self, &mut f)
     }
 
     /// Similarly to [`Self::transform_down`], rewrites this node and its 
inputs using `f`,
@@ -854,14 +856,25 @@ impl LogicalPlan {
         FU: FnMut(Self) -> Result<Transformed<Self>>,
     >(
         self,
-        f_down: &mut FD,
-        f_up: &mut FU,
+        mut f_down: FD,
+        mut f_up: FU,
     ) -> Result<Transformed<Self>> {
-        handle_transform_recursion!(
-            f_down(self),
-            |c| c.transform_down_up_with_subqueries(f_down, f_up),
-            f_up
-        )
+        fn transform_down_up_with_subqueries_impl<
+            FD: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
+            FU: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
+        >(
+            node: LogicalPlan,
+            f_down: &mut FD,
+            f_up: &mut FU,
+        ) -> Result<Transformed<LogicalPlan>> {
+            handle_transform_recursion!(
+                f_down(node),
+                |c| transform_down_up_with_subqueries_impl(c, f_down, f_up),
+                f_up
+            )
+        }
+
+        transform_down_up_with_subqueries_impl(self, &mut f_down, &mut f_up)
     }
 
     /// Similarly to [`Self::apply`], calls `f` on  this node and its inputs
@@ -872,7 +885,7 @@ impl LogicalPlan {
         mut f: F,
     ) -> Result<TreeNodeRecursion> {
         self.apply_expressions(|expr| {
-            expr.apply(&mut |expr| match expr {
+            expr.apply(|expr| match expr {
                 Expr::Exists(Exists { subquery, .. })
                 | Expr::InSubquery(InSubquery { subquery, .. })
                 | Expr::ScalarSubquery(subquery) => {
@@ -895,7 +908,7 @@ impl LogicalPlan {
         mut f: F,
     ) -> Result<Transformed<Self>> {
         self.map_expressions(|expr| {
-            expr.transform_down_mut(&mut |expr| match expr {
+            expr.transform_down(|expr| match expr {
                 Expr::Exists(Exists { subquery, negated }) => {
                     f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s {
                         LogicalPlan::Subquery(subquery) => {
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 8c6b98f179..8da93c244c 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -264,7 +264,7 @@ pub fn grouping_set_to_exprlist(group_expr: &[Expr]) -> 
Result<Vec<Expr>> {
 /// Recursively walk an expression tree, collecting the unique set of columns
 /// referenced in the expression
 pub fn expr_to_columns(expr: &Expr, accum: &mut HashSet<Column>) -> Result<()> 
{
-    expr.apply(&mut |expr| {
+    expr.apply(|expr| {
         match expr {
             Expr::Column(qc) => {
                 accum.insert(qc.clone());
@@ -661,7 +661,7 @@ where
     F: Fn(&Expr) -> bool,
 {
     let mut exprs = vec![];
-    expr.apply(&mut |expr| {
+    expr.apply(|expr| {
         if test_fn(expr) {
             if !(exprs.contains(expr)) {
                 exprs.push(expr.clone())
@@ -683,7 +683,7 @@ where
     F: FnMut(&Expr) -> Result<(), E>,
 {
     let mut err = Ok(());
-    expr.apply(&mut |expr| {
+    expr.apply(|expr| {
         if let Err(e) = f(expr) {
             // save the error for later (it may not be a DataFusionError
             err = Err(e);
@@ -839,7 +839,7 @@ pub fn find_column_exprs(exprs: &[Expr]) -> Vec<Expr> {
 
 pub(crate) fn find_columns_referenced_by_expr(e: &Expr) -> Vec<Column> {
     let mut exprs = vec![];
-    e.apply(&mut |expr| {
+    e.apply(|expr| {
         if let Expr::Column(c) = expr {
             exprs.push(c.clone())
         }
@@ -868,7 +868,7 @@ pub(crate) fn find_column_indexes_referenced_by_expr(
     schema: &DFSchemaRef,
 ) -> Vec<usize> {
     let mut indexes = vec![];
-    e.apply(&mut |expr| {
+    e.apply(|expr| {
         match expr {
             Expr::Column(qc) => {
                 if let Ok(idx) = schema.index_of_column(qc) {
diff --git a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs 
b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
index c5e60ee319..835c041fc3 100644
--- a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
+++ b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
@@ -41,8 +41,7 @@ impl CountWildcardRule {
 
 impl AnalyzerRule for CountWildcardRule {
     fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> 
Result<LogicalPlan> {
-        plan.transform_down_with_subqueries(&analyze_internal)
-            .data()
+        plan.transform_down_with_subqueries(analyze_internal).data()
     }
 
     fn name(&self) -> &str {
@@ -78,7 +77,7 @@ fn analyze_internal(plan: LogicalPlan) -> 
Result<Transformed<LogicalPlan>> {
     let name_preserver = NamePreserver::new(&plan);
     plan.map_expressions(|expr| {
         let original_name = name_preserver.save(&expr)?;
-        let transformed_expr = expr.transform_up(&|expr| match expr {
+        let transformed_expr = expr.transform_up(|expr| match expr {
             Expr::WindowFunction(mut window_function)
                 if is_count_star_window_aggregate(&window_function) =>
             {
diff --git a/datafusion/optimizer/src/analyzer/function_rewrite.rs 
b/datafusion/optimizer/src/analyzer/function_rewrite.rs
index 4dd3222a32..098c934bf7 100644
--- a/datafusion/optimizer/src/analyzer/function_rewrite.rs
+++ b/datafusion/optimizer/src/analyzer/function_rewrite.rs
@@ -64,7 +64,7 @@ impl ApplyFunctionRewrites {
             let original_name = name_preserver.save(&expr)?;
 
             // recursively transform the expression, applying the rewrites at 
each step
-            let transformed_expr = expr.transform_up(&|expr| {
+            let transformed_expr = expr.transform_up(|expr| {
                 let mut result = Transformed::no(expr);
                 for rewriter in self.function_rewrites.iter() {
                     result = result.transform_data(|expr| {
@@ -85,7 +85,7 @@ impl AnalyzerRule for ApplyFunctionRewrites {
     }
 
     fn analyze(&self, plan: LogicalPlan, options: &ConfigOptions) -> 
Result<LogicalPlan> {
-        plan.transform_up_with_subqueries(&|plan| self.rewrite_plan(plan, 
options))
+        plan.transform_up_with_subqueries(|plan| self.rewrite_plan(plan, 
options))
             .map(|res| res.data)
     }
 }
diff --git a/datafusion/optimizer/src/analyzer/inline_table_scan.rs 
b/datafusion/optimizer/src/analyzer/inline_table_scan.rs
index cc5f870a9c..db1ce18e86 100644
--- a/datafusion/optimizer/src/analyzer/inline_table_scan.rs
+++ b/datafusion/optimizer/src/analyzer/inline_table_scan.rs
@@ -38,7 +38,7 @@ impl InlineTableScan {
 
 impl AnalyzerRule for InlineTableScan {
     fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> 
Result<LogicalPlan> {
-        plan.transform_up(&analyze_internal).data()
+        plan.transform_up(analyze_internal).data()
     }
 
     fn name(&self) -> &str {
@@ -49,7 +49,7 @@ impl AnalyzerRule for InlineTableScan {
 fn analyze_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
     // rewrite any subqueries in the plan first
     let transformed_plan =
-        plan.map_subqueries(|plan| plan.transform_up(&analyze_internal))?;
+        plan.map_subqueries(|plan| plan.transform_up(analyze_internal))?;
 
     let transformed_plan = transformed_plan.transform_data(|plan| {
         match plan {
diff --git a/datafusion/optimizer/src/analyzer/mod.rs 
b/datafusion/optimizer/src/analyzer/mod.rs
index d0b83d2429..fb0eb14da6 100644
--- a/datafusion/optimizer/src/analyzer/mod.rs
+++ b/datafusion/optimizer/src/analyzer/mod.rs
@@ -155,10 +155,10 @@ impl Analyzer {
 
 /// Do necessary check and fail the invalid plan
 fn check_plan(plan: &LogicalPlan) -> Result<()> {
-    plan.apply_with_subqueries(&mut |plan: &LogicalPlan| {
+    plan.apply_with_subqueries(|plan: &LogicalPlan| {
         plan.apply_expressions(|expr| {
             // recursively look for subqueries
-            expr.apply(&mut |expr| {
+            expr.apply(|expr| {
                 match expr {
                     Expr::Exists(Exists { subquery, .. })
                     | Expr::InSubquery(InSubquery { subquery, .. })
diff --git a/datafusion/optimizer/src/analyzer/subquery.rs 
b/datafusion/optimizer/src/analyzer/subquery.rs
index 002885266e..b46516017a 100644
--- a/datafusion/optimizer/src/analyzer/subquery.rs
+++ b/datafusion/optimizer/src/analyzer/subquery.rs
@@ -276,7 +276,7 @@ fn strip_inner_query(inner_plan: &LogicalPlan) -> 
&LogicalPlan {
 
 fn get_correlated_expressions(inner_plan: &LogicalPlan) -> Result<Vec<Expr>> {
     let mut exprs = vec![];
-    inner_plan.apply_with_subqueries(&mut |plan| {
+    inner_plan.apply_with_subqueries(|plan| {
         if let LogicalPlan::Filter(Filter { predicate, .. }) = plan {
             let (correlated, _): (Vec<_>, Vec<_>) = 
split_conjunction(predicate)
                 .into_iter()
diff --git a/datafusion/optimizer/src/decorrelate.rs 
b/datafusion/optimizer/src/decorrelate.rs
index 7eda45fb56..a6abec9efd 100644
--- a/datafusion/optimizer/src/decorrelate.rs
+++ b/datafusion/optimizer/src/decorrelate.rs
@@ -381,7 +381,7 @@ fn agg_exprs_evaluation_result_on_empty_batch(
     for e in agg_expr.iter() {
         let result_expr = e
             .clone()
-            .transform_up(&|expr| {
+            .transform_up(|expr| {
                 let new_expr = match expr {
                     Expr::AggregateFunction(expr::AggregateFunction {
                         func_def, ..
@@ -429,7 +429,7 @@ fn proj_exprs_evaluation_result_on_empty_batch(
     for expr in proj_expr.iter() {
         let result_expr = expr
             .clone()
-            .transform_up(&|expr| {
+            .transform_up(|expr| {
                 if let Expr::Column(Column { name, .. }) = &expr {
                     if let Some(result_expr) =
                         input_expr_result_map_for_count_bug.get(name)
@@ -468,7 +468,7 @@ fn filter_exprs_evaluation_result_on_empty_batch(
 ) -> Result<Option<Expr>> {
     let result_expr = filter_expr
         .clone()
-        .transform_up(&|expr| {
+        .transform_up(|expr| {
             if let Expr::Column(Column { name, .. }) = &expr {
                 if let Some(result_expr) = 
input_expr_result_map_for_count_bug.get(name) {
                     Ok(Transformed::yes(result_expr.clone()))
diff --git a/datafusion/optimizer/src/optimize_projections.rs 
b/datafusion/optimizer/src/optimize_projections.rs
index c49095c4a3..70ffd8f244 100644
--- a/datafusion/optimizer/src/optimize_projections.rs
+++ b/datafusion/optimizer/src/optimize_projections.rs
@@ -613,7 +613,7 @@ fn rewrite_expr(expr: &Expr, input: &Projection) -> 
Result<Option<Expr>> {
 ///   columns are collected.
 fn outer_columns(expr: &Expr, columns: &mut HashSet<Column>) {
     // inspect_expr_pre doesn't handle subquery references, so find them 
explicitly
-    expr.apply(&mut |expr| {
+    expr.apply(|expr| {
         match expr {
             Expr::OuterReferenceColumn(_, col) => {
                 columns.insert(col.clone());
diff --git a/datafusion/optimizer/src/plan_signature.rs 
b/datafusion/optimizer/src/plan_signature.rs
index a8e323ff42..d642e2c26e 100644
--- a/datafusion/optimizer/src/plan_signature.rs
+++ b/datafusion/optimizer/src/plan_signature.rs
@@ -73,7 +73,7 @@ impl LogicalPlanSignature {
 /// Get total number of [`LogicalPlan`]s in the plan.
 fn get_node_number(plan: &LogicalPlan) -> NonZeroUsize {
     let mut node_number = 0;
-    plan.apply_with_subqueries(&mut |_plan| {
+    plan.apply_with_subqueries(|_plan| {
         node_number += 1;
         Ok(TreeNodeRecursion::Continue)
     })
diff --git a/datafusion/optimizer/src/push_down_filter.rs 
b/datafusion/optimizer/src/push_down_filter.rs
index 2b123e3559..950932f479 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -218,7 +218,7 @@ fn can_pushdown_join_predicate(predicate: &Expr, schema: 
&DFSchema) -> Result<bo
 // Determine whether the predicate can evaluate as the join conditions
 fn can_evaluate_as_join_condition(predicate: &Expr) -> Result<bool> {
     let mut is_evaluate = true;
-    predicate.apply(&mut |expr| match expr {
+    predicate.apply(|expr| match expr {
         Expr::Column(_)
         | Expr::Literal(_)
         | Expr::Placeholder(_)
@@ -993,7 +993,7 @@ pub fn replace_cols_by_name(
     e: Expr,
     replace_map: &HashMap<String, Expr>,
 ) -> Result<Expr> {
-    e.transform_up(&|expr| {
+    e.transform_up(|expr| {
         Ok(if let Expr::Column(c) = &expr {
             match replace_map.get(&c.flat_name()) {
                 Some(new_c) => Transformed::yes(new_c.clone()),
@@ -1009,7 +1009,7 @@ pub fn replace_cols_by_name(
 /// check whether the expression uses the columns in `check_map`.
 fn contain(e: &Expr, check_map: &HashMap<String, Expr>) -> bool {
     let mut is_contain = false;
-    e.apply(&mut |expr| {
+    e.apply(|expr| {
         Ok(if let Expr::Column(c) = &expr {
             match check_map.get(&c.flat_name()) {
                 Some(_) => {
diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs 
b/datafusion/optimizer/src/scalar_subquery_to_join.rs
index a8999f9c1d..f9f602297f 100644
--- a/datafusion/optimizer/src/scalar_subquery_to_join.rs
+++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs
@@ -95,7 +95,7 @@ impl OptimizerRule for ScalarSubqueryToJoin {
                         if !expr_check_map.is_empty() {
                             rewrite_expr = rewrite_expr
                                 .clone()
-                                .transform_up(&|expr| {
+                                .transform_up(|expr| {
                                     if let Expr::Column(col) = &expr {
                                         if let Some(map_expr) =
                                             expr_check_map.get(&col.name)
@@ -152,7 +152,7 @@ impl OptimizerRule for ScalarSubqueryToJoin {
                                 {
                                     let new_expr = rewrite_expr
                                         .clone()
-                                        .transform_up(&|expr| {
+                                        .transform_up(|expr| {
                                             if let Expr::Column(col) = &expr {
                                                 if let Some(map_expr) =
                                                     
expr_check_map.get(&col.name)
diff --git a/datafusion/optimizer/src/utils.rs 
b/datafusion/optimizer/src/utils.rs
index 0e89e2452a..aad89be7db 100644
--- a/datafusion/optimizer/src/utils.rs
+++ b/datafusion/optimizer/src/utils.rs
@@ -100,7 +100,7 @@ pub fn log_plan(description: &str, plan: &LogicalPlan) {
 /// check whether the expression is volatile predicates
 pub(crate) fn is_volatile_expression(e: &Expr) -> Result<bool> {
     let mut is_volatile_expr = false;
-    e.apply(&mut |expr| {
+    e.apply(|expr| {
         Ok(if is_volatile(expr)? {
             is_volatile_expr = true;
             TreeNodeRecursion::Stop
diff --git a/datafusion/physical-expr/src/equivalence/class.rs 
b/datafusion/physical-expr/src/equivalence/class.rs
index 6d7d8bf3cc..8a3f8b8280 100644
--- a/datafusion/physical-expr/src/equivalence/class.rs
+++ b/datafusion/physical-expr/src/equivalence/class.rs
@@ -262,7 +262,7 @@ impl EquivalenceGroup {
     /// class it matches with (if any).
     pub fn normalize_expr(&self, expr: Arc<dyn PhysicalExpr>) -> Arc<dyn 
PhysicalExpr> {
         expr.clone()
-            .transform(&|expr| {
+            .transform(|expr| {
                 for cls in self.iter() {
                     if cls.contains(&expr) {
                         return 
Ok(Transformed::yes(cls.canonical_expr().unwrap()));
@@ -452,7 +452,7 @@ impl EquivalenceGroup {
                         // Rewrite rhs to point to the right side of the join:
                         let new_rhs = rhs
                             .clone()
-                            .transform(&|expr| {
+                            .transform(|expr| {
                                 if let Some(column) =
                                     expr.as_any().downcast_ref::<Column>()
                                 {
diff --git a/datafusion/physical-expr/src/equivalence/mod.rs 
b/datafusion/physical-expr/src/equivalence/mod.rs
index fd8123c45b..f78d69d672 100644
--- a/datafusion/physical-expr/src/equivalence/mod.rs
+++ b/datafusion/physical-expr/src/equivalence/mod.rs
@@ -97,7 +97,7 @@ pub fn add_offset_to_expr(
     expr: Arc<dyn PhysicalExpr>,
     offset: usize,
 ) -> Arc<dyn PhysicalExpr> {
-    expr.transform_down(&|e| match e.as_any().downcast_ref::<Column>() {
+    expr.transform_down(|e| match e.as_any().downcast_ref::<Column>() {
         Some(col) => Ok(Transformed::yes(Arc::new(Column::new(
             col.name(),
             offset + col.index(),
diff --git a/datafusion/physical-expr/src/equivalence/projection.rs 
b/datafusion/physical-expr/src/equivalence/projection.rs
index 92772e4623..8c747ab8a2 100644
--- a/datafusion/physical-expr/src/equivalence/projection.rs
+++ b/datafusion/physical-expr/src/equivalence/projection.rs
@@ -59,7 +59,7 @@ impl ProjectionMapping {
                 let target_expr = Arc::new(Column::new(name, expr_idx)) as _;
                 expression
                     .clone()
-                    .transform_down(&|e| match 
e.as_any().downcast_ref::<Column>() {
+                    .transform_down(|e| match 
e.as_any().downcast_ref::<Column>() {
                         Some(col) => {
                             // Sometimes, an expression and its name in the 
input_schema
                             // doesn't match. This can cause problems, so we 
make sure
diff --git a/datafusion/physical-expr/src/equivalence/properties.rs 
b/datafusion/physical-expr/src/equivalence/properties.rs
index c8a087db20..1036779c14 100644
--- a/datafusion/physical-expr/src/equivalence/properties.rs
+++ b/datafusion/physical-expr/src/equivalence/properties.rs
@@ -857,7 +857,7 @@ impl EquivalenceProperties {
     /// the given expression.
     pub fn get_expr_ordering(&self, expr: Arc<dyn PhysicalExpr>) -> 
ExprOrdering {
         ExprOrdering::new_default(expr.clone())
-            .transform_up(&|expr| Ok(update_ordering(expr, self)))
+            .transform_up(|expr| Ok(update_ordering(expr, self)))
             .data()
             // Guaranteed to always return `Ok`.
             .unwrap()
diff --git a/datafusion/physical-expr/src/expressions/case.rs 
b/datafusion/physical-expr/src/expressions/case.rs
index 609349509b..e376d3e7bb 100644
--- a/datafusion/physical-expr/src/expressions/case.rs
+++ b/datafusion/physical-expr/src/expressions/case.rs
@@ -958,7 +958,7 @@ mod tests {
 
         let expr2 = expr
             .clone()
-            .transform(&|e| {
+            .transform(|e| {
                 let transformed =
                     match 
e.as_any().downcast_ref::<crate::expressions::Literal>() {
                         Some(lit_value) => match lit_value.value() {
@@ -980,7 +980,7 @@ mod tests {
 
         let expr3 = expr
             .clone()
-            .transform_down(&|e| {
+            .transform_down(|e| {
                 let transformed =
                     match 
e.as_any().downcast_ref::<crate::expressions::Literal>() {
                         Some(lit_value) => match lit_value.value() {
diff --git a/datafusion/physical-expr/src/utils/mod.rs 
b/datafusion/physical-expr/src/utils/mod.rs
index a0d6436586..2232f6de44 100644
--- a/datafusion/physical-expr/src/utils/mod.rs
+++ b/datafusion/physical-expr/src/utils/mod.rs
@@ -194,9 +194,7 @@ where
         constructor,
     };
     // Use the builder to transform the expression tree node into a DAG.
-    let root = init
-        .transform_up_mut(&mut |node| builder.mutate(node))
-        .data()?;
+    let root = init.transform_up(|node| builder.mutate(node)).data()?;
     // Return a tuple containing the root node index and the DAG.
     Ok((root.data.unwrap(), builder.graph))
 }
@@ -204,7 +202,7 @@ where
 /// Recursively extract referenced [`Column`]s within a [`PhysicalExpr`].
 pub fn collect_columns(expr: &Arc<dyn PhysicalExpr>) -> HashSet<Column> {
     let mut columns = HashSet::<Column>::new();
-    expr.apply(&mut |expr| {
+    expr.apply(|expr| {
         if let Some(column) = expr.as_any().downcast_ref::<Column>() {
             if !columns.iter().any(|c| c.eq(column)) {
                 columns.insert(column.clone());
@@ -224,7 +222,7 @@ pub fn reassign_predicate_columns(
     schema: &SchemaRef,
     ignore_not_found: bool,
 ) -> Result<Arc<dyn PhysicalExpr>> {
-    pred.transform_down(&|expr| {
+    pred.transform_down(|expr| {
         let expr_any = expr.as_any();
 
         if let Some(column) = expr_any.downcast_ref::<Column>() {
diff --git a/datafusion/physical-plan/src/joins/stream_join_utils.rs 
b/datafusion/physical-plan/src/joins/stream_join_utils.rs
index 9824c723d9..ef3fda3737 100644
--- a/datafusion/physical-plan/src/joins/stream_join_utils.rs
+++ b/datafusion/physical-plan/src/joins/stream_join_utils.rs
@@ -285,7 +285,7 @@ pub fn convert_sort_expr_with_filter_schema(
         // Since we are sure that one to one column mapping includes all 
columns, we convert
         // the sort expression into a filter expression.
         let converted_filter_expr = expr
-            .transform_up(&|p| {
+            .transform_up(|p| {
                 convert_filter_columns(p.as_ref(), 
&column_map).map(|transformed| {
                     match transformed {
                         Some(transformed) => Transformed::yes(transformed),
diff --git a/datafusion/physical-plan/src/joins/utils.rs 
b/datafusion/physical-plan/src/joins/utils.rs
index a3d20b97d1..acf9ed4d7e 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -478,7 +478,7 @@ fn replace_on_columns_of_right_ordering(
             let new_expr = item
                 .expr
                 .clone()
-                .transform(&|e| {
+                .transform(|e| {
                     if e.eq(right_col) {
                         Ok(Transformed::yes(left_col.clone()))
                     } else {
diff --git a/datafusion/physical-plan/src/recursive_query.rs 
b/datafusion/physical-plan/src/recursive_query.rs
index ba7d1a5454..ed897d78f0 100644
--- a/datafusion/physical-plan/src/recursive_query.rs
+++ b/datafusion/physical-plan/src/recursive_query.rs
@@ -325,7 +325,7 @@ fn assign_work_table(
     work_table: Arc<WorkTable>,
 ) -> Result<Arc<dyn ExecutionPlan>> {
     let mut work_table_refs = 0;
-    plan.transform_down_mut(&mut |plan| {
+    plan.transform_down(|plan| {
         if let Some(exec) = plan.as_any().downcast_ref::<WorkTableExec>() {
             if work_table_refs > 0 {
                 not_impl_err!(
@@ -353,7 +353,7 @@ fn assign_work_table(
 /// However, if the data of the left table is derived from the work table, it 
will become outdated
 /// as the work table changes. When the next iteration executes this plan 
again, we must clear the left table.
 fn reset_plan_states(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn 
ExecutionPlan>> {
-    plan.transform_up(&|plan| {
+    plan.transform_up(|plan| {
         // WorkTableExec's states have already been updated correctly.
         if plan.as_any().is::<WorkTableExec>() {
             Ok(Transformed::no(plan))
diff --git a/datafusion/sql/src/cte.rs b/datafusion/sql/src/cte.rs
index 5b1f81e820..4f7b9bb6d1 100644
--- a/datafusion/sql/src/cte.rs
+++ b/datafusion/sql/src/cte.rs
@@ -197,7 +197,7 @@ fn has_work_table_reference(
     work_table_source: &Arc<dyn TableSource>,
 ) -> bool {
     let mut has_reference = false;
-    plan.apply(&mut |node| {
+    plan.apply(|node| {
         if let LogicalPlan::TableScan(scan) = node {
             if Arc::ptr_eq(&scan.source, work_table_source) {
                 has_reference = true;
diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs
index 78c01e8e28..fdcef0ef6a 100644
--- a/datafusion/sql/src/select.rs
+++ b/datafusion/sql/src/select.rs
@@ -293,7 +293,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                     data: transformed_expr,
                     transformed,
                     tnr: _,
-                } = expr.transform_up_mut(&mut |expr: Expr| {
+                } = expr.transform_up(|expr: Expr| {
                     if let Expr::Unnest(Unnest { expr: ref arg }) = expr {
                         let column_name = expr.display_name()?;
                         unnest_columns.push(column_name.clone());
diff --git a/datafusion/sql/src/unparser/utils.rs 
b/datafusion/sql/src/unparser/utils.rs
index 9d098c4945..c1b02c330f 100644
--- a/datafusion/sql/src/unparser/utils.rs
+++ b/datafusion/sql/src/unparser/utils.rs
@@ -60,7 +60,7 @@ pub(crate) fn find_agg_node_within_select(
 /// into an actual aggregate expression COUNT(*) as identified in the 
aggregate node.
 pub(crate) fn unproject_agg_exprs(expr: &Expr, agg: &Aggregate) -> 
Result<Expr> {
     expr.clone()
-        .transform(&|sub_expr| {
+        .transform(|sub_expr| {
             if let Expr::Column(c) = sub_expr {
                // find the column in the agg schema
                 if let Ok(n) = agg.schema.index_of_column(&c) {
diff --git a/datafusion/sql/src/utils.rs b/datafusion/sql/src/utils.rs
index 206c933a4f..2c50d3af1f 100644
--- a/datafusion/sql/src/utils.rs
+++ b/datafusion/sql/src/utils.rs
@@ -34,7 +34,7 @@ use sqlparser::ast::Ident;
 /// Make a best-effort attempt at resolving all columns in the expression tree
 pub(crate) fn resolve_columns(expr: &Expr, plan: &LogicalPlan) -> Result<Expr> 
{
     expr.clone()
-        .transform_up(&|nested_expr| {
+        .transform_up(|nested_expr| {
             match nested_expr {
                 Expr::Column(col) => {
                     let (qualifier, field) =
@@ -72,7 +72,7 @@ pub(crate) fn rebase_expr(
     plan: &LogicalPlan,
 ) -> Result<Expr> {
     expr.clone()
-        .transform_down(&|nested_expr| {
+        .transform_down(|nested_expr| {
             if base_exprs.contains(&nested_expr) {
                 Ok(Transformed::yes(expr_as_column_expr(&nested_expr, plan)?))
             } else {
@@ -178,7 +178,7 @@ pub(crate) fn resolve_aliases_to_exprs(
     aliases: &HashMap<String, Expr>,
 ) -> Result<Expr> {
     expr.clone()
-        .transform_up(&|nested_expr| match nested_expr {
+        .transform_up(|nested_expr| match nested_expr {
             Expr::Column(c) if c.relation.is_none() => {
                 if let Some(aliased_expr) = aliases.get(&c.name) {
                     Ok(Transformed::yes(aliased_expr.clone()))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to