adriangb opened a new issue, #19550:
URL: https://github.com/apache/datafusion/issues/19550

   The current physical layer projection pushdown rule is:
   
   ```rust
   fn try_swapping_with_projection(
       &self,
       projection: &ProjectionExec,
   ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
   ```
   
   Operators like `FilterExec` then treat the pushdown as all or nothing:
   
   ```rust
   // If the projection does not narrow the schema, we should not try to push it down:
   if projection.expr().len() < projection.input().schema().fields().len() {
       // Each column in the predicate expression must exist after the projection.
       if let Some(new_predicate) =
           update_expr(self.predicate(), projection.expr(), false)?
       { ... }
   }
   ```
   
   This poses a problem for #19387 in cases like `complex_function(struct_col['field'])`.
   To do the optimal thing I think we have to change the rule to operate on a per-expression, per-subtree basis.
   That is: for each projection expression, look for subtrees that would be beneficial to push down, split the projection into two, and push down those subtrees while keeping the rest of each expression above. For example, given the data:
   
   
   ```sql
   copy (select {email: '[email protected]', address: '123 Main St, NYC'} as user) to 'test.parquet';
   ```
   
   ```sql
   select lower(user['email'])
   from 'test.parquet' t
   where user['address'] ilike '%nyc%';
   ```
   
   This query will produce a plan of the form:
   
   ```
   ProjectionExec: expr=[lower(get_field(user@0, email)) as lower(t.user[email])]
     RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1
       CoalesceBatchesExec: target_batch_size=8192
         DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[user], predicate = [get_field(user@0, address) ILIKE %nyc%], file_type=parquet
   ```
   
   (The actual plan is a bit different because filters referencing struct columns are currently not pushed down, but that's an orthogonal issue that I will expand on in a separate issue and link here.)
   
   
   Ideally we want a plan of the form:
   
   ```
   ProjectionExec: expr=[lower("get_field(user@0, email)"@0) as 
lower(t.user[email])]
     RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1
       CoalesceBatchesExec: target_batch_size=8192
         DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[get_field(user@0, email) as get_field(user@0, email)], predicate = 
[get_field(user@0, address) ILIKE %nyc%], file_type=parquet
   ```
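
   Note the difference: in the desired plan the `DataSourceExec` already evaluates `get_field(user@0, email)` and exposes it as an output column, so the remaining `ProjectionExec` only has to apply `lower` to that column.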
   
   To achieve this I propose we add some helper methods to `ProjectionExprs` 
along the lines of:
   
   ```rust
   fn split_expressions<F>(
       &self,
       f: F,
   ) -> Result<(Option<ProjectionExprs>, Option<ProjectionExprs>)>
   where
       F: Fn(Arc<dyn PhysicalExpr>) -> Result<bool> { ... }
   ```
   
   Then operators would use this as:
   
   ```rust
   fn try_swapping_with_projection(
       &self,
       projection: &ProjectionExec,
   ) -> Result<Option<Arc<dyn DataSource>>> {
       let (keep, pushdown) =
           projection.expr().split_expressions(|expr| expr.is_trivial())?;
       // Proceed as normal with `pushdown`; if `keep` is Some, wrap the result in a ProjectionExpr
   ```
   
   Where `PhysicalExpr::is_trivial` comes from #19538
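
   To make the intended split concrete, here is a rough, self-contained sketch of the per-subtree idea on a toy expression type. Everything below (the `Expr` enum, the `split` helper, and the `is_trivial` closure) is purely illustrative and stands in for `PhysicalExpr`, the proposed `split_expressions`, and the `is_trivial` method from #19538; it is not the actual DataFusion API.

   ```rust
   #[derive(Clone, Debug)]
   enum Expr {
       Column(String),
       GetField(Box<Expr>, String),
       Lower(Box<Expr>),
   }

   impl Expr {
       /// Display-style name, used as the output column name of a pushed-down subtree.
       fn name(&self) -> String {
           match self {
               Expr::Column(c) => c.clone(),
               Expr::GetField(base, field) => format!("get_field({}, {})", base.name(), field),
               Expr::Lower(inner) => format!("lower({})", inner.name()),
           }
       }
   }

   /// Walk `expr` top-down. Maximal subtrees accepted by `f` are moved into
   /// `pushdown` and replaced by a `Column` referencing their generated name;
   /// everything else is rebuilt unchanged and stays in the "keep" projection.
   fn split(expr: &Expr, f: &dyn Fn(&Expr) -> bool, pushdown: &mut Vec<Expr>) -> Expr {
       if f(expr) {
           let name = expr.name();
           pushdown.push(expr.clone());
           return Expr::Column(name);
       }
       match expr {
           Expr::Column(c) => Expr::Column(c.clone()),
           Expr::GetField(base, field) => {
               Expr::GetField(Box::new(split(base, f, pushdown)), field.clone())
           }
           Expr::Lower(inner) => Expr::Lower(Box::new(split(inner, f, pushdown))),
       }
   }

   fn main() {
       // lower(get_field(user, email)), as in the example query above.
       let expr = Expr::Lower(Box::new(Expr::GetField(
           Box::new(Expr::Column("user".to_string())),
           "email".to_string(),
       )));
       // Stand-in for `is_trivial`: plain columns and field accesses on columns.
       let is_trivial = |e: &Expr| match e {
           Expr::Column(_) => true,
           Expr::GetField(base, _) => matches!(**base, Expr::Column(_)),
           Expr::Lower(_) => false,
       };
       let mut pushdown = Vec::new();
       let keep = split(&expr, &is_trivial, &mut pushdown);
       // keep     = Lower(Column("get_field(user, email)"))
       // pushdown = [GetField(Column("user"), "email")]
       println!("keep = {keep:?}");
       println!("pushdown = {pushdown:?}");
   }
   ```

   Assuming `is_trivial` accepts plain columns and field accesses on columns but rejects `lower(...)`, the split lands exactly on the boundary in the desired plan above: `get_field(user@0, email)` ends up in the pushdown set (and ultimately in the `DataSourceExec` projection), while only the `lower` call stays behind in the `ProjectionExec`.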

