adriangb commented on code in PR #19538:
URL: https://github.com/apache/datafusion/pull/19538#discussion_r2717480648
##########
datafusion/physical-optimizer/src/output_requirements.rs:
##########
@@ -256,18 +256,13 @@ impl ExecutionPlan for OutputRequirementExec {
&self,
projection: &ProjectionExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
- // If the projection does not narrow the schema, we should not try to push it down:
- let proj_exprs = projection.expr();
- if proj_exprs.len() >= projection.input().schema().fields().len() {
- return Ok(None);
- }
Review Comment:
This same code was sort of sprinkled around various expressions. I'm not
sure how deeply each one was evaluated for what behavior it should have. It
seems to me that `OutputRequirementExec` is a temporary marker used by the
physical optimizer and should not have any bearing on where projections are
placed.
##########
datafusion/physical-optimizer/src/projection_pushdown.rs:
##########
@@ -88,6 +98,267 @@ impl PhysicalOptimizerRule for ProjectionPushdown {
}
}
+/// Tries to split a projection to extract beneficial sub-expressions for pushdown.
+///
+/// This function walks each expression in the projection and extracts beneficial
+/// sub-expressions (like `get_field`) from within larger non-beneficial expressions.
+/// For example:
+/// - Input: `get_field(col, 'foo') + 1`
+/// - Output: Inner projection: `get_field(col, 'foo') AS __extracted_0`, Outer: `__extracted_0 + 1`
+///
+/// This enables the beneficial parts to be pushed down while keeping non-beneficial
+/// expressions (like literals and computations) above.
+fn try_split_projection(
Review Comment:
This is a meaty bit worth reviewing and considering
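To make the split easier to review, here is a minimal sketch of the idea on a toy expression tree. It is not the PR's implementation: `Expr` and `extract` are hypothetical names, and the real code works on DataFusion's physical expressions rather than this enum. Only the `__extracted_N` alias scheme is taken from the doc comment above.

```rust
// Toy expression tree; NOT DataFusion's `PhysicalExpr`. All names are hypothetical.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Literal(i64),
    // Stand-in for `get_field(base, 'name')` with a static field name.
    GetField(Box<Expr>, String),
    Add(Box<Expr>, Box<Expr>),
}

/// Replaces every `GetField` subtree with a column reference to a fresh alias
/// and records `(alias, subtree)` in `inner`. Returns the rewritten outer
/// expression, which only refers to the extracted aliases, columns and literals.
fn extract(expr: &Expr, inner: &mut Vec<(String, Expr)>) -> Expr {
    match expr {
        Expr::GetField(..) => {
            let alias = format!("__extracted_{}", inner.len());
            inner.push((alias.clone(), expr.clone()));
            Expr::Column(alias)
        }
        Expr::Add(left, right) => {
            let left = extract(left, inner);
            let right = extract(right, inner);
            Expr::Add(Box::new(left), Box::new(right))
        }
        // Columns and literals stay in the outer projection unchanged.
        other => other.clone(),
    }
}
```

For `get_field(col, 'foo') + 1`, `inner` ends up holding the `get_field` call under `__extracted_0` and the returned outer expression is `__extracted_0 + 1`, matching the example in the doc comment.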
##########
datafusion/physical-plan/src/projection.rs:
##########
@@ -684,6 +685,31 @@ pub fn all_alias_free_columns(exprs: &[ProjectionExpr]) -> bool {
})
}
+/// Returns true if the projection is beneficial to push through operators like Sort/Filter.
+/// This is true when:
+/// - The projection narrows the schema (drops columns) - saves memory, OR
+/// - Any expression is a TrivialExpr (like get_field) - beneficial computation pushdown
+///
+/// Pure Column references that don't narrow the schema are NOT beneficial to push,
+/// as they just rearrange the plan without any gain.
+///
+/// Note: Projections are split by `try_split_projection` before reaching this function,
+/// so if any expression is TrivialExpr, all expressions should be trivial.
+pub fn is_trivial_or_narrows_schema(projection: &ProjectionExec) -> bool {
+ let exprs = projection.expr();
+
+ // Push if projection narrows schema (drops columns) - saves memory
+ let narrows_schema = exprs.len() <
projection.input().schema().fields().len();
Review Comment:
Should this also add the condition that all expressions are columns or
trivial expressions?
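A rough sketch of what that extra condition could look like, on plain data rather than a `ProjectionExec`. The enum below is a local copy of the `ArgTriviality` variants from this PR so the snippet compiles on its own; `should_push` and its parameters are hypothetical. Whether literals should also pass the guard is an open choice. This sketch follows the wording of the question and excludes them.

```rust
// Local copy of the PR's `ArgTriviality` variants, so the sketch is self-contained.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ArgTriviality {
    Literal,
    Column,
    TrivialExpr,
    NonTrivial,
}

/// Hypothetical variant of the check: narrowing the schema only counts when
/// every expression is a column or a trivial expression.
fn should_push(exprs: &[ArgTriviality], input_field_count: usize) -> bool {
    let narrows_schema = exprs.len() < input_field_count;
    let all_columns_or_trivial = exprs
        .iter()
        .all(|t| matches!(t, ArgTriviality::Column | ArgTriviality::TrivialExpr));
    let has_trivial_expr = exprs
        .iter()
        .any(|t| matches!(t, ArgTriviality::TrivialExpr));
    (narrows_schema && all_columns_or_trivial) || has_trivial_expr
}
```

With this guard, a projection that drops columns but also computes something non-trivial would no longer be pushed on the strength of narrowing alone.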
##########
datafusion/expr-common/src/triviality.rs:
##########
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Triviality classification for expressions and function arguments.
+
+/// Classification of argument triviality for scalar functions.
+///
+/// This enum is used by [`ScalarUDFImpl::triviality`] to allow
+/// functions to make context-dependent decisions about whether they are
+/// trivial based on the nature of their arguments.
+///
+/// For example, `get_field(struct_col, 'field_name')` is trivial (static field
+/// lookup), but `get_field(struct_col, key_col)` is not (dynamic per-row lookup).
+///
+/// [`ScalarUDFImpl::triviality`]: https://docs.rs/datafusion-expr/latest/datafusion_expr/trait.ScalarUDFImpl.html#tymethod.triviality
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum ArgTriviality {
+ /// Argument is a literal constant value or an expression that can be
+ /// evaluated to a constant at planning time.
+ Literal,
+ /// Argument is a simple column reference.
+ Column,
+ /// Argument is a complex expression that declares itself trivial.
+ /// For example, if `get_field(struct_col, 'field_name')` is implemented as a
+ /// trivial expression, then it would return this variant.
+ /// Then `other_trivial_function(get_field(...), 42)` could also be classified as
+ /// a trivial expression using the knowledge that `get_field(...)` is trivial.
+ TrivialExpr,
+ /// Argument is a complex expression that declares itself non-trivial.
+ /// For example, `min(col1 + col2)` is non-trivial because it requires per-row computation.
+ NonTrivial,
+}
Review Comment:
I think the main thing we need to figure out is if these are the right terms
/ wording for this system.
I don't love "trivial" but it does work and it is the existing terminology.
I don't like it for 2 reasons:
1. It's confusing. Claude *constantly* got confused thinking that a column
or literal expression is trivial.
2. It's a non-specific term. It doesn't imply something concrete.
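For reference while discussing the naming, here is a small sketch of how a `get_field`-like function might classify itself from its arguments, following the two examples in the enum's doc comment. The enum is copied locally so the snippet stands alone; `get_field_triviality` is a hypothetical helper, not the signature of `ScalarUDFImpl::triviality` in this PR.

```rust
// Local copy of the PR's `ArgTriviality` variants, so the sketch is self-contained.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ArgTriviality {
    Literal,
    Column,
    TrivialExpr,
    NonTrivial,
}

/// Hypothetical classifier: `args[0]` is the base value, `args[1]` is the key.
/// A static key on a column (or on another trivial expression) is trivial;
/// anything else, such as a per-row key column, is not.
fn get_field_triviality(args: &[ArgTriviality]) -> ArgTriviality {
    match args {
        [ArgTriviality::Column | ArgTriviality::TrivialExpr, ArgTriviality::Literal] => {
            ArgTriviality::TrivialExpr
        }
        _ => ArgTriviality::NonTrivial,
    }
}
```

Note that `Column` and `Literal` are inputs to the decision here, not results of it, which may be part of why "trivial" reads ambiguously.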
##########
datafusion/physical-expr/src/projection.rs:
##########
@@ -795,7 +796,7 @@ pub fn update_expr(
projected_exprs: &[ProjectionExpr],
sync_with_child: bool,
) -> Result<Option<Arc<dyn PhysicalExpr>>> {
- #[derive(Debug, PartialEq)]
+ #[derive(PartialEq)]
Review Comment:
Let's try to revert this (keep `Debug` implemented)
##########
datafusion/physical-plan/src/projection.rs:
##########
@@ -957,14 +976,20 @@ fn try_unifying_projections(
})
.unwrap();
});
- // Merging these projections is not beneficial, e.g
- // If an expression is not trivial and it is referred more than 1, unifies projections will be
- // beneficial as caching mechanism for non-trivial computations.
- // See discussion in: https://github.com/apache/datafusion/issues/8296
- if column_ref_map.iter().any(|(column, count)| {
- *count > 1 && !is_expr_trivial(&Arc::clone(&child.expr()[column.index()].expr))
- }) {
- return Ok(None);
+ // Don't merge if:
+ // 1. A non-trivial expression is referenced more than once (caching benefit)
+ // See discussion in: https://github.com/apache/datafusion/issues/8296
+ // 2. The child projection has TrivialExpr (like get_field) that should be pushed
+ // down to the data source separately
+ for (column, count) in column_ref_map.iter() {
+ let triviality = child.expr()[column.index()].expr.triviality();
+ // Don't merge if multi-referenced non-trivial (caching)
+ if (*count > 1 && matches!(triviality, ArgTriviality::NonTrivial))
+ // Don't merge if child has TrivialExpr (should push to source)
+ || matches!(triviality, ArgTriviality::TrivialExpr)
+ {
+ return Ok(None);
+ }
Review Comment:
This is an awkward interaction between optimizer rules. Need to evaluate and
see if there's a better way to do this.
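While evaluating alternatives, it may help to look at the two reasons as separately named conditions. The sketch below restates the loop from the hunk as a predicate over plain data. The enum is a local copy of the PR's variants, and `blocks_merge` plus its `(triviality, reference count)` input are hypothetical simplifications of `column_ref_map`.

```rust
// Local copy of the PR's `ArgTriviality` variants, so the sketch is self-contained.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ArgTriviality {
    Literal,
    Column,
    TrivialExpr,
    NonTrivial,
}

/// Each entry pairs a child expression's triviality with how many times the
/// parent projection references it. Returns true if merging should be skipped.
fn blocks_merge(refs: &[(ArgTriviality, usize)]) -> bool {
    refs.iter().any(|&(triviality, count)| {
        // Reason 1: keep the child projection as a cache for repeated work.
        let caching = count > 1 && triviality == ArgTriviality::NonTrivial;
        // Reason 2: leave trivial expressions alone so they can reach the source.
        let push_to_source = triviality == ArgTriviality::TrivialExpr;
        caching || push_to_source
    })
}
```

Written this way, reason 2 is visibly independent of the reference count, which is the part that couples this rule to the pushdown rule.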
##########
datafusion/physical-plan/src/projection.rs:
##########
@@ -255,18 +256,16 @@ impl ExecutionPlan for ProjectionExec {
}
fn benefits_from_input_partitioning(&self) -> Vec<bool> {
- let all_simple_exprs =
- self.projector
- .projection()
- .as_ref()
+ // If expressions are all trivial (columns, literals, or field accessors),
+ // then all computations in this projection are reorder or rename,
+ // and projection would not benefit from the repartition.
+ // An expression is "cheap" if it's not NonTrivial (i.e., Column, Literal, or TrivialExpr).
+ vec![
+ !self
+ .projection_expr()
.iter()
- .all(|proj_expr| {
- proj_expr.expr.as_any().is::<Column>()
- || proj_expr.expr.as_any().is::<Literal>()
- });
- // If expressions are all either column_expr or Literal, then all computations in this projection are reorder or rename,
- // and projection would not benefit from the repartition, benefits_from_input_partitioning will return false.
- vec![!all_simple_exprs]
+ .all(|p| !matches!(p.expr.triviality(), ArgTriviality::NonTrivial)),
+ ]
Review Comment:
This is only called from `RepartitionExec` as far as I can tell. We should
consider this implementation together with
https://github.com/apache/datafusion/pull/19538/changes#r2717491453
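As a side note for that joint review, the double negation in the new body is equivalent to a single `any`, which may be easier to read. The sketch below checks this on plain data. The enum is a local copy of the PR's variants and both function names are hypothetical.

```rust
// Local copy of the PR's `ArgTriviality` variants, so the sketch is self-contained.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ArgTriviality {
    Literal,
    Column,
    TrivialExpr,
    NonTrivial,
}

/// Same shape as the hunk: "not all expressions are cheap".
fn benefits_as_written(exprs: &[ArgTriviality]) -> bool {
    !exprs
        .iter()
        .all(|t| !matches!(t, ArgTriviality::NonTrivial))
}

/// Equivalent by De Morgan: "some expression is non-trivial".
fn benefits_simplified(exprs: &[ArgTriviality]) -> bool {
    exprs
        .iter()
        .any(|t| matches!(t, ArgTriviality::NonTrivial))
}
```

Both return false for an empty projection and for one made only of columns, literals and trivial expressions, so a `TrivialExpr` such as `get_field` no longer counts as a reason to repartition.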
##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1123,15 +1123,8 @@ impl ExecutionPlan for RepartitionExec {
&self,
projection: &ProjectionExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
- // If the projection does not narrow the schema, we should not try to push it down.
- if projection.expr().len() >= projection.input().schema().fields().len() {
- return Ok(None);
- }
-
// If pushdown is not beneficial or applicable, break it.
- if projection.benefits_from_input_partitioning()[0]
- || !all_columns(projection.expr())
- {
+ if projection.benefits_from_input_partitioning()[0] {
Review Comment:
Maybe this should also check `is_trivial_or_narrows_schema`?
##########
datafusion/physical-expr/src/projection.rs:
##########
@@ -806,17 +807,65 @@ pub fn update_expr(
RewrittenInvalid,
}
+ // Track Arc pointers of columns created by pass 1.
+ // These should not be modified by pass 2.
+ // We use Arc pointer addresses (not name/index) to distinguish pass-1-created columns
+ // from original columns that happen to have the same name and index.
+ let mut pass1_created: HashSet<usize> = HashSet::new();
Review Comment:
This makes sense, but I do want to review it in a bit more detail to see if
there are alternative ways of doing it; using pointer tracking is always a
bit funky.
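For context on what pointer tracking buys and where it can bite, here is a minimal standalone sketch using only `std`. `ptr_key` and `demo` are hypothetical names, and `Arc<str>` stands in for the `Arc<dyn PhysicalExpr>` values in the PR.

```rust
use std::collections::HashSet;
use std::sync::Arc;

/// Identity key for an `Arc` allocation: the address of its data pointer.
/// The cast to `*const ()` drops any fat-pointer metadata first.
fn ptr_key<T: ?Sized>(arc: &Arc<T>) -> usize {
    Arc::as_ptr(arc).cast::<()>() as usize
}

/// Returns (clone of tracked value is recognized, equal-looking value is recognized).
fn demo() -> (bool, bool) {
    let created: Arc<str> = Arc::from("a@0");
    // Same contents, but a separate allocation.
    let lookalike: Arc<str> = Arc::from("a@0");

    let mut pass1_created: HashSet<usize> = HashSet::new();
    pass1_created.insert(ptr_key(&created));

    let clone_found = pass1_created.contains(&ptr_key(&Arc::clone(&created)));
    let lookalike_found = pass1_created.contains(&ptr_key(&lookalike));
    (clone_found, lookalike_found)
}
```

A clone of the tracked `Arc` is recognized while an equal-looking value is not, which is the distinction the comment in the hunk relies on. The funky part is that an address is only a valid identity while the tracked `Arc` is alive: once it is dropped, a later allocation may reuse the same address, so the set should not outlive the values it tracks.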
##########
datafusion/physical-expr/src/scalar_function.rs:
##########
@@ -362,6 +362,14 @@ impl PhysicalExpr for ScalarFunctionExpr {
fn is_volatile_node(&self) -> bool {
self.fun.signature().volatility == Volatility::Volatile
}
+
+ fn triviality(&self) -> ArgTriviality {
Review Comment:
The logical and physical representations of a function go down the same path
by abstracting the arguments behind `ArgTriviality`