[ 
https://issues.apache.org/jira/browse/FLINK-32940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17808343#comment-17808343
 ] 

Venkata krishnan Sowrirajan commented on FLINK-32940:
-----------------------------------------------------

[~jeyhun] Thanks for the detailed response. Sorry for the delayed response.

Yes, you're correct that _CoreRules.ProjectCorrelateTransposeRule_ is not 
enough and I results in the above error you mentioned and even after resolving 
it by other means it still won't work.


I was actually taking the approach of extending the 
_CoreRules.ProjectCorrelateTransposeRule_ and overriding the _onMatch_ method. 
The issue I was facing there was Calcite doesn't support expressing nested 
fields on a collection type (Map or Array) for eg: arr.a.b where arr is an 
Array type. Nested fields are typically represented through _RexFieldAccess_ 
but it is not supported on a collection type (Array) holding a bunch of 
{_}structs{_}. I was trying to extend the _RexFieldAccess_ internally with in 
Flink that handles the collection case. The new _RexFieldAccess_ (say 
{_}FlinkRexFieldAccess{_}) would be used in the new rule to push the nested 
projections on the collection down.

 

Additionally, it also requires the above adjustments to be done to make sure 
there are no dangling references once the plan is rewritten. Let me know your 
thoughts. Thanks!

 

I got pulled in to other important things for the time being and not able to 
return to this problem. Hopefully, I will get some spare cycles in the coming 
weeks to address this issue. 

> Support projection pushdown to table source for column projections through 
> UDTF
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-32940
>                 URL: https://issues.apache.org/jira/browse/FLINK-32940
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Venkata krishnan Sowrirajan
>            Priority: Major
>
> Currently, Flink doesn't push down columns projected through UDTF like 
> _UNNEST_ to the table source.
> For eg:
> {code:java}
> SELECT t1.deptno, t2.ename FROM db.dept_nested t1, UNNEST(t1.employees) AS 
> t2{code}
> For the above SQL, Flink projects all the columns for DEPT_NESTED rather than 
> only _name_ and {_}employees{_}. If the table source supports nested fields 
> column projection, ideally it should project only _t1.employees.ename_ from 
> the table source.
> Query plan:
> {code:java}
> == Abstract Syntax Tree ==
> LogicalProject(deptno=[$0], ename=[$5])
> +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], 
> requiredColumns=[{3}])
>    :- LogicalTableScan(table=[[hive_catalog, db, dept_nested]])
>    +- Uncollect
>       +- LogicalProject(employees=[$cor1.employees])
>          +- LogicalValues(tuples=[[{ 0 }]]){code}
> {code:java}
> == Optimized Physical Plan ==
> Calc(select=[deptno, ename])
> +- Correlate(invocation=[$UNNEST_ROWS$1($cor1.employees)], 
> correlate=[table($UNNEST_ROWS$1($cor1.employees))], 
> select=[deptno,name,skillrecord,employees,empno,ename,skills], 
> rowType=[RecordType(BIGINT deptno, VARCHAR(2147483647) name, 
> RecordType:peek_no_expand(VARCHAR(2147483647) skilltype, VARCHAR(2147483647) 
> desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) 
> others) skillrecord, RecordType:peek_no_expand(BIGINT empno, 
> VARCHAR(2147483647) ename, RecordType:peek_no_expand(VARCHAR(2147483647) 
> type, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) 
> a, VARCHAR(2147483647) b) others) ARRAY skills) ARRAY employees, BIGINT 
> empno, VARCHAR(2147483647) ename, 
> RecordType:peek_no_expand(VARCHAR(2147483647) type, VARCHAR(2147483647) desc, 
> RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) 
> others) ARRAY skills)], joinType=[INNER])
>    +- TableSourceScan(table=[[hive_catalog, db, dept_nested]], 
> fields=[deptno, name, skillrecord, employees]){code}
> {code:java}
> == Optimized Execution Plan ==
> Calc(select=[deptno, ename])
> +- Correlate(invocation=[$UNNEST_ROWS$1($cor1.employees)], 
> correlate=[table($UNNEST_ROWS$1($cor1.employees))], 
> select=[deptno,name,skillrecord,employees,empno,ename,skills], 
> rowType=[RecordType(BIGINT deptno, VARCHAR(2147483647) name, 
> RecordType:peek_no_expand(VARCHAR(2147483647) skilltype, VARCHAR(2147483647) 
> desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) 
> others) skillrecord, RecordType:peek_no_expand(BIGINT empno, 
> VARCHAR(2147483647) ename, RecordType:peek_no_expand(VARCHAR(2147483647) 
> type, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) 
> a, VARCHAR(2147483647) b) others) ARRAY skills) ARRAY employees, BIGINT 
> empno, VARCHAR(2147483647) ename, 
> RecordType:peek_no_expand(VARCHAR(2147483647) type, VARCHAR(2147483647) desc, 
> RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) 
> others) ARRAY skills)], joinType=[INNER])
>    +- TableSourceScan(table=[[hive_catalog, db, dept_nested]], 
> fields=[deptno, name, skillrecord, employees]) {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to