[ https://issues.apache.org/jira/browse/FLINK-32940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17778478#comment-17778478 ]
Jeyhun Karimov commented on FLINK-32940:
----------------------------------------

Hi [~vsowrirajan] [~337361...@qq.com] [~lsy] [~jark], throwing in my two cents here:

Adding _CoreRules.ProjectCorrelateTransposeRule_ is not enough to solve the problem, for several reasons:
* Calcite will add two projections (one for the left and one for the right input) [1]. Sometimes one of these projections can be a no-op (e.g., one without expressions). This causes a null reference error in _BatchPhysicalCorrelateRule.scala: 67_ (_Some(calc.getProgram.expandLocalRef(calc.getProgram.getCondition))_). That is probably why you get this error.
* However, solving the above issue is probably not enough to get this rule working, mainly because of how _CoreRules.ProjectCorrelateTransposeRule_ works. Basically, this rule pushes projects down without further handling/correcting the references (e.g., LogicalTableFunctionScan keeps a stale function invocation expression - getCall()). As a result, LogicalTableFunctionScan tries to access a field that has already been projected away by the Calcite rule (there is now a LogicalProject operator on top).
* The above issue gets even more complicated when there are more operators (e.g., filters and projections) that have dangling references after the Calcite rule is applied, or when many nested fields are accessed (which results in LogicalCorrelate operators nested inside each other).

As for the solution, IMO we should either:
# Create a rule that inherits from _CoreRules.ProjectCorrelateTransposeRule_ and overrides its _onMatch_ method. When pushing projections down to the LogicalCorrelate, it should gracefully handle the downstream tree of operators.
# Alternatively, use _CoreRules.ProjectCorrelateTransposeRule_ together with our own rule that matches:
{code:java}
+- LogicalCorrelate
   :- LogicalProject
{code}

We cannot force matching only LogicalTableFunctionScan or LogicalTableScan, because dangling references can be anywhere in the query plan.
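To make the stale-reference problem concrete, here is a minimal, Calcite-free sketch (Java 11+) of the rewiring step, using the issue's own table. It assumes field references are plain input indices (like $3) and that a pushed-down projection is just the ordered list of input fields it keeps. _ProjectionRewire_, _buildMapping_ and _rewire_ are hypothetical names for illustration only; a real rule would have to walk the actual RexNode trees (for example with a Calcite RexShuttle) and also deal with nested field accesses.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/**
 * Hypothetical, Calcite-free model of rewiring field references after a
 * projection has been pushed below a correlate. References are plain input
 * indices ($0, $1, ...), not real RexNode trees.
 */
public final class ProjectionRewire {

    /**
     * Old-index -> new-index mapping for a projection that keeps only
     * keptFields (in that order). Fields that were projected away map to -1.
     */
    public static int[] buildMapping(int inputFieldCount, int[] keptFields) {
        int[] mapping = new int[inputFieldCount];
        Arrays.fill(mapping, -1);
        for (int newIdx = 0; newIdx < keptFields.length; newIdx++) {
            mapping[keptFields[newIdx]] = newIdx;
        }
        return mapping;
    }

    /**
     * Rewires a reference that was valid against the original input.
     * Returns empty when the field no longer exists after the pushdown,
     * i.e. the reference would be dangling.
     */
    public static Optional<Integer> rewire(int oldRef, int[] mapping) {
        if (oldRef < 0 || oldRef >= mapping.length || mapping[oldRef] < 0) {
            return Optional.empty();
        }
        return Optional.of(mapping[oldRef]);
    }

    public static void main(String[] args) {
        // Input fields of dept_nested, as listed by TableSourceScan in the issue.
        List<String> fields = List.of("deptno", "name", "skillrecord", "employees");
        // The query only needs deptno ($0) and employees ($3, used by UNNEST).
        int[] mapping = buildMapping(fields.size(), new int[] {0, 3});

        // The correlated employees field was $3 before the pushdown; now it is $1.
        System.out.println("employees: $3 -> $" + rewire(3, mapping).orElseThrow());
        // A leftover reference to name ($1) has nothing to point at any more.
        System.out.println("name: $1 dangling? " + rewire(1, mapping).isEmpty());
    }
}
```

Running _main_ prints "employees: $3 -> $1" and "name: $1 dangling? true". The second case is the one a rule must handle explicitly: either find the new project expression that replaces the field, or not fire at all.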
We need to 1) find all RexCall fields of LogicalTableFunctionScan, 2) check whether they still exist after the projection pushdown, 3) if not, find which [new] project expressions they correspond to, and 4) rewire them. This potentially requires rewriting expressions throughout the whole query plan, down to the leaf nodes. Also, we do not need to merge LogicalProject and LogicalTableScan as part of this rule, since other rules already do that.

What do you guys think?

[1] https://github.com/apache/calcite/blob/c83ac69111fd9e75af5e3615af29a72284667a4a/core/src/main/java/org/apache/calcite/rel/rules/ProjectCorrelateTransposeRule.java#L126

> Support projection pushdown to table source for column projections through UDTF
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-32940
>                 URL: https://issues.apache.org/jira/browse/FLINK-32940
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Venkata krishnan Sowrirajan
>            Priority: Major
>
> Currently, Flink doesn't push down columns projected through a UDTF like
> _UNNEST_ to the table source.
> For example:
> {code:java}
> SELECT t1.deptno, t2.ename FROM db.dept_nested t1, UNNEST(t1.employees) AS t2
> {code}
> For the above SQL, Flink projects all the columns of DEPT_NESTED rather than
> only _deptno_ and _employees_. If the table source supports nested-field
> column projection, ideally it should project only _t1.employees.ename_ from
> the table source.
> Query plan:
> {code:java}
> == Abstract Syntax Tree ==
> LogicalProject(deptno=[$0], ename=[$5])
> +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{3}])
>    :- LogicalTableScan(table=[[hive_catalog, db, dept_nested]])
>    +- Uncollect
>       +- LogicalProject(employees=[$cor1.employees])
>          +- LogicalValues(tuples=[[{ 0 }]])
> {code}
> {code:java}
> == Optimized Physical Plan ==
> Calc(select=[deptno, ename])
> +- Correlate(invocation=[$UNNEST_ROWS$1($cor1.employees)], correlate=[table($UNNEST_ROWS$1($cor1.employees))], select=[deptno,name,skillrecord,employees,empno,ename,skills], rowType=[RecordType(BIGINT deptno, VARCHAR(2147483647) name, RecordType:peek_no_expand(VARCHAR(2147483647) skilltype, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) others) skillrecord, RecordType:peek_no_expand(BIGINT empno, VARCHAR(2147483647) ename, RecordType:peek_no_expand(VARCHAR(2147483647) type, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) others) ARRAY skills) ARRAY employees, BIGINT empno, VARCHAR(2147483647) ename, RecordType:peek_no_expand(VARCHAR(2147483647) type, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) others) ARRAY skills)], joinType=[INNER])
>    +- TableSourceScan(table=[[hive_catalog, db, dept_nested]], fields=[deptno, name, skillrecord, employees])
> {code}
> {code:java}
> == Optimized Execution Plan ==
> Calc(select=[deptno, ename])
> +- Correlate(invocation=[$UNNEST_ROWS$1($cor1.employees)], correlate=[table($UNNEST_ROWS$1($cor1.employees))], select=[deptno,name,skillrecord,employees,empno,ename,skills], rowType=[RecordType(BIGINT deptno, VARCHAR(2147483647) name, RecordType:peek_no_expand(VARCHAR(2147483647) skilltype, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) others) skillrecord, RecordType:peek_no_expand(BIGINT empno, VARCHAR(2147483647) ename, RecordType:peek_no_expand(VARCHAR(2147483647) type, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) others) ARRAY skills) ARRAY employees, BIGINT empno, VARCHAR(2147483647) ename, RecordType:peek_no_expand(VARCHAR(2147483647) type, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) others) ARRAY skills)], joinType=[INNER])
>    +- TableSourceScan(table=[[hive_catalog, db, dept_nested]], fields=[deptno, name, skillrecord, employees])
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)