adriangb opened a new issue, #19550:
URL: https://github.com/apache/datafusion/issues/19550
The current physical layer projection pushdown rule is:
```rust
fn try_swapping_with_projection(
    &self,
    projection: &ProjectionExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
    // ...
}
```
Operators like `FilterExec` then treat the pushdown as all or nothing:
```rust
// If the projection does not narrow the schema, we should not try to push it down:
if projection.expr().len() < projection.input().schema().fields().len() {
    // Each column in the predicate expression must exist after the projection.
    if let Some(new_predicate) =
        update_expr(self.predicate(), projection.expr(), false)?
    {
        // ...
    }
}
```
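The narrowing guard can be modeled on its own. The sketch below is hypothetical (`projection_narrows_schema` is not a DataFusion API, and expressions are plain strings); it only mirrors the length comparison above. It shows that a single expression over a single input column, such as `complex_function(struct_col['field'])` over `[struct_col]`, never narrows the schema, so pushdown is not even attempted.

```rust
/// Simplified model of the guard above: pushdown is only attempted when the
/// projection narrows the schema, i.e. it has fewer output expressions than
/// the input has fields. Expressions are plain strings in this sketch.
fn projection_narrows_schema(projection_exprs: &[&str], input_fields: &[&str]) -> bool {
    projection_exprs.len() < input_fields.len()
}

fn main() {
    // One expression over a one-column input: not narrowed, so the whole
    // projection stays put, including the `struct_col['field']` subtree.
    let narrowed = projection_narrows_schema(
        &["complex_function(struct_col['field'])"],
        &["struct_col"],
    );
    println!("{narrowed}"); // prints "false"

    // Selecting one of three columns does narrow the schema.
    println!("{}", projection_narrows_schema(&["a"], &["a", "b", "c"])); // prints "true"
}
```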
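This poses a problem for #19387 in cases like
`complex_function(struct_col['field'])`: the field access `struct_col['field']`
is worth pushing into the scan, but the rule only considers whole expressions,
so it cannot push that subtree down while keeping `complex_function` above it.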
To do the optimal thing, I think we have to change the rule to operate on a
per-expression, per-subtree basis.
That is: for each projection expression, look for subtrees that would be
beneficial to push down, split the projection into two, and push down those
subtrees of each expression. For example, given the data:
```sql
copy (select {email: '[email protected]', address: '123 Main St, NYC'} as
user) to 'test.parquet';
```
```sql
select lower(user['email'])
from 'test.parquet' t
where user['address'] ilike '%nyc%';
```
This will produce a plan of the form:
```
ProjectionExec: expr=[lower(get_field(user@0, email)) as lower(t.user[email])]
  RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1
    CoalesceBatchesExec: target_batch_size=8192
      DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[user], predicate = [get_field(user@0, address) ILIKE %nyc%], file_type=parquet
```
(The actual plan is a bit different because filters referencing struct
columns are currently not pushed down, but that's an orthogonal issue that I
will expand on in a separate issue and link here.)
Ideally we want a plan of the form:
```
ProjectionExec: expr=[lower("get_field(user@0, email)"@0) as lower(t.user[email])]
  RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1
    CoalesceBatchesExec: target_batch_size=8192
      DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[get_field(user@0, email) as get_field(user@0, email)], predicate = [get_field(user@0, address) ILIKE %nyc%], file_type=parquet
```
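The per-subtree split that turns the first plan into the second can be sketched with a toy expression tree. Everything here is hypothetical: `Expr`, `is_pushable`, `extract` and `split_projection` are illustrative, not DataFusion types, and column indices like `@0` are omitted. The rule for which subtrees are worth pushing (field-access chains rooted at a column) is an assumption standing in for a real cost rule. Each maximal pushable subtree is replaced in the outer expression by a column reference named after the subtree, mirroring `"get_field(user@0, email)"@0` in the ideal plan.

```rust
use std::fmt;

/// Hypothetical toy expression tree (not DataFusion's `PhysicalExpr`).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    /// Reference to an input column by name.
    Column(String),
    /// Struct field access, e.g. `user['email']`.
    GetField(Box<Expr>, String),
    /// Any other scalar function call, e.g. `lower(...)`.
    Call(String, Vec<Expr>),
}

impl fmt::Display for Expr {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Expr::Column(name) => write!(f, "{name}"),
            Expr::GetField(inner, field) => write!(f, "get_field({inner}, {field})"),
            Expr::Call(name, args) => {
                let args: Vec<String> = args.iter().map(|a| a.to_string()).collect();
                write!(f, "{name}({})", args.join(", "))
            }
        }
    }
}

/// Assumed cost rule: a chain of field accesses rooted at a column is worth
/// pushing into the scan.
fn is_pushable(expr: &Expr) -> bool {
    match expr {
        Expr::GetField(inner, _) => {
            matches!(inner.as_ref(), Expr::Column(_)) || is_pushable(inner)
        }
        _ => false,
    }
}

/// Replaces every maximal pushable subtree (and every bare column, which must be
/// passed through) with a column reference named after it, recording it once in `pushed`.
fn extract(expr: &Expr, pushed: &mut Vec<Expr>) -> Expr {
    if is_pushable(expr) || matches!(expr, Expr::Column(_)) {
        if !pushed.contains(expr) {
            pushed.push(expr.clone());
        }
        return Expr::Column(expr.to_string());
    }
    match expr {
        Expr::GetField(inner, field) => {
            Expr::GetField(Box::new(extract(inner, pushed)), field.clone())
        }
        Expr::Call(name, args) => {
            Expr::Call(name.clone(), args.iter().map(|a| extract(a, pushed)).collect())
        }
        Expr::Column(_) => unreachable!("columns are handled above"),
    }
}

/// Splits a projection into (outer expressions kept above, expressions pushed down).
fn split_projection(exprs: &[Expr]) -> (Vec<Expr>, Vec<Expr>) {
    let mut pushed = Vec::new();
    let outer: Vec<Expr> = exprs.iter().map(|e| extract(e, &mut pushed)).collect();
    (outer, pushed)
}

fn main() {
    // SELECT lower(user['email']) ...
    let user_email = Expr::GetField(Box::new(Expr::Column("user".into())), "email".into());
    let projection = vec![Expr::Call("lower".into(), vec![user_email])];

    let (outer, pushed) = split_projection(&projection);
    // `outer` calls `lower` on a column named "get_field(user, email)";
    // `pushed` holds the single `get_field(user, email)` subtree for the scan.
    println!("kept above:  {outer:?}");
    let pushed: Vec<String> = pushed.iter().map(|e| e.to_string()).collect();
    println!("pushed down: {pushed:?}");
}
```

Passing bare columns through keeps the outer projection valid when an expression mixes pushed subtrees with plain column references, and deduplication means a subtree used twice is evaluated only once in the scan.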
To achieve this I propose we add some helper methods to `ProjectionExprs`
along the lines of:
```rust
fn split_expressions<F>(
    &self,
    f: F,
) -> Result<(Option<ProjectionExprs>, Option<ProjectionExprs>)>
where
    F: Fn(Arc<dyn PhysicalExpr>) -> Result<bool>,
{
    // ...
}
```
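A minimal sketch of such a helper, under several assumptions: `ProjectionExprs` is replaced by a hypothetical newtype over `(expression, alias)` string pairs, the predicate receives `&str` instead of `Arc<dyn PhysicalExpr>`, and the tuple is ordered `(keep, pushdown)` with expressions for which `f` returns `true` going to `pushdown`, which is how the usage that follows binds the result. An empty side is returned as `None`, and the first error from `f` is propagated.

```rust
/// Errors are plain strings in this sketch.
type Result<T> = std::result::Result<T, String>;

/// Hypothetical stand-in for `ProjectionExprs`: `(expression, alias)` pairs,
/// with expressions as plain strings instead of `Arc<dyn PhysicalExpr>`.
#[derive(Debug, Clone, PartialEq)]
struct ProjectionExprs(Vec<(String, String)>);

impl ProjectionExprs {
    /// Partitions the expressions by `f`: `false` goes to the first element (keep),
    /// `true` to the second (pushdown). Empty sides become `None`.
    fn split_expressions<F>(
        &self,
        f: F,
    ) -> Result<(Option<ProjectionExprs>, Option<ProjectionExprs>)>
    where
        F: Fn(&str) -> Result<bool>,
    {
        let mut keep = Vec::new();
        let mut pushdown = Vec::new();
        for (expr, alias) in &self.0 {
            let entry = (expr.clone(), alias.clone());
            // `?` stops at the first predicate error.
            if f(expr.as_str())? {
                pushdown.push(entry);
            } else {
                keep.push(entry);
            }
        }
        let non_empty = |v: Vec<(String, String)>| {
            if v.is_empty() {
                None
            } else {
                Some(ProjectionExprs(v))
            }
        };
        Ok((non_empty(keep), non_empty(pushdown)))
    }
}

fn main() -> Result<()> {
    let projection = ProjectionExprs(vec![
        ("get_field(user@0, email)".to_string(), "email".to_string()),
        ("lower(email@0)".to_string(), "lower_email".to_string()),
    ]);
    // Hypothetical predicate: treat bare field accesses as trivial.
    let (keep, pushdown) =
        projection.split_expressions(|e| Ok(e.starts_with("get_field(")))?;
    println!("keep:     {keep:?}");
    println!("pushdown: {pushdown:?}");
    Ok(())
}
```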
Then operators would use this as:
```rust
fn try_swapping_with_projection(
    &self,
    projection: &ProjectionExec,
) -> Result<Option<Arc<dyn DataSource>>> {
    // Assumes `is_trivial` returns a plain `bool`.
    let (keep, pushdown) = projection
        .expr()
        .split_expressions(|expr| Ok(expr.is_trivial()))?;
    // Proceed as normal with `pushdown`; if `keep` is `Some`, wrap the result
    // in a projection.
    // ...
}
```
Here, `PhysicalExpr::is_trivial` comes from #19538.
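On the operator side, the swap then reduces to pushing `pushdown` into the source and, if anything remains in `keep`, putting a projection back on top. The `Plan` and `ScanSource` types below are hypothetical stand-ins for `ExecutionPlan` / `DataSource`, with expressions as strings taken from the ideal plan above. Returning `None` when there is nothing to push assumes `None` keeps its meaning of "no swap happened". Note that the kept expressions must only reference columns produced by the pushed-down projection, so `keep` has to be rewritten (as in the `"get_field(user@0, email)"@0` reference) rather than copied verbatim.

```rust
/// Hypothetical toy plan node (not DataFusion's `ExecutionPlan`);
/// expressions are plain strings for brevity.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    /// A file scan that evaluates `projection` while reading.
    Scan { file: String, projection: Vec<String> },
    /// A projection evaluated on top of `input`.
    Projection { exprs: Vec<String>, input: Box<Plan> },
}

/// Hypothetical stand-in for a file-backed `DataSource`.
struct ScanSource {
    file: String,
}

impl ScanSource {
    /// Pushes `pushdown` into the scan and, if `keep` is `Some`, wraps the new
    /// scan in a projection. Returns `None` (no swap) when there is nothing to push.
    fn try_swapping_with_projection(
        &self,
        keep: Option<Vec<String>>,
        pushdown: Option<Vec<String>>,
    ) -> Option<Plan> {
        let pushdown = pushdown?;
        let scan = Plan::Scan {
            file: self.file.clone(),
            projection: pushdown,
        };
        Some(match keep {
            Some(exprs) => Plan::Projection {
                exprs,
                input: Box::new(scan),
            },
            None => scan,
        })
    }
}

fn main() {
    let source = ScanSource { file: "test.parquet".to_string() };
    // Split of `lower(user['email'])`, with the kept expression already
    // rewritten to reference the pushed-down output column.
    let keep = Some(vec![r#"lower("get_field(user@0, email)"@0)"#.to_string()]);
    let pushdown = Some(vec!["get_field(user@0, email)".to_string()]);
    let plan = source.try_swapping_with_projection(keep, pushdown);
    println!("{plan:#?}");
}
```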