adriangb commented on code in PR #19538:
URL: https://github.com/apache/datafusion/pull/19538#discussion_r2717480648


##########
datafusion/physical-optimizer/src/output_requirements.rs:
##########
@@ -256,18 +256,13 @@ impl ExecutionPlan for OutputRequirementExec {
         &self,
         projection: &ProjectionExec,
     ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
-        // If the projection does not narrow the schema, we should not try to push it down:
-        let proj_exprs = projection.expr();
-        if proj_exprs.len() >= projection.input().schema().fields().len() {
-            return Ok(None);
-        }

Review Comment:
   This same check was sort of sprinkled around various operators. I'm not sure how deeply each one was evaluated for what behavior it should have. It seems to me that `OutputRequirementExec` is a temporary marker used by the physical optimizer and should not have any bearing on where projections are placed.



##########
datafusion/physical-optimizer/src/projection_pushdown.rs:
##########
@@ -88,6 +98,267 @@ impl PhysicalOptimizerRule for ProjectionPushdown {
     }
 }
 
+/// Tries to split a projection to extract beneficial sub-expressions for pushdown.
+///
+/// This function walks each expression in the projection and extracts beneficial
+/// sub-expressions (like `get_field`) from within larger non-beneficial expressions.
+/// For example:
+/// - Input: `get_field(col, 'foo') + 1`
+/// - Output: Inner projection: `get_field(col, 'foo') AS __extracted_0`, Outer: `__extracted_0 + 1`
+///
+/// This enables the beneficial parts to be pushed down while keeping non-beneficial
+/// expressions (like literals and computations) above.
+fn try_split_projection(

Review Comment:
   This is a meaty bit worth reviewing and considering
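   To make the splitting idea concrete for reviewers, here is a toy sketch using a hypothetical simplified `Expr` type (not DataFusion's actual `PhysicalExpr`), assuming `get_field`-style accesses are the only "beneficial" nodes:

```rust
// Toy sketch of the split: hypothetical simplified Expr type, not DataFusion's.
#[derive(Clone, Debug, PartialEq)]
enum Expr {
    Column(String),
    Literal(i64),
    GetField(Box<Expr>, String), // beneficial: static field access
    Add(Box<Expr>, Box<Expr>),   // non-beneficial computation
}

/// Pull beneficial `GetField` sub-expressions into `inner`, replacing them
/// with `__extracted_N` column references in the rewritten outer expression.
fn extract(expr: &Expr, inner: &mut Vec<(Expr, String)>) -> Expr {
    match expr {
        Expr::GetField(..) => {
            let alias = format!("__extracted_{}", inner.len());
            inner.push((expr.clone(), alias.clone()));
            Expr::Column(alias)
        }
        Expr::Add(l, r) => Expr::Add(
            Box::new(extract(l, inner)),
            Box::new(extract(r, inner)),
        ),
        other => other.clone(),
    }
}

fn main() {
    // get_field(col, 'foo') + 1
    let expr = Expr::Add(
        Box::new(Expr::GetField(Box::new(Expr::Column("col".into())), "foo".into())),
        Box::new(Expr::Literal(1)),
    );
    let mut inner = Vec::new();
    let outer = extract(&expr, &mut inner);
    // Inner projection computes get_field(col, 'foo') AS __extracted_0;
    // the outer expression becomes __extracted_0 + 1.
    assert_eq!(inner[0].1, "__extracted_0");
    assert_eq!(
        outer,
        Expr::Add(
            Box::new(Expr::Column("__extracted_0".into())),
            Box::new(Expr::Literal(1)),
        )
    );
}
```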



##########
datafusion/physical-plan/src/projection.rs:
##########
@@ -684,6 +685,31 @@ pub fn all_alias_free_columns(exprs: &[ProjectionExpr]) -> bool {
     })
 }
 
+/// Returns true if the projection is beneficial to push through operators like Sort/Filter.
+/// This is true when:
+/// - The projection narrows the schema (drops columns) - saves memory, OR
+/// - Any expression is a TrivialExpr (like get_field) - beneficial computation pushdown
+///
+/// Pure Column references that don't narrow the schema are NOT beneficial to push,
+/// as they just rearrange the plan without any gain.
+///
+/// Note: Projections are split by `try_split_projection` before reaching this function,
+/// so if any expression is TrivialExpr, all expressions should be trivial.
+pub fn is_trivial_or_narrows_schema(projection: &ProjectionExec) -> bool {
+    let exprs = projection.expr();
+
+    // Push if projection narrows schema (drops columns) - saves memory
+    let narrows_schema = exprs.len() < projection.input().schema().fields().len();

Review Comment:
   Should this also add the condition that all expressions are columns or 
trivial expressions?
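   If helpful, here is a self-contained sketch of what adding that condition could look like (simplified stand-in types, not the actual implementation):

```rust
// Simplified stand-in for expression triviality classification.
#[derive(PartialEq)]
enum Kind {
    Column,
    Literal,
    TrivialExpr,
    NonTrivial,
}

// Hypothetical stricter version: push only when the projection narrows the
// schema or contains a TrivialExpr, AND every expression is cheap
// (a column, literal, or trivial expression).
fn is_trivial_or_narrows_schema(expr_kinds: &[Kind], input_fields: usize) -> bool {
    let narrows_schema = expr_kinds.len() < input_fields;
    let has_trivial_expr = expr_kinds.iter().any(|k| *k == Kind::TrivialExpr);
    let all_cheap = expr_kinds.iter().all(|k| *k != Kind::NonTrivial);
    (narrows_schema || has_trivial_expr) && all_cheap
}

fn main() {
    // Narrowing with trivial expressions: push.
    assert!(is_trivial_or_narrows_schema(&[Kind::TrivialExpr], 3));
    // Same width, plain columns: no gain, don't push.
    assert!(!is_trivial_or_narrows_schema(&[Kind::Column, Kind::Column], 2));
    // Narrowing, but containing a non-trivial computation: blocked by the new guard.
    assert!(!is_trivial_or_narrows_schema(&[Kind::NonTrivial], 3));
}
```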



##########
datafusion/expr-common/src/triviality.rs:
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Triviality classification for expressions and function arguments.
+
+/// Classification of argument triviality for scalar functions.
+///
+/// This enum is used by [`ScalarUDFImpl::triviality`] to allow
+/// functions to make context-dependent decisions about whether they are
+/// trivial based on the nature of their arguments.
+///
+/// For example, `get_field(struct_col, 'field_name')` is trivial (static field
+/// lookup), but `get_field(struct_col, key_col)` is not (dynamic per-row lookup).
+///
+/// [`ScalarUDFImpl::triviality`]: https://docs.rs/datafusion-expr/latest/datafusion_expr/trait.ScalarUDFImpl.html#tymethod.triviality
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum ArgTriviality {
+    /// Argument is a literal constant value or an expression that can be
+    /// evaluated to a constant at planning time.
+    Literal,
+    /// Argument is a simple column reference.
+    Column,
+    /// Argument is a complex expression that declares itself trivial.
+    /// For example, if `get_field(struct_col, 'field_name')` is implemented as a
+    /// trivial expression, then it would return this variant.
+    /// Then `other_trivial_function(get_field(...), 42)` could also be classified as
+    /// a trivial expression using the knowledge that `get_field(...)` is trivial.
+    TrivialExpr,
+    /// Argument is a complex expression that declares itself non-trivial.
+    /// For example, `min(col1 + col2)` is non-trivial because it requires per-row computation.
+    NonTrivial,
+}

Review Comment:
   I think the main thing we need to figure out is if these are the right terms 
/ wording for this system.
   
   I don't love "trivial" but it does work and it is the existing terminology.
   
   I don't like it for 2 reasons:
   1. It's confusing. Claude *constantly* got confused, thinking that a column or literal expression is trivial.
   2. It's a non-specific term. It doesn't imply anything concrete.
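   To make the intended semantics concrete, here is how a function like `get_field` might classify itself from its arguments' `ArgTriviality` (hypothetical logic, with the enum inlined so the example is self-contained):

```rust
// Inlined copy of the enum from the diff, for a standalone example.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ArgTriviality {
    Literal,
    Column,
    TrivialExpr,
    NonTrivial,
}

// Hypothetical classification logic for get_field: a static field name
// (literal second argument) makes the call trivial; anything else forces
// a dynamic per-row lookup and is non-trivial.
fn get_field_triviality(args: &[ArgTriviality]) -> ArgTriviality {
    match args {
        // get_field(struct_col, 'name'): static lookup -> trivial
        [ArgTriviality::Column | ArgTriviality::TrivialExpr, ArgTriviality::Literal] => {
            ArgTriviality::TrivialExpr
        }
        // get_field(struct_col, key_col): dynamic per-row lookup -> non-trivial
        _ => ArgTriviality::NonTrivial,
    }
}

fn main() {
    assert_eq!(
        get_field_triviality(&[ArgTriviality::Column, ArgTriviality::Literal]),
        ArgTriviality::TrivialExpr
    );
    assert_eq!(
        get_field_triviality(&[ArgTriviality::Column, ArgTriviality::Column]),
        ArgTriviality::NonTrivial
    );
}
```

   Note how nesting composes: `get_field(get_field(col, 'a'), 'b')` classifies the inner call as `TrivialExpr`, which the outer match still accepts.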



##########
datafusion/physical-expr/src/projection.rs:
##########
@@ -795,7 +796,7 @@ pub fn update_expr(
     projected_exprs: &[ProjectionExpr],
     sync_with_child: bool,
 ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
-    #[derive(Debug, PartialEq)]
+    #[derive(PartialEq)]

Review Comment:
   Let's try to revert this (keep `Debug` implemented)



##########
datafusion/physical-plan/src/projection.rs:
##########
@@ -957,14 +976,20 @@ fn try_unifying_projections(
             })
             .unwrap();
     });
-    // Merging these projections is not beneficial, e.g
-    // If an expression is not trivial and it is referred more than 1, unifies projections will be
-    // beneficial as caching mechanism for non-trivial computations.
-    // See discussion in: https://github.com/apache/datafusion/issues/8296
-    if column_ref_map.iter().any(|(column, count)| {
-        *count > 1 && !is_expr_trivial(&Arc::clone(&child.expr()[column.index()].expr))
-    }) {
-        return Ok(None);
+    // Don't merge if:
+    // 1. A non-trivial expression is referenced more than once (caching benefit)
+    //    See discussion in: https://github.com/apache/datafusion/issues/8296
+    // 2. The child projection has TrivialExpr (like get_field) that should be pushed
+    //    down to the data source separately
+    for (column, count) in column_ref_map.iter() {
+        let triviality = child.expr()[column.index()].expr.triviality();
+        // Don't merge if multi-referenced non-trivial (caching)
+        if (*count > 1 && matches!(triviality, ArgTriviality::NonTrivial))
+            // Don't merge if child has TrivialExpr (should push to source)
+            || matches!(triviality, ArgTriviality::TrivialExpr)
+        {
+            return Ok(None);
+        }

Review Comment:
   This is an awkward interaction between optimizer rules. We need to evaluate and see if there's a better way to do this.



##########
datafusion/physical-plan/src/projection.rs:
##########
@@ -255,18 +256,16 @@ impl ExecutionPlan for ProjectionExec {
     }
 
     fn benefits_from_input_partitioning(&self) -> Vec<bool> {
-        let all_simple_exprs =
-            self.projector
-                .projection()
-                .as_ref()
+        // If expressions are all trivial (columns, literals, or field accessors),
+        // then all computations in this projection are reorder or rename,
+        // and projection would not benefit from the repartition.
+        // An expression is "cheap" if it's not NonTrivial (i.e., Column, Literal, or TrivialExpr).
+        vec![
+            !self
+                .projection_expr()
                 .iter()
-                .all(|proj_expr| {
-                    proj_expr.expr.as_any().is::<Column>()
-                        || proj_expr.expr.as_any().is::<Literal>()
-                });
-        // If expressions are all either column_expr or Literal, then all computations in this projection are reorder or rename,
-        // and projection would not benefit from the repartition, benefits_from_input_partitioning will return false.
-        vec![!all_simple_exprs]
+                .all(|p| !matches!(p.expr.triviality(), ArgTriviality::NonTrivial)),
+        ]

Review Comment:
   This is only called from `RepartitionExec` as far as I can tell. We should 
consider this implementation together with 
https://github.com/apache/datafusion/pull/19538/changes#r2717491453



##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1123,15 +1123,8 @@ impl ExecutionPlan for RepartitionExec {
         &self,
         projection: &ProjectionExec,
     ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
-        // If the projection does not narrow the schema, we should not try to push it down.
-        if projection.expr().len() >= projection.input().schema().fields().len() {
-            return Ok(None);
-        }
-
         // If pushdown is not beneficial or applicable, break it.
-        if projection.benefits_from_input_partitioning()[0]
-            || !all_columns(projection.expr())
-        {
+        if projection.benefits_from_input_partitioning()[0] {

Review Comment:
   Maybe this should also check `is_trivial_or_narrows_schema`?



##########
datafusion/physical-expr/src/projection.rs:
##########
@@ -806,17 +807,65 @@ pub fn update_expr(
         RewrittenInvalid,
     }
 
+    // Track Arc pointers of columns created by pass 1.
+    // These should not be modified by pass 2.
+    // We use Arc pointer addresses (not name/index) to distinguish pass-1-created columns
+    // from original columns that happen to have the same name and index.
+    let mut pass1_created: HashSet<usize> = HashSet::new();

Review Comment:
   This makes sense, but I do want to review it in a bit more detail to see if there are alternative ways of doing it; pointer tracking is always a bit funky.
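   For reference, the mechanism itself is sound: `Arc::as_ptr` identifies a specific allocation, so two value-equal columns from different passes stay distinguishable. A minimal standalone illustration (toy tuple standing in for a column's name/index):

```rust
use std::collections::HashSet;
use std::sync::Arc;

fn main() {
    // Two columns that are equal by value (same name and index)...
    let a = Arc::new(("col".to_string(), 0usize));
    let b = Arc::new(("col".to_string(), 0usize));

    // ...but only `a` was "created by pass 1".
    let mut pass1_created: HashSet<usize> = HashSet::new();
    pass1_created.insert(Arc::as_ptr(&a) as usize);

    // Pointer identity distinguishes them where name/index cannot.
    assert!(pass1_created.contains(&(Arc::as_ptr(&a) as usize)));
    assert!(!pass1_created.contains(&(Arc::as_ptr(&b) as usize)));
}
```

   The usual caveat with this pattern is allocation reuse: a tracked `Arc` must stay alive for as long as its address is in the set, which holds here since pass 2 runs over the same tree pass 1 produced.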



##########
datafusion/physical-expr/src/scalar_function.rs:
##########
@@ -362,6 +362,14 @@ impl PhysicalExpr for ScalarFunctionExpr {
     fn is_volatile_node(&self) -> bool {
         self.fun.signature().volatility == Volatility::Volatile
     }
+
+    fn triviality(&self) -> ArgTriviality {

Review Comment:
   The logical and physical representations of a function go down the same path 
by abstracting the arguments behind `ArgTriviality`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

