[ 
https://issues.apache.org/jira/browse/FLINK-32940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17778478#comment-17778478
 ] 

Jeyhun Karimov commented on FLINK-32940:
----------------------------------------

Hi [~vsowrirajan] [~337361...@qq.com] [~lsy] [~jark], throwing my two cents 
here:

Adding _CoreRules.ProjectCorrelateTransposeRule_ is not enough to solve the 
problem, for several reasons:

* Calcite will add two projections (one for the left and one for the right 
input) [1]. Some of these projections can be no-ops (e.g., without 
expressions). This causes a null reference error in 
_BatchPhysicalCorrelateRule.scala: 67_ 
(_Some(calc.getProgram.expandLocalRef(calc.getProgram.getCondition))_), 
presumably because the program has no condition in that case. That is 
probably why you get this error. 
* However, solving the above issue is probably not enough to get this rule 
working, mainly because of how _CoreRules.ProjectCorrelateTransposeRule_ works. 
Basically, this rule pushes down projects without further handling or 
correcting the references (e.g., LogicalTableFunctionScan will have a stale 
function invocation expression, getCall()). As a result, 
LogicalTableFunctionScan will try to access some field, but this field has 
already been projected by the Calcite rule (there are one or more 
LogicalProject operators on top). 
* The above issue gets even more complicated when there are more operators 
(e.g., filters and projections) that have dangling references after the 
Calcite rule is applied, or when many nested fields are accessed (this will 
result in LogicalCorrelate operators nested in each other).
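
To illustrate the first bullet, here is a minimal sketch of guarding an 
optional condition before expanding it. It does not use the Calcite or Flink 
classes: _ToyProgram_, _expand_ and _expandedCondition_ are hypothetical 
stand-ins, and the only assumption carried over is that the condition can be 
null for a projection without a filter.

```java
import java.util.Optional;

/** Toy illustration only; ToyProgram is NOT Calcite's RexProgram. */
public class NullSafeCondition {

    /** Hypothetical miniature of a program whose condition may be absent. */
    public static final class ToyProgram {
        private final String condition; // null when there is no filter

        public ToyProgram(String condition) {
            this.condition = condition;
        }

        public String getCondition() {
            return condition;
        }

        /** Mimics an expansion step that dereferences its argument. */
        public String expand(String ref) {
            return "expanded(" + ref.trim() + ")";
        }
    }

    /** Expands the condition only if one exists; otherwise returns empty. */
    public static Optional<String> expandedCondition(ToyProgram program) {
        // ofNullable turns a null condition into Optional.empty(), so expand()
        // is never called with null.
        return Optional.ofNullable(program.getCondition()).map(program::expand);
    }

    public static void main(String[] args) {
        System.out.println(expandedCondition(new ToyProgram(null)));
        System.out.println(expandedCondition(new ToyProgram(" $0 > 1 ")));
    }
}
```

In Scala the analogous guard would be to wrap the nullable value with 
_Option(...)_ and map over it, rather than passing it straight into the 
expansion call inside _Some(...)_.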


  
As for the solution, IMO we should either:
# Create a rule that inherits from _CoreRules.ProjectCorrelateTransposeRule_ 
and overrides its _onMatch_ method. We should gracefully handle the downstream 
tree of operators when pushing projections down to the LogicalCorrelate.
# Alternatively, we can use _CoreRules.ProjectCorrelateTransposeRule_ and our 
own rule to match:


{code:java}
+- LogicalCorrelate
   :- LogicalProject
{code}

We cannot force matching LogicalTableFunctionScan or LogicalTableScan because 
dangling references can be anywhere in the query plan. We need to 1) find 
all RexCall fields of LogicalTableFunctionScan, 2) check if they exist after 
projection pushdown, 3) if not, find which [new] project expressions they 
correspond to, and 4) rewire them. This potentially requires rewriting 
expressions throughout the whole query plan, down to the leaf node. 
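
Steps 2) to 4) can be sketched on plain field indexes, without any Calcite 
types. This is only a toy model under my own assumptions: a pushed-down 
project is represented as the list of old input indexes it keeps, and 
_rewire_ is a hypothetical helper, not an existing API. A real rule would 
have to do the same on RexNode trees and repeat it for every operator below.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy illustration only; works on field indexes, not on RexNode trees. */
public class ReferenceRewiring {

    /**
     * @param projectedOldIndexes old input field index kept at each new output position
     * @param staleRefs field indexes used by an expression, still in the old numbering
     * @return the same references expressed in the new numbering
     */
    public static List<Integer> rewire(
            List<Integer> projectedOldIndexes, List<Integer> staleRefs) {
        // Step 3: map each old field index to its position in the new project.
        Map<Integer, Integer> oldToNew = new HashMap<>();
        for (int newIdx = 0; newIdx < projectedOldIndexes.size(); newIdx++) {
            oldToNew.putIfAbsent(projectedOldIndexes.get(newIdx), newIdx);
        }
        List<Integer> rewired = new ArrayList<>();
        for (int oldIdx : staleRefs) {
            // Step 2: check that the referenced field survived the pushdown.
            Integer newIdx = oldToNew.get(oldIdx);
            if (newIdx == null) {
                throw new IllegalStateException(
                        "Field $" + oldIdx + " is not in the new projection");
            }
            // Step 4: rewire the reference to the new position.
            rewired.add(newIdx);
        }
        return rewired;
    }

    public static void main(String[] args) {
        // dept_nested fields: deptno($0), name($1), skillrecord($2), employees($3).
        // Assume the pushed-down project keeps only deptno and employees.
        List<Integer> projected = Arrays.asList(0, 3);
        // The UNNEST call still refers to employees as $3 in the old numbering.
        System.out.println(rewire(projected, Arrays.asList(3)));
    }
}
```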

Also, we do not need to merge LogicalProject and LogicalTableScan as part of 
this rule, since other rules will already do it. 

What do you guys think?

[1] 
https://github.com/apache/calcite/blob/c83ac69111fd9e75af5e3615af29a72284667a4a/core/src/main/java/org/apache/calcite/rel/rules/ProjectCorrelateTransposeRule.java#L126

> Support projection pushdown to table source for column projections through 
> UDTF
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-32940
>                 URL: https://issues.apache.org/jira/browse/FLINK-32940
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Venkata krishnan Sowrirajan
>            Priority: Major
>
> Currently, Flink doesn't push down columns projected through UDTF like 
> _UNNEST_ to the table source.
> For example:
> {code:java}
> SELECT t1.deptno, t2.ename FROM db.dept_nested t1, UNNEST(t1.employees) AS 
> t2{code}
> For the above SQL, Flink projects all the columns for DEPT_NESTED rather than 
> only _name_ and {_}employees{_}. If the table source supports nested fields 
> column projection, ideally it should project only _t1.employees.ename_ from 
> the table source.
> Query plan:
> {code:java}
> == Abstract Syntax Tree ==
> LogicalProject(deptno=[$0], ename=[$5])
> +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], 
> requiredColumns=[{3}])
>    :- LogicalTableScan(table=[[hive_catalog, db, dept_nested]])
>    +- Uncollect
>       +- LogicalProject(employees=[$cor1.employees])
>          +- LogicalValues(tuples=[[{ 0 }]]){code}
> {code:java}
> == Optimized Physical Plan ==
> Calc(select=[deptno, ename])
> +- Correlate(invocation=[$UNNEST_ROWS$1($cor1.employees)], 
> correlate=[table($UNNEST_ROWS$1($cor1.employees))], 
> select=[deptno,name,skillrecord,employees,empno,ename,skills], 
> rowType=[RecordType(BIGINT deptno, VARCHAR(2147483647) name, 
> RecordType:peek_no_expand(VARCHAR(2147483647) skilltype, VARCHAR(2147483647) 
> desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) 
> others) skillrecord, RecordType:peek_no_expand(BIGINT empno, 
> VARCHAR(2147483647) ename, RecordType:peek_no_expand(VARCHAR(2147483647) 
> type, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) 
> a, VARCHAR(2147483647) b) others) ARRAY skills) ARRAY employees, BIGINT 
> empno, VARCHAR(2147483647) ename, 
> RecordType:peek_no_expand(VARCHAR(2147483647) type, VARCHAR(2147483647) desc, 
> RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) 
> others) ARRAY skills)], joinType=[INNER])
>    +- TableSourceScan(table=[[hive_catalog, db, dept_nested]], 
> fields=[deptno, name, skillrecord, employees]){code}
> {code:java}
> == Optimized Execution Plan ==
> Calc(select=[deptno, ename])
> +- Correlate(invocation=[$UNNEST_ROWS$1($cor1.employees)], 
> correlate=[table($UNNEST_ROWS$1($cor1.employees))], 
> select=[deptno,name,skillrecord,employees,empno,ename,skills], 
> rowType=[RecordType(BIGINT deptno, VARCHAR(2147483647) name, 
> RecordType:peek_no_expand(VARCHAR(2147483647) skilltype, VARCHAR(2147483647) 
> desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) 
> others) skillrecord, RecordType:peek_no_expand(BIGINT empno, 
> VARCHAR(2147483647) ename, RecordType:peek_no_expand(VARCHAR(2147483647) 
> type, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) 
> a, VARCHAR(2147483647) b) others) ARRAY skills) ARRAY employees, BIGINT 
> empno, VARCHAR(2147483647) ename, 
> RecordType:peek_no_expand(VARCHAR(2147483647) type, VARCHAR(2147483647) desc, 
> RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) 
> others) ARRAY skills)], joinType=[INNER])
>    +- TableSourceScan(table=[[hive_catalog, db, dept_nested]], 
> fields=[deptno, name, skillrecord, employees]) {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
