[jira] [Commented] (FLINK-32940) Support projection pushdown to table source for column projections through UDTF

2024-01-18 Thread Venkata krishnan Sowrirajan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17808343#comment-17808343
 ] 

Venkata krishnan Sowrirajan commented on FLINK-32940:
-

[~jeyhun] Thanks for the detailed response. Sorry for the delayed response.

Yes, you're correct that _CoreRules.ProjectCorrelateTransposeRule_ is not 
enough; it results in the error you mentioned above, and even after resolving 
it by other means it still won't work.


I was actually taking the approach of extending 
_CoreRules.ProjectCorrelateTransposeRule_ and overriding the _onMatch_ method. 
The issue I faced there was that Calcite doesn't support expressing nested 
fields on a collection type (Map or Array), e.g. arr.a.b where arr is an 
Array type. Nested fields are typically represented through _RexFieldAccess_, 
but it is not supported on a collection type (Array) holding 
{_}structs{_}. I was trying to extend _RexFieldAccess_ internally within 
Flink to handle the collection case. The new _RexFieldAccess_ (say 
{_}FlinkRexFieldAccess{_}) would be used in the new rule to push the nested 
projections on the collection down.
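As a toy illustration of the idea (hypothetical names; this is not Flink's or Calcite's actual API), a collection-aware field access would apply the access to each element of the array rather than to the array value itself:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a "FlinkRexFieldAccess" that supports stepping
// through an array of structs: the access is applied per element, which is
// what an expression like employees.ename over ARRAY<ROW<...>> would need.
public class NestedArrayAccess {

    /** Extracts field {@code fieldName} from every struct in the array. */
    public static List<Object> accessField(List<Map<String, Object>> arrayOfStructs,
                                           String fieldName) {
        List<Object> result = new ArrayList<>();
        for (Map<String, Object> struct : arrayOfStructs) {
            result.add(struct.get(fieldName));
        }
        return result;
    }

    public static void main(String[] args) {
        // employees: conceptually ARRAY<ROW<empno BIGINT, ename STRING>>
        List<Map<String, Object>> employees = List.of(
                Map.<String, Object>of("empno", 1L, "ename", "alice"),
                Map.<String, Object>of("empno", 2L, "ename", "bob"));
        // Conceptually evaluates "employees.ename" in one step
        System.out.println(accessField(employees, "ename")); // prints [alice, bob]
    }
}
```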

 

Additionally, this also requires the adjustments described above to ensure 
there are no dangling references once the plan is rewritten. Let me know your 
thoughts. Thanks!

 

I got pulled into other important things for the time being and am not able to 
return to this problem. Hopefully I will get some spare cycles in the coming 
weeks to address this issue.

> Support projection pushdown to table source for column projections through 
> UDTF
> ---
>
> Key: FLINK-32940
> URL: https://issues.apache.org/jira/browse/FLINK-32940
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Venkata krishnan Sowrirajan
>Priority: Major
>
> Currently, Flink doesn't push down columns projected through UDTF like 
> _UNNEST_ to the table source.
> For eg:
> {code:java}
> SELECT t1.deptno, t2.ename FROM db.dept_nested t1, UNNEST(t1.employees) AS 
> t2{code}
> For the above SQL, Flink projects all the columns for DEPT_NESTED rather than 
> only _name_ and {_}employees{_}. If the table source supports nested fields 
> column projection, ideally it should project only _t1.employees.ename_ from 
> the table source.
> Query plan:
> {code:java}
> == Abstract Syntax Tree ==
> LogicalProject(deptno=[$0], ename=[$5])
> +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], 
> requiredColumns=[{3}])
>    :- LogicalTableScan(table=[[hive_catalog, db, dept_nested]])
>    +- Uncollect
>       +- LogicalProject(employees=[$cor1.employees])
>          +- LogicalValues(tuples=[[{ 0 }]]){code}
> {code:java}
> == Optimized Physical Plan ==
> Calc(select=[deptno, ename])
> +- Correlate(invocation=[$UNNEST_ROWS$1($cor1.employees)], 
> correlate=[table($UNNEST_ROWS$1($cor1.employees))], 
> select=[deptno,name,skillrecord,employees,empno,ename,skills], 
> rowType=[RecordType(BIGINT deptno, VARCHAR(2147483647) name, 
> RecordType:peek_no_expand(VARCHAR(2147483647) skilltype, VARCHAR(2147483647) 
> desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) 
> others) skillrecord, RecordType:peek_no_expand(BIGINT empno, 
> VARCHAR(2147483647) ename, RecordType:peek_no_expand(VARCHAR(2147483647) 
> type, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) 
> a, VARCHAR(2147483647) b) others) ARRAY skills) ARRAY employees, BIGINT 
> empno, VARCHAR(2147483647) ename, 
> RecordType:peek_no_expand(VARCHAR(2147483647) type, VARCHAR(2147483647) desc, 
> RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) 
> others) ARRAY skills)], joinType=[INNER])
>    +- TableSourceScan(table=[[hive_catalog, db, dept_nested]], 
> fields=[deptno, name, skillrecord, employees]){code}
> {code:java}
> == Optimized Execution Plan ==
> Calc(select=[deptno, ename])
> +- Correlate(invocation=[$UNNEST_ROWS$1($cor1.employees)], 
> correlate=[table($UNNEST_ROWS$1($cor1.employees))], 
> select=[deptno,name,skillrecord,employees,empno,ename,skills], 
> rowType=[RecordType(BIGINT deptno, VARCHAR(2147483647) name, 
> RecordType:peek_no_expand(VARCHAR(2147483647) skilltype, VARCHAR(2147483647) 
> desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) 
> others) skillrecord, RecordType:peek_no_expand(BIGINT empno, 
> VARCHAR(2147483647) ename, RecordType:peek_no_expand(VARCHAR(2147483647) 
> type, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) 
> a, VARCHAR(2147483647) b) others) ARRAY skills) ARRAY employees, BIGINT 
> empno, VARCHAR(2147483647) ename, 
> RecordType:peek_no_expand(VARCHAR(2147483647) type, VARCHAR(2147483647) desc, 
> RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) 
> others) ARRAY skills)], joinType=[INNER])
>    +- TableSourceScan(table=[[hive_catalog, db, dept_nested]], 
> fields=[deptno, name, skillrecord, employees]) {code}

[jira] [Commented] (FLINK-32940) Support projection pushdown to table source for column projections through UDTF

2023-10-22 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17778478#comment-17778478
 ] 

Jeyhun Karimov commented on FLINK-32940:


Hi [~vsowrirajan] [~337361...@qq.com] [~lsy] [~jark], throwing my two cents 
here:

Adding _CoreRules.ProjectCorrelateTransposeRule_ is not enough to solve the 
problem, for several reasons:

* Calcite will add two projections (one for the left and one for the right 
input) [1]. Sometimes these projections can be no-ops (e.g., without 
expressions). This causes a null reference error at 
_BatchPhysicalCorrelateRule.scala:67_ 
(_Some(calc.getProgram.expandLocalRef(calc.getProgram.getCondition))_), which is 
probably why you get this error. 
* However, solving the above issue is probably not enough to get this rule 
working, mainly because of how _CoreRules.ProjectCorrelateTransposeRule_ works. 
Basically, this rule pushes down projects without further handling/correcting 
the references (e.g., LogicalTableFunctionScan will have a stale function 
invocation expression - getCall()). As a result, LogicalTableFunctionScan will 
try to access a field that has already been projected away by the Calcite 
rule (there are LogicalProject operator(s) on top). 
* The above issue gets even more complicated when there are more operators 
(e.g., filters and projections) that have dangling references after the Calcite 
rule is applied, or when many nested fields are accessed (this results in 
LogicalCorrelate operators nested inside each other)
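For what it's worth, the null reference in the first bullet comes from unconditionally expanding a condition that a no-op projection never sets. The defensive pattern, sketched here in plain Java with stand-in types rather than the actual RexProgram API, is simply to guard the nullable accessor:

```java
import java.util.Optional;

public class ConditionGuard {

    // Stand-in for RexProgram#getCondition, which returns null when the
    // projection produced by ProjectCorrelateTransposeRule carries no filter.
    static String getCondition(boolean hasFilter) {
        return hasFilter ? "$0 > 10" : null;
    }

    /** Only expand the condition when one is actually present. */
    static Optional<String> expandCondition(boolean hasFilter) {
        return Optional.ofNullable(getCondition(hasFilter))
                .map(c -> "expanded(" + c + ")");
    }

    public static void main(String[] args) {
        System.out.println(expandCondition(true));  // prints Optional[expanded($0 > 10)]
        System.out.println(expandCondition(false)); // prints Optional.empty
    }
}
```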


  
Regarding the solution, IMO we should either:
# Create a rule that inherits from _CoreRules.ProjectCorrelateTransposeRule_ 
and overrides its _onMatch_ method. We should gracefully handle the downstream 
tree of operators when pushing projections down to the LogicalCorrelate.
# Alternatively, use _CoreRules.ProjectCorrelateTransposeRule_ together with our 
own rule to match


{code:java}
+- LogicalCorrelate
   :- LogicalProject
{code}

We cannot force matching LogicalTableFunctionScan or LogicalTableScan because 
dangling references can be anywhere in the query plan. We need to 1) find 
all RexCall fields of LogicalTableFunctionScan, 2) check whether they still 
exist after projection pushdown, 3) if not, find to which [new] project 
expressions they correspond, and 4) rewire them. This potentially requires 
rewriting expressions throughout the whole query plan down to the leaf node. 
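The rewiring described in steps 1-4 is essentially an index translation through the newly inserted projection. A toy sketch under simplifying assumptions (plain integer indices instead of Calcite's RexInputRef/RexShuttle machinery):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RefRewiring {

    /**
     * projectedOldIndices maps new position -> old input index of the
     * inserted projection. Builds the inverse map and rewires a list of
     * stale references; throws if a referenced field was projected away
     * (i.e., the existence check in step 2 fails).
     */
    public static List<Integer> rewire(List<Integer> projectedOldIndices,
                                       List<Integer> staleRefs) {
        Map<Integer, Integer> oldToNew = new HashMap<>();
        for (int newIdx = 0; newIdx < projectedOldIndices.size(); newIdx++) {
            oldToNew.put(projectedOldIndices.get(newIdx), newIdx);
        }
        List<Integer> rewired = new ArrayList<>();
        for (int oldRef : staleRefs) {
            Integer newRef = oldToNew.get(oldRef);
            if (newRef == null) {
                throw new IllegalStateException("field $" + oldRef + " was projected away");
            }
            rewired.add(newRef);
        }
        return rewired;
    }

    public static void main(String[] args) {
        // The projection keeps old fields 0 and 3; a stale reference to $3
        // becomes $1, and $0 stays $0.
        System.out.println(rewire(List.of(0, 3), List.of(3, 0))); // prints [1, 0]
    }
}
```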

Also, we do not need to merge LogicalProject and LogicalTableScan as part of 
this rule, since other rules will already do that. 

What do you guys think?

[1] 
https://github.com/apache/calcite/blob/c83ac69111fd9e75af5e3615af29a72284667a4a/core/src/main/java/org/apache/calcite/rel/rules/ProjectCorrelateTransposeRule.java#L126


[jira] [Commented] (FLINK-32940) Support projection pushdown to table source for column projections through UDTF

2023-08-27 Thread Venkata krishnan Sowrirajan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17759447#comment-17759447
 ] 

Venkata krishnan Sowrirajan commented on FLINK-32940:
-

Thanks [~337361...@qq.com] and [~lsy] . [~337361...@qq.com] , it makes sense to 
me.

To summarize:
 # Apply _CoreRules.ProjectCorrelateTransposeRule_ in {_}FlinkBatchRuleSets{_}. 
But this only pushes the projects that are referenced by the children of 
_Correlate_, and not the other projects that need to be pushed to the TableScan.
 # Introduce another rule that pushes the projects to the TableScan, but only if 
the _Correlate_ is on a known UDTF like {_}UNNEST{_}. Otherwise, for 
user-defined UDTFs, we won't know for sure whether it is safe to push to the 
_TableScan_ or not. What do you think? I'm still thinking about how to write 
this rule so that it pushes all the projects to the TableScan operator.
 # Merge the _LogicalProject_ and _LogicalTableFunctionScan_ into a new 
LogicalTableFunctionScan.
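The safety check in step 2 could start as a simple whitelist of functions whose column usage is fully understood. This is only a sketch with an assumed internal function name; real detection would need to inspect the Correlate's function call:

```java
import java.util.Set;

public class PushdownSafety {

    // UDTFs whose field usage is fully known, so pushing projections past
    // them is safe. "$UNNEST_ROWS$1" mirrors the invocation name seen in the
    // plans above; treat it as an assumption, not a stable Flink identifier.
    private static final Set<String> KNOWN_SAFE_UDTFS = Set.of("$UNNEST_ROWS$1");

    /** Returns true only for UDTFs on the whitelist. */
    public static boolean isSafeToPushPast(String udtfName) {
        return KNOWN_SAFE_UDTFS.contains(udtfName);
    }

    public static void main(String[] args) {
        System.out.println(isSafeToPushPast("$UNNEST_ROWS$1")); // prints true
        System.out.println(isSafeToPushPast("myCustomUdtf"));   // prints false
    }
}
```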




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32940) Support projection pushdown to table source for column projections through UDTF

2023-08-24 Thread Yunhong Zheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17758824#comment-17758824
 ] 

Yunhong Zheng commented on FLINK-32940:
---

Hi, [~vsowrirajan]. I think your idea is good, but the biggest problem 
currently is how to pass the column pruning information from 
LogicalTableFunctionScan to LogicalTableScan and rewrite the 
LogicalTableFunctionScan. So I think we need to add a rule in the 
project_rewrite stage. 
 # First, we need to add the rule CoreRules.ProjectCorrelateTransposeRule to 
FlinkBatchRuleSets to push the project into LogicalCorrelate.
 # Then we need to add a rule in the project_rewrite stage to push this project 
to the LogicalTableScan side and rewrite the LogicalTableFunctionScan.
 # For the NPE problem, you can add an if check to avoid it.

Adding an example to explain step 2. For this DDL:

 
{code:java}
String ddl =
"CREATE TABLE NestedItemTable1 (\n"
+ "  `deptno` INT,\n"
+ "  `employees` MAP<STRING, STRING>\n"
+ ") WITH (\n"
+ " 'connector' = 'values',\n"
+ " 'nested-projection-supported' = 'true',"
+ " 'bounded' = 'true'\n"
+ ")";
util.tableEnv().executeSql(ddl);

util.verifyRelPlan(
"SELECT t1.deptno, k FROM NestedItemTable1 t1, UNNEST(t1.employees) as 
f(k, v)");{code}
 

we will get the plan below after adding CoreRules.ProjectCorrelateTransposeRule:

 
{code:java}
optimize project_rewrite cost 413675 ms.
optimize result: 
LogicalProject(inputs=[0], exprs=[[$2]])
+- LogicalCorrelate(correlation=[$cor1], joinType=[inner], 
requiredColumns=[{1}])
   :- LogicalTableScan(table=[[default_catalog, default_database, 
NestedItemTable1]])
   +- LogicalProject(inputs=[0])
      +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.employees)], 
rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0, VARCHAR(2147483647) 
f1)]){code}
 

I think for this pattern, we need to add a new rule to match it:

 
{code:java}
+- LogicalCorrelate
   :- LogicalTableScan
   +- LogicalProject
      +- LogicalTableFunctionScan{code}
 

In this rule, we first need to create a new LogicalTableFunctionScan by merging 
the LogicalProject and the LogicalTableFunctionScan.

Second, we need to add a new LogicalProject on top of the LogicalTableScan, 
which will be pushed down to the LogicalTableScan in the logical stage.

IMO, the new plan after matching this rule will be (just an example, not a 
correct plan):

 
{code:java}
LogicalProject(inputs=[0], exprs=[[$2]])
+- LogicalCorrelate(correlation=[$cor1], joinType=[inner], 
requiredColumns=[{1}])
   :- LogicalProject(inputs=[employees.k])
   :  +- LogicalTableScan(table=[[default_catalog, default_database, 
NestedItemTable1]])
   +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.employees.k)], 
rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)]){code}
 

WDYT, [~vsowrirajan]? Once the solution is determined and you complete the 
development, you can ping me to review it.


[jira] [Commented] (FLINK-32940) Support projection pushdown to table source for column projections through UDTF

2023-08-24 Thread dalongliu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17758789#comment-17758789
 ] 

dalongliu commented on FLINK-32940:
---

[~vsowrirajan] Your attempt looks good to me.



[jira] [Commented] (FLINK-32940) Support projection pushdown to table source for column projections through UDTF

2023-08-23 Thread Venkata krishnan Sowrirajan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17758238#comment-17758238
 ] 

Venkata krishnan Sowrirajan commented on FLINK-32940:
-

[~jark] Added the SQL and query plan output. This is happening on the master 
branch of Flink.

My observations and attempt at fixing the issue:
 * Apply Calcite's {{CoreRules.ProjectCorrelateTransposeRule}} to push the 
project down to the {{Correlate}}'s inputs
 * Fix {{BatchPhysicalCorrelateRule}} to handle the new logical plan with 
{{projects}} pushed through {{Correlate}}

Adding CoreRules.ProjectCorrelateTransposeRule to FlinkBatchRuleSets fails 
with the exception below:
{code:java}
Exception in thread "main" java.lang.RuntimeException: Error while applying 
rule BatchPhysicalCorrelateRule(in:LOGICAL,out:BATCH_PHYSICAL), args 
[rel#372:FlinkLogicalCorrelate.LOGICAL.any.[](left=RelSubset#368,right=RelSubset#371,correlation=$cor2,joinType=inner,requiredColumns={1})]
at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:250)
at 
org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:59)
at 
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:523)
at 
org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:318)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:93)
at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:58)
at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1(BatchCommonSubGraphBasedOptimizer.scala:45)
at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1$adapted(BatchCommonSubGraphBasedOptimizer.scala:45)
at scala.collection.immutable.List.foreach(List.scala:388)
at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:324)
at 
org.apache.flink.table.planner.delegation.PlannerBase.getExplainGraphs(PlannerBase.scala:536)
at 
org.apache.flink.table.planner.delegation.BatchPlanner.explain(BatchPlanner.scala:115)
at 
org.apache.flink.table.planner.delegation.BatchPlanner.explain(BatchPlanner.scala:47)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:696)
at 
org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:482)
at org.apache.flink.table.api.Explainable.explain(Explainable.java:40)
at 
org.apache.flink.table.api.Explainable.printExplain(Explainable.java:57)
at 
com.linkedin.tracking3.flink.FlinkTest.runJoinUsingTableAPI(FlinkTest.java:129)
at com.linkedin.tracking3.flink.FlinkTest.main(FlinkTest.java:316)
Caused by: java.lang.NullPointerException
at org.apache.calcite.rex.RexProgram.expandLocalRef(RexProgram.java:549)
at 
org.apache.flink.table.planner.plan.rules.physical.batch.BatchPhysicalCorrelateRule.convertToCorrelate$1(BatchPhysicalCorrelateRule.scala:67)
at 
org.apache.flink.table.planner.plan.rules.physical.batch.BatchPhysicalCorrelateRule.convert(BatchPhysicalCorrelateRule.scala:80)
at 

[jira] [Commented] (FLINK-32940) Support projection pushdown to table source for column projections through UDTF

2023-08-23 Thread Yunhong Zheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17757996#comment-17757996
 ] 

Yunhong Zheng commented on FLINK-32940:
---

Sure, [~jark] . Please assign to me. Thanks!



[jira] [Commented] (FLINK-32940) Support projection pushdown to table source for column projections through UDTF

2023-08-23 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17757993#comment-17757993
 ] 

Jark Wu commented on FLINK-32940:
-

[~337361...@qq.com] do you have time to help take a look?



[jira] [Commented] (FLINK-32940) Support projection pushdown to table source for column projections through UDTF

2023-08-23 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17757992#comment-17757992
 ] 

Jark Wu commented on FLINK-32940:
-

Hi [~vsowrirajan], thank you for reporting this problem. Which Flink version 
are you using? 
