alamb commented on code in PR #9948:
URL: https://github.com/apache/arrow-datafusion/pull/9948#discussion_r1551676725


##########
datafusion-examples/examples/rewrite_expr.rs:
##########
@@ -59,7 +59,7 @@ pub fn main() -> Result<()> {
 
     // then run the optimizer with our custom rule
     let optimizer = Optimizer::with_rules(vec![Arc::new(MyOptimizerRule {})]);
-    let optimized_plan = optimizer.optimize(&analyzed_plan, &config, observe)?;
+    let optimized_plan = optimizer.optimize(analyzed_plan, &config, observe)?;

Review Comment:
   This illustrates the API change -- the optimizer now takes an owned plan 
rather than a reference



##########
datafusion/core/tests/optimizer_integration.rs:
##########
@@ -110,7 +110,7 @@ fn test_sql(sql: &str) -> Result<LogicalPlan> {
     let optimizer = Optimizer::new();
     // analyze and optimize the logical plan
     let plan = analyzer.execute_and_check(&plan, config.options(), |_, _| {})?;
-    optimizer.optimize(&plan, &config, |_, _| {})
+    optimizer.optimize(plan, &config, |_, _| {})

Review Comment:
   A large portion of this PR consists of changes to tests so that they pass in an owned plan



##########
datafusion/optimizer/src/optimizer.rs:
##########
@@ -506,21 +498,27 @@ mod tests {
             produce_one_row: false,
             schema: Arc::new(DFSchema::empty()),
         });
-        let err = opt.optimize(&plan, &config, &observe).unwrap_err();
+        let err = opt.optimize(plan, &config, &observe).unwrap_err();
         assert_eq!(
-            "Optimizer rule 'get table_scan rule' failed\ncaused by\nget 
table_scan rule\ncaused by\n\
-            Internal error: Failed due to a difference in schemas, original 
schema: \
-            DFSchema { inner: Schema { fields: \
-                [Field { name: \"a\", data_type: UInt32, nullable: false, 
dict_id: 0, dict_is_ordered: false, metadata: {} }, \
-                Field { name: \"b\", data_type: UInt32, nullable: false, 
dict_id: 0, dict_is_ordered: false, metadata: {} }, \
-                Field { name: \"c\", data_type: UInt32, nullable: false, 
dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, \
-                field_qualifiers: [Some(Bare { table: \"test\" }), Some(Bare { 
table: \"test\" }), Some(Bare { table: \"test\" })], \
-                functional_dependencies: FunctionalDependencies { deps: [] } 
}, \
+            "Optimizer rule 'get table_scan rule' failed\n\

Review Comment:
   The original error message was actually incorrect: it reported the schemas 
reversed (the "new schema" was actually the original schema)



##########
datafusion/optimizer/src/optimizer.rs:
##########
@@ -274,22 +248,82 @@ impl Optimizer {
     pub fn with_rules(rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>) -> 
Self {
         Self { rules }
     }
+}
+
+/// Rewrites LogicalPlan nodes
+struct Rewriter<'a> {
+    apply_order: ApplyOrder,
+    rule: &'a dyn OptimizerRule,
+    config: &'a dyn OptimizerConfig,
+}
+
+impl<'a> Rewriter<'a> {
+    fn new(
+        apply_order: ApplyOrder,
+        rule: &'a dyn OptimizerRule,
+        config: &'a dyn OptimizerConfig,
+    ) -> Self {
+        Self {
+            apply_order,
+            rule,
+            config,
+        }
+    }
+}
+
+impl<'a> TreeNodeRewriter for Rewriter<'a> {
+    type Node = LogicalPlan;
+
+    fn f_down(&mut self, node: LogicalPlan) -> 
Result<Transformed<LogicalPlan>> {
+        if self.apply_order == ApplyOrder::TopDown {
+            optimize_plan_node(node, self.rule, self.config)
+        } else {
+            Ok(Transformed::no(node))
+        }
+    }
 
+    fn f_up(&mut self, node: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
+        if self.apply_order == ApplyOrder::BottomUp {
+            optimize_plan_node(node, self.rule, self.config)
+        } else {
+            Ok(Transformed::no(node))
+        }
+    }
+}
+
+/// Invokes the Optimizer rule to rewrite the LogicalPlan in place.
+fn optimize_plan_node(
+    plan: LogicalPlan,
+    rule: &dyn OptimizerRule,
+    config: &dyn OptimizerConfig,
+) -> Result<Transformed<LogicalPlan>> {
+    // TODO: introduce a better API to OptimizerRule to allow rewriting by 
ownership

Review Comment:
   Each rule still requires the plan to be copied, which is not ideal. I plan 
to add a better API to avoid this



##########
datafusion/optimizer/src/optimizer.rs:
##########
@@ -299,44 +333,77 @@ impl Optimizer {
             log_plan(&format!("Optimizer input (pass {i})"), &new_plan);
 
             for rule in &self.rules {
-                let result =
-                    self.optimize_recursively(rule, &new_plan, config)
-                        .and_then(|plan| {
-                            if let Some(plan) = &plan {
-                                assert_schema_is_the_same(rule.name(), plan, 
&new_plan)?;
-                            }
-                            Ok(plan)
-                        });
-                match result {
-                    Ok(Some(plan)) => {
-                        new_plan = plan;
-                        observer(&new_plan, rule.as_ref());
-                        log_plan(rule.name(), &new_plan);
+                // If we need to skip failed rules, must copy plan before 
attempting to rewrite

Review Comment:
   this change ensures we only copy the plan if we need it to remain unchanged



##########
datafusion/optimizer/src/optimizer.rs:
##########
@@ -356,97 +423,22 @@ impl Optimizer {
         debug!("Optimizer took {} ms", start_time.elapsed().as_millis());
         Ok(new_plan)
     }
-
-    fn optimize_node(

Review Comment:
   This code handles the recursion manually and is redundant with the `TreeNode` API



##########
datafusion/optimizer/src/eliminate_limit.rs:
##########
@@ -93,24 +93,19 @@ mod tests {
 
     use crate::push_down_limit::PushDownLimit;
 
-    fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> 
Result<()> {
+    fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}
+    fn assert_optimized_plan_eq(plan: LogicalPlan, expected: &str) -> 
Result<()> {
         let optimizer = 
Optimizer::with_rules(vec![Arc::new(EliminateLimit::new())]);
-        let optimized_plan = optimizer
-            .optimize_recursively(
-                optimizer.rules.first().unwrap(),
-                plan,
-                &OptimizerContext::new(),
-            )?
-            .unwrap_or_else(|| plan.clone());
+        let optimized_plan =
+            optimizer.optimize(plan, &OptimizerContext::new(), observe)?;
 
         let formatted_plan = format!("{optimized_plan:?}");
         assert_eq!(formatted_plan, expected);
-        assert_eq!(plan.schema(), optimized_plan.schema());

Review Comment:
   I changed the tests to call `Optimizer::optimize` directly, which already 
checks that the schema doesn't change, so this assertion is redundant



##########
datafusion/optimizer/src/optimizer.rs:
##########
@@ -184,39 +185,12 @@ pub struct Optimizer {
     pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
 }
 
-/// If a rule is with `ApplyOrder`, it means the optimizer will derive to 
handle children instead of
-/// recursively handling in rule.
-/// We just need handle a subtree pattern itself.
+/// Specifies how recursion for an `OptimizerRule` should be handled.
 ///
-/// Notice: **sometime** result after optimize still can be optimized, we need 
apply again.

Review Comment:
   I do not think this comment is applicable anymore -- the optimizer handles 
the recursion internally as well as applying multiple optimizer passes



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to