Daniel Rossos created FLINK-39804:
-------------------------------------

             Summary: Nested projection fields not pushed down to source
                 Key: FLINK-39804
                 URL: https://issues.apache.org/jira/browse/FLINK-39804
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
            Reporter: Daniel Rossos


h1. Summary

If a source supports nested projection pushdown and a filter that cannot be
pushed down sits between the projection and the source scan, only the
top-level columns of the projection are pushed down into the source.

High-level example: the SQL query

{{SELECT a.aOne, b FROM t1 WHERE <unpushable filter>}}

results in the following optimized execution plan:

{{== Optimized Execution Plan ==}}
{{Calc(select=[a.aOne, b], where=[<filter>])}}
{{+- TableSourceScan(table=[[c, db, t1, filter=[], project=[a, b]]], fields=[a, b])}}

h1. Detailed Trace

As part of the 
[PROJECT_RULES|https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala#L212]
 in our query optimizer, we use the Calcite rule
CoreRules.PROJECT_FILTER_TRANSPOSE. To 
[summarize|https://github.com/apache/calcite/blob/main/core/src/main/java/org/apache/calcite/rel/rules/ProjectFilterTransposeRule.java#L92],
 this rule checks whether a Project sits directly above a Filter in the plan
and, if so, transposes them so that the Project now occurs below the Filter.
This places the Project next to the source scan, where it can be pushed down
into the scan. The problem is that in its default PROJECT_FILTER_TRANSPOSE
mode, the rule uses a 
[PushProjector|https://github.com/apache/calcite/blob/main/core/src/main/java/org/apache/calcite/rel/rules/ProjectFilterTransposeRule.java#L176]
 to traverse the Project's expressions, and that traversal eventually goes
through visitInputRef(), which records only the top-level index of each column
referenced in the projection. A nested access such as a.aOne is therefore
reduced to its top-level column a.
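To make the difference concrete, here is a minimal toy model in Python. It is
not Calcite's API: InputRef and FieldAccess are hypothetical stand-ins for
RexInputRef and RexFieldAccess, top_level_refs mimics the behaviour described
above (only the top-level index is recorded), and nested_paths shows the full
access paths a source with nested projection pushdown would need.

```python
from dataclasses import dataclass
from typing import List, Set, Tuple, Union


@dataclass(frozen=True)
class InputRef:
    """Hypothetical stand-in for a reference to a top-level input column."""
    index: int


@dataclass(frozen=True)
class FieldAccess:
    """Hypothetical stand-in for accessing a named field of a ROW-typed expression."""
    expr: "Expr"
    field: str


Expr = Union[InputRef, FieldAccess]


def top_level_refs(exprs: List[Expr]) -> Set[int]:
    """Record only top-level column indices, mimicking a visitInputRef-style
    traversal: the field names of a FieldAccess are dropped."""
    refs: Set[int] = set()

    def visit(e: Expr) -> None:
        if isinstance(e, InputRef):
            refs.add(e.index)
        else:
            visit(e.expr)  # descend to the underlying column, forgetting the field

    for e in exprs:
        visit(e)
    return refs


def nested_paths(exprs: List[Expr], names: List[str]) -> Set[Tuple[str, ...]]:
    """Record the full access path of each expression, e.g. ("a", "aOne")."""

    def path_of(e: Expr) -> Tuple[str, ...]:
        if isinstance(e, InputRef):
            return (names[e.index],)
        return path_of(e.expr) + (e.field,)

    return {path_of(e) for e in exprs}


# Projection from the example query: SELECT a.aOne, b FROM t1 ...
names = ["a", "b"]
projection = [FieldAccess(InputRef(0), "aOne"), InputRef(1)]

print(sorted(names[i] for i in top_level_refs(projection)))  # ['a', 'b']
print(sorted(nested_paths(projection, names)))  # [('a', 'aOne'), ('b',)]
```

Under top_level_refs, a.aOne collapses to index 0 (column a), which matches the
project=[a, b] seen in the optimized plan.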
h1. Potential Solution / Investigation

In our testing, replacing the rule with
PROJECT_FILTER_TRANSPOSE_WHOLE_EXPRESSIONS or
PROJECT_FILTER_TRANSPOSE_WHOLE_PROJECT_EXPRESSIONS in the 
[PROJECT_REWRITE|https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala#L253]
 rule set* avoided this issue, because these variants preserve whole
project/filter expressions instead of reducing them to top-level column
references.

 

*Note: I specify PROJECT_REWRITE deliberately. Using the
PROJECT_FILTER_TRANSPOSE_WHOLE_EXPRESSIONS transpose rule in the Volcano
LOGICAL phase causes an infinite oscillation:
PROJECT_FILTER_TRANSPOSE_WHOLE_EXPRESSIONS pushes the Project below the Filter,
FILTER_PROJECT_TRANSPOSE moves the Filter back below that Project, and
PROJECT_MERGE collapses the resulting Projects back into the original shape,
so the same rewrite becomes applicable again. In our testing we left the
LOGICAL-phase transpose rule at its current default, but we are still
analyzing the implications.
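As a rough illustration of why the whole-expression variants help, here is a
second toy model. It is a simplified sketch, not Calcite's PushProjector
logic: field paths are plain tuples, lower_project is a hypothetical helper
that builds the Project left directly above the scan, filter columns are
treated as whole top-level columns for simplicity, and the unpushable filter
is assumed (hypothetically) to read only column b.

```python
from typing import List, Tuple

# A field path: ("a", "aOne") stands for a.aOne, ("b",) for column b.
Path = Tuple[str, ...]


def lower_project(project: List[Path], filter_cols: List[Path],
                  whole_expressions: bool) -> List[Path]:
    """Toy model of the Project left between the Filter and the scan after
    transposing Project past Filter (hypothetical, deliberately simplified)."""
    if whole_expressions:
        # Keep each projected expression intact: a.aOne stays a.aOne.
        pushed = list(project)
    else:
        # Keep only the top-level column of each expression: a.aOne becomes a.
        pushed = [(p[0],) for p in project]
    # The Filter still runs above this Project, so the columns it reads
    # must also pass through.
    pushed += filter_cols
    # Deduplicate while preserving first-seen order.
    return list(dict.fromkeys(pushed))


# Hypothetical example: SELECT a.aOne, b FROM t1 WHERE <unpushable filter on b>
project = [("a", "aOne"), ("b",)]
filter_cols = [("b",)]

print(lower_project(project, filter_cols, whole_expressions=False))
# [('a',), ('b',)]         -> the scan reads all of a, as in project=[a, b]
print(lower_project(project, filter_cols, whole_expressions=True))
# [('a', 'aOne'), ('b',)]  -> only a.aOne is requested from a
```

In the default mode the nested path is already lost by the time the Project
reaches the scan, so no later pushdown step can recover it.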

We are still investigating, but would appreciate input from someone more
familiar with the Calcite planner and Flink's table optimizations.


