alamb commented on code in PR #20239:
URL: https://github.com/apache/datafusion/pull/20239#discussion_r2783367927


##########
datafusion/optimizer/src/common_subexpr_eliminate.rs:
##########
@@ -1831,4 +1843,89 @@ mod test {
             panic!("dummy - not implemented")
         }
     }
+
+    /// A UDF that returns `MoveTowardsLeafNodes` placement, simulating cheap
+    /// expressions like `get_field` (struct field access).
+    #[derive(Debug, PartialEq, Eq, Hash)]
+    struct LeafUDF {
+        signature: Signature,
+    }
+
+    impl ScalarUDFImpl for LeafUDF {
+        fn as_any(&self) -> &dyn Any {
+            self
+        }
+        fn name(&self) -> &str {
+            "leaf_udf"
+        }
+        fn signature(&self) -> &Signature {
+            &self.signature
+        }
+        fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+            Ok(DataType::UInt32)
+        }
+        fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> 
Result<ColumnarValue> {
+            panic!("dummy - not implemented")
+        }
+        fn placement(&self, _args: &[ExpressionPlacement]) -> 
ExpressionPlacement {
+            ExpressionPlacement::MoveTowardsLeafNodes
+        }
+    }
+
+    fn leaf_udf_expr(arg: Expr) -> Expr {
+        let udf = ScalarUDF::new_from_impl(LeafUDF {
+            signature: Signature::exact(vec![DataType::UInt32], 
Volatility::Immutable),
+        });
+        Expr::ScalarFunction(ScalarFunction::new_udf(Arc::new(udf), vec![arg]))
+    }
+
+    /// Identical MoveTowardsLeafNodes expressions should NOT be deduplicated
+    /// by CSE — they are cheap (e.g. struct field access) and the extraction
+    /// rules deliberately duplicate them. Deduplicating causes optimizer

Review Comment:
   What does "deduplicating causes optimizer instability" mean? Maybe we could 
make the comment more specific about what actually goes wrong.



##########
datafusion/optimizer/src/push_down_filter.rs:
##########
@@ -1310,10 +1311,19 @@ fn rewrite_projection(
         })
         .partition(|(_, value)| value.is_volatile());
 
+    // Partition non-volatile expressions by MoveTowardsLeafNodes vs 
everything else.
+    // MoveTowardsLeafNodes expressions (like get_field) are cheap — no 
benefit to
+    // re-inlining them into filters, and it causes optimizer instability with
+    // ExtractLeafExpressions.
+    let (move_towards_leaves_map, pushable_map): (HashMap<_, _>, HashMap<_, 
_>) =

Review Comment:
   Is there any reason to have two separate hash maps? It seems like a single 
one would work (and maybe rename it from `volatile_map` to `no_move_map` or 
something?)



##########
datafusion/optimizer/src/push_down_filter.rs:
##########
@@ -4221,4 +4234,156 @@ mod tests {
         "
         )
     }
+
+    /// A mock UDF that reports MoveTowardsLeafNodes placement (like 
get_field).
+    #[derive(Debug, PartialEq, Eq, Hash)]
+    struct LeafUDF {
+        signature: Signature,
+    }
+
+    impl ScalarUDFImpl for LeafUDF {
+        fn as_any(&self) -> &dyn Any {
+            self
+        }
+        fn name(&self) -> &str {
+            "leaf_udf"
+        }
+        fn signature(&self) -> &Signature {
+            &self.signature
+        }
+        fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+            Ok(DataType::UInt32)
+        }
+        fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> 
Result<ColumnarValue> {
+            Ok(ColumnarValue::Scalar(ScalarValue::UInt32(Some(1))))
+        }
+        fn placement(&self, _args: &[ExpressionPlacement]) -> 
ExpressionPlacement {
+            ExpressionPlacement::MoveTowardsLeafNodes
+        }
+    }
+
+    /// Helper to create a leaf_udf(col) expression.
+    fn leaf_udf_expr(arg: Expr) -> Expr {
+        let udf = ScalarUDF::new_from_impl(LeafUDF {
+            signature: Signature::exact(vec![DataType::UInt32], 
Volatility::Immutable),
+        });
+        Expr::ScalarFunction(ScalarFunction::new_udf(Arc::new(udf), vec![arg]))
+    }
+
+    /// Test that filters are NOT pushed through MoveTowardsLeafNodes 
projections.
+    /// These are cheap expressions (like get_field) where re-inlining into a 
filter
+    /// has no benefit and causes optimizer instability with 
ExtractLeafExpressions.

Review Comment:
   Same question here about what instability is caused?



##########
datafusion/optimizer/src/push_down_filter.rs:
##########
@@ -4221,4 +4234,156 @@ mod tests {
         "
         )
     }
+
+    /// A mock UDF that reports MoveTowardsLeafNodes placement (like 
get_field).
+    #[derive(Debug, PartialEq, Eq, Hash)]
+    struct LeafUDF {
+        signature: Signature,
+    }
+
+    impl ScalarUDFImpl for LeafUDF {
+        fn as_any(&self) -> &dyn Any {
+            self
+        }
+        fn name(&self) -> &str {
+            "leaf_udf"
+        }
+        fn signature(&self) -> &Signature {
+            &self.signature
+        }
+        fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+            Ok(DataType::UInt32)
+        }
+        fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> 
Result<ColumnarValue> {
+            Ok(ColumnarValue::Scalar(ScalarValue::UInt32(Some(1))))
+        }
+        fn placement(&self, _args: &[ExpressionPlacement]) -> 
ExpressionPlacement {
+            ExpressionPlacement::MoveTowardsLeafNodes
+        }
+    }
+
+    /// Helper to create a leaf_udf(col) expression.
+    fn leaf_udf_expr(arg: Expr) -> Expr {
+        let udf = ScalarUDF::new_from_impl(LeafUDF {
+            signature: Signature::exact(vec![DataType::UInt32], 
Volatility::Immutable),
+        });
+        Expr::ScalarFunction(ScalarFunction::new_udf(Arc::new(udf), vec![arg]))
+    }
+
+    /// Test that filters are NOT pushed through MoveTowardsLeafNodes 
projections.
+    /// These are cheap expressions (like get_field) where re-inlining into a 
filter
+    /// has no benefit and causes optimizer instability with 
ExtractLeafExpressions.
+    #[test]
+    fn filter_not_pushed_through_move_towards_leaves_projection() -> 
Result<()> {
+        let table_scan = test_table_scan()?;
+
+        // Create a projection with a MoveTowardsLeafNodes expression
+        let proj = LogicalPlanBuilder::from(table_scan)
+            .project(vec![
+                leaf_udf_expr(col("a")).alias("val"),
+                col("b"),
+                col("c"),
+            ])?
+            .build()?;
+
+        // Put a filter on the MoveTowardsLeafNodes column
+        let plan = LogicalPlanBuilder::from(proj)
+            .filter(col("val").gt(lit(150i64)))?
+            .build()?;
+
+        // Filter should NOT be pushed through — val maps to a 
MoveTowardsLeafNodes expr
+        assert_optimized_plan_equal!(
+            plan,
+            @r"
+        Filter: val > Int64(150)
+          Projection: leaf_udf(test.a) AS val, test.b, test.c
+            TableScan: test
+        "
+        )
+    }
+
+    /// Test that filters ARE pushed through regular (Column/KeepInPlace) 
projections.

Review Comment:
   This is likely already tested by other tests.
   
   One way you could make it clear that this is the same as the test above, 
with the only difference being the expression, might be a builder-style API.
   
   Something like
   
   ```rust
   let udf_expr = leaf_udf()
     // Builder API to manually control the UDF's placement
     .with_placement(ExpressionPlacement::KeepInPlace)
     .call(vec![col("a")])
     .alias("val"),
   ```



##########
datafusion/optimizer/src/push_down_filter.rs:
##########
@@ -4221,4 +4234,156 @@ mod tests {
         "
         )
     }
+
+    /// A mock UDF that reports MoveTowardsLeafNodes placement (like 
get_field).
+    #[derive(Debug, PartialEq, Eq, Hash)]
+    struct LeafUDF {
+        signature: Signature,
+    }
+
+    impl ScalarUDFImpl for LeafUDF {
+        fn as_any(&self) -> &dyn Any {
+            self
+        }
+        fn name(&self) -> &str {
+            "leaf_udf"
+        }
+        fn signature(&self) -> &Signature {
+            &self.signature
+        }
+        fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+            Ok(DataType::UInt32)
+        }
+        fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> 
Result<ColumnarValue> {
+            Ok(ColumnarValue::Scalar(ScalarValue::UInt32(Some(1))))
+        }
+        fn placement(&self, _args: &[ExpressionPlacement]) -> 
ExpressionPlacement {
+            ExpressionPlacement::MoveTowardsLeafNodes
+        }
+    }
+
+    /// Helper to create a leaf_udf(col) expression.
+    fn leaf_udf_expr(arg: Expr) -> Expr {
+        let udf = ScalarUDF::new_from_impl(LeafUDF {
+            signature: Signature::exact(vec![DataType::UInt32], 
Volatility::Immutable),
+        });
+        Expr::ScalarFunction(ScalarFunction::new_udf(Arc::new(udf), vec![arg]))

Review Comment:
   I think you could do this via `call` 🤔
   
   ```suggestion
   udf.call(vec![arg])
   ```
   



##########
datafusion/optimizer/src/push_down_filter.rs:
##########
@@ -4221,4 +4234,156 @@ mod tests {
         "
         )
     }
+
+    /// A mock UDF that reports MoveTowardsLeafNodes placement (like 
get_field).
+    #[derive(Debug, PartialEq, Eq, Hash)]

Review Comment:
   This is basically the same as in 
datafusion/optimizer/src/common_subexpr_eliminate.rs
   
   I wonder if it would be better to put them in a common place (both to avoid 
code duplication and to make new tests easier to write)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to