[jira] [Commented] (FLINK-32940) Support projection pushdown to table source for column projections through UDTF
[ https://issues.apache.org/jira/browse/FLINK-32940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17808343#comment-17808343 ]

Venkata krishnan Sowrirajan commented on FLINK-32940:

[~jeyhun] Thanks for the detailed response, and sorry for my delayed reply. Yes, you're correct that _CoreRules.ProjectCorrelateTransposeRule_ alone is not enough: it results in the error you mentioned, and even after resolving that by other means it still won't work.

I was actually taking the approach of extending _CoreRules.ProjectCorrelateTransposeRule_ and overriding the _onMatch_ method. The issue I faced there is that Calcite doesn't support expressing nested fields on a collection type (Map or Array), e.g. arr.a.b where arr is an Array type. Nested fields are typically represented through _RexFieldAccess_, but that is not supported on a collection type (Array) holding a bunch of {_}structs{_}. I was trying to extend _RexFieldAccess_ internally within Flink to handle the collection case. The new _RexFieldAccess_ (say {_}FlinkRexFieldAccess{_}) would be used in the new rule to push the nested projections on the collection down. Additionally, it also requires the adjustments described above to make sure there are no dangling references once the plan is rewritten. Let me know your thoughts. Thanks!

I got pulled into other important things for the time being and haven't been able to return to this problem. Hopefully I will get some spare cycles in the coming weeks to address this issue.

> Support projection pushdown to table source for column projections through
> UDTF
> ---
>
> Key: FLINK-32940
> URL: https://issues.apache.org/jira/browse/FLINK-32940
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Reporter: Venkata krishnan Sowrirajan
> Priority: Major
>
> Currently, Flink doesn't push down columns projected through a UDTF like
> _UNNEST_ to the table source.
> For example:
> {code:java}
> SELECT t1.deptno, t2.ename FROM db.dept_nested t1, UNNEST(t1.employees) AS t2{code}
> For the above SQL, Flink projects all the columns of DEPT_NESTED rather than
> only _name_ and {_}employees{_}. If the table source supports nested-field
> column projection, ideally it should project only _t1.employees.ename_ from
> the table source.
> Query plan:
> {code:java}
> == Abstract Syntax Tree ==
> LogicalProject(deptno=[$0], ename=[$5])
> +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{3}])
>    :- LogicalTableScan(table=[[hive_catalog, db, dept_nested]])
>    +- Uncollect
>       +- LogicalProject(employees=[$cor1.employees])
>          +- LogicalValues(tuples=[[{ 0 }]]){code}
> {code:java}
> == Optimized Physical Plan ==
> Calc(select=[deptno, ename])
> +- Correlate(invocation=[$UNNEST_ROWS$1($cor1.employees)], correlate=[table($UNNEST_ROWS$1($cor1.employees))], select=[deptno,name,skillrecord,employees,empno,ename,skills], rowType=[RecordType(BIGINT deptno, VARCHAR(2147483647) name, RecordType:peek_no_expand(VARCHAR(2147483647) skilltype, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) others) skillrecord, RecordType:peek_no_expand(BIGINT empno, VARCHAR(2147483647) ename, RecordType:peek_no_expand(VARCHAR(2147483647) type, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) others) ARRAY skills) ARRAY employees, BIGINT empno, VARCHAR(2147483647) ename, RecordType:peek_no_expand(VARCHAR(2147483647) type, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) others) ARRAY skills)], joinType=[INNER])
>    +- TableSourceScan(table=[[hive_catalog, db, dept_nested]], fields=[deptno, name, skillrecord, employees]){code}
> {code:java}
> == Optimized Execution Plan ==
> Calc(select=[deptno, ename])
> +- Correlate(invocation=[$UNNEST_ROWS$1($cor1.employees)], correlate=[table($UNNEST_ROWS$1($cor1.employees))], select=[deptno,name,skillrecord,employees,empno,ename,skills], rowType=[RecordType(BIGINT deptno, VARCHAR(2147483647) name, RecordType:peek_no_expand(VARCHAR(2147483647) skilltype, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) others) skillrecord, RecordType:peek_no_expand(BIGINT empno, VARCHAR(2147483647) ename, RecordType:peek_no_expand(VARCHAR(2147483647) type, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) others) ARRAY skills) ARRAY employees, BIGINT empno, VARCHAR(2147483647) ename, RecordType:peek_no_expand(VARCHAR(2147483647) type, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) others) ARRAY skills)], joinType=[INNER])
>    +- TableSourceScan(table=[[hive_catalog, db, dept_nested]], fields=[deptno, name, skillrecord, employees]){code}
[jira] [Commented] (FLINK-32940) Support projection pushdown to table source for column projections through UDTF
[ https://issues.apache.org/jira/browse/FLINK-32940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17778478#comment-17778478 ]

Jeyhun Karimov commented on FLINK-32940:

Hi [~vsowrirajan] [~337361...@qq.com] [~lsy] [~jark], throwing in my two cents here:

Adding _CoreRules.ProjectCorrelateTransposeRule_ is not enough to solve the problem, for several reasons:
* Calcite will add two projections (one for the left and one for the right input) [1]. Sometimes some of these projections can be no-ops (e.g., without expressions). This causes a null reference error in _BatchPhysicalCorrelateRule.scala:67_ (_Some(calc.getProgram.expandLocalRef(calc.getProgram.getCondition))_). That is probably why you get this error.
* However, solving the above issue is probably not enough to get this rule working, mainly because of how _CoreRules.ProjectCorrelateTransposeRule_ works. Basically, this rule pushes down projects without further handling/correcting the references (e.g., LogicalTableFunctionScan will have a stale function-invocation expression - getCall()). As a result, LogicalTableFunctionScan will try to access a field that has already been projected away by the Calcite rule (there are LogicalProject operator(s) on top).
* The above issue gets even more complicated when there are more operators (e.g., filters and projections) that have dangling references after the Calcite rule is applied, or when many nested fields are accessed (which results in LogicalCorrelate operators nested in each other).

Regarding a solution, IMO we should either:
# Create a rule that inherits from _CoreRules.ProjectCorrelateTransposeRule_ and overrides its _onMatch_ method. We should gracefully handle the downstream tree of operators when pushing projections down to the LogicalCorrelate.
# Alternatively, we can use _CoreRules.ProjectCorrelateTransposeRule_ together with our own rule that matches
{code:java}
+- LogicalCorrelate
   :- LogicalProject {code}
We cannot force matching LogicalTableFunctionScan or LogicalTableScan, because dangling references can be anywhere in the query plan. We need to 1) find all RexCall fields of LogicalTableFunctionScan, 2) check whether they still exist after the projection pushdown, 3) if not, find which [new] project expressions they correspond to, and 4) rewire them. This potentially requires rewriting expressions throughout the whole query plan down to the leaf node. Also, we do not need to merge LogicalProject and LogicalTableScan as part of this rule, since other rules will already do that. What do you guys think?

[1] https://github.com/apache/calcite/blob/c83ac69111fd9e75af5e3615af29a72284667a4a/core/src/main/java/org/apache/calcite/rel/rules/ProjectCorrelateTransposeRule.java#L126
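The reference rewiring in steps 1-4 above can be sketched with a toy model. This is NOT Calcite code: in the real rule the references are RexInputRef/RexFieldAccess nodes rewritten via a RexShuttle, and the class and method names below are made up for illustration; plain ints stand in for field indices to show only the bookkeeping.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of rewiring field references after a projection is pushed down. */
public class RefRewire {

    /**
     * refs: old input-field indices referenced by an operator (e.g. the
     * stale getCall() of LogicalTableFunctionScan).
     * keptOldIndices: the old indices the pushed-down LogicalProject keeps,
     * in output order.
     * Returns the rewired indices; a reference to a pruned field is exactly
     * the "dangling reference" case described in the comment above.
     */
    public static int[] rewire(int[] refs, int[] keptOldIndices) {
        // Step 3: map each surviving old index to its new position.
        Map<Integer, Integer> oldToNew = new HashMap<>();
        for (int newIdx = 0; newIdx < keptOldIndices.length; newIdx++) {
            oldToNew.put(keptOldIndices[newIdx], newIdx);
        }
        // Steps 2 and 4: check survival, then rewire.
        int[] rewired = new int[refs.length];
        for (int i = 0; i < refs.length; i++) {
            Integer newIdx = oldToNew.get(refs[i]);
            if (newIdx == null) {
                throw new IllegalStateException(
                        "Dangling reference to pruned field $" + refs[i]);
            }
            rewired[i] = newIdx;
        }
        return rewired;
    }
}
```

For example, if the pushed-down projection keeps old fields {0, 3}, a reference to $3 becomes $1, while a reference to the pruned $2 is a dangling reference and fails.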
[jira] [Commented] (FLINK-32940) Support projection pushdown to table source for column projections through UDTF
[ https://issues.apache.org/jira/browse/FLINK-32940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17759447#comment-17759447 ]

Venkata krishnan Sowrirajan commented on FLINK-32940:

Thanks [~337361...@qq.com] and [~lsy]. [~337361...@qq.com], that makes sense to me. To summarize:
# Apply _CoreRules.ProjectCorrelateTransposeRule_ in {_}FlinkBatchRuleSets{_}. But this only pushes the projects that are referenced by the children of _Correlate_, not the other projects that need to be pushed to the TableScan.
# Introduce another rule that pushes the projects to the TableScan, but only if the _Correlate_ is on a known UDTF like {_}UNNEST{_}. Otherwise, for user-defined UDTFs we won't know for sure whether it is safe to push to the _TableScan_ or not. What do you think? I'm still thinking about how to write this rule so that it pushes all the projects to the TableScan operator.
# Merge the _LogicalProject_ and _LogicalTableFunctionScan_ into LogicalTableFunctionScan.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32940) Support projection pushdown to table source for column projections through UDTF
[ https://issues.apache.org/jira/browse/FLINK-32940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17758824#comment-17758824 ]

Yunhong Zheng commented on FLINK-32940:

Hi, [~vsowrirajan]. I think your idea is good, but the biggest problem currently is how to pass the column-pruning information of LogicalTableFunctionScan to LogicalTableScan and rewrite LogicalTableFunctionScan. So I think we need to add a rule in the project_rewrite stage:
# Actually, first we need to add the rule CoreRules.ProjectCorrelateTransposeRule to FlinkBatchRuleSets to push the project into LogicalCorrelate.
# Then we need to add a rule in the project_rewrite stage to push this project to the LogicalTableScan side and rewrite LogicalTableFunctionScan.
# For the NPE problem, you can add an if check to avoid it.

Adding one example to explain step 2. For this DDL:
{code:java}
String ddl =
    "CREATE TABLE NestedItemTable1 (\n"
        + " `deptno` INT,\n"
        + " `employees` MAP\n"
        + ") WITH (\n"
        + " 'connector' = 'values',\n"
        + " 'nested-projection-supported' = 'true',"
        + " 'bounded' = 'true'\n"
        + ")";
util.tableEnv().executeSql(ddl);
util.verifyRelPlan(
    "SELECT t1.deptno, k FROM NestedItemTable1 t1, UNNEST(t1.employees) as f(k, v)");{code}
we will get the plan below after adding CoreRules.ProjectCorrelateTransposeRule:
{code:java}
optimize project_rewrite cost 413675 ms.
optimize result:
LogicalProject(inputs=[0], exprs=[[$2]])
+- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{1}])
   :- LogicalTableScan(table=[[default_catalog, default_database, NestedItemTable1]])
   +- LogicalProject(inputs=[0])
      +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.employees)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0, VARCHAR(2147483647) f1)]){code}
I think we need to add a new rule to match this pattern:
{code:java}
+- LogicalCorrelate
   :- LogicalTableScan
   +- LogicalProject
      +- LogicalTableFunctionScan{code}
In this rule, we first need to create a new LogicalTableFunctionScan by merging the LogicalProject and the LogicalTableFunctionScan. Second, we need to add a new LogicalProject on top of the LogicalTableScan, which will be pushed down into the LogicalTableScan in the logical stage. IMO, the new plan after this rule matches will be (just an example, not the correct plan):
{code:java}
LogicalProject(inputs=[0], exprs=[[$2]])
+- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{1}])
   :- LogicalProject(inputs=[employees.k])
   :  +- LogicalTableScan(table=[[default_catalog, default_database, NestedItemTable1]])
   +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.employees.k)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)]){code}
WDYT, [~vsowrirajan]? Once the solution is determined and you complete the development, you can ping me to review it.
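The rewrite above hinges on turning the nested accesses that survive the correlate (here deptno and employees.k) into a projection the scan can consume. A minimal, self-contained sketch of that bookkeeping follows; the class and method names are made up for illustration, and the real rule would work on RexNode trees and nested projection metadata, not strings.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

/** Illustrative only: derives, from nested access paths such as
 *  "employees.k", the top-level columns the new LogicalProject must
 *  request from the LogicalTableScan. */
public class NestedProjection {

    public static List<String> topLevelColumns(List<String> accessedPaths) {
        // LinkedHashSet keeps first-seen order and de-duplicates columns
        // reached through several nested paths.
        LinkedHashSet<String> columns = new LinkedHashSet<>();
        for (String path : accessedPaths) {
            columns.add(path.split("\\.")[0]);
        }
        return new ArrayList<>(columns);
    }
}
```

For the example query, the accessed paths {deptno, employees.k} yield the scan fields {deptno, employees}; a source that supports nested projection could then prune employees down to employees.k.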
[jira] [Commented] (FLINK-32940) Support projection pushdown to table source for column projections through UDTF
[ https://issues.apache.org/jira/browse/FLINK-32940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17758789#comment-17758789 ]

dalongliu commented on FLINK-32940:

[~vsowrirajan] Your attempt looks good to me.
[jira] [Commented] (FLINK-32940) Support projection pushdown to table source for column projections through UDTF
[ https://issues.apache.org/jira/browse/FLINK-32940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17758238#comment-17758238 ]

Venkata krishnan Sowrirajan commented on FLINK-32940:

[~jark] Added the SQL and the query-plan output. This is happening on the master branch of Flink.

My observations and attempt at fixing the issue:
* Apply Calcite's {{CoreRules.ProjectCorrelateTransposeRule}} to push the project down to the {{Correlate}}'s inputs
* Fix {{BatchPhysicalCorrelateRule}} to handle the new logical plan with {{projects}} pushed through the {{Correlate}}

After adding CoreRules.ProjectCorrelateTransposeRule to FlinkBatchRuleSets, it fails with the below exception:
{code:java}
Exception in thread "main" java.lang.RuntimeException: Error while applying rule BatchPhysicalCorrelateRule(in:LOGICAL,out:BATCH_PHYSICAL), args [rel#372:FlinkLogicalCorrelate.LOGICAL.any.[](left=RelSubset#368,right=RelSubset#371,correlation=$cor2,joinType=inner,requiredColumns={1})]
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:250)
    at org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:59)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:523)
    at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:318)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
    at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
    at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
    at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
    at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:93)
    at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:58)
    at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1(BatchCommonSubGraphBasedOptimizer.scala:45)
    at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1$adapted(BatchCommonSubGraphBasedOptimizer.scala:45)
    at scala.collection.immutable.List.foreach(List.scala:388)
    at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
    at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
    at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:324)
    at org.apache.flink.table.planner.delegation.PlannerBase.getExplainGraphs(PlannerBase.scala:536)
    at org.apache.flink.table.planner.delegation.BatchPlanner.explain(BatchPlanner.scala:115)
    at org.apache.flink.table.planner.delegation.BatchPlanner.explain(BatchPlanner.scala:47)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:696)
    at org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:482)
    at org.apache.flink.table.api.Explainable.explain(Explainable.java:40)
    at org.apache.flink.table.api.Explainable.printExplain(Explainable.java:57)
    at com.linkedin.tracking3.flink.FlinkTest.runJoinUsingTableAPI(FlinkTest.java:129)
    at com.linkedin.tracking3.flink.FlinkTest.main(FlinkTest.java:316)
Caused by: java.lang.NullPointerException
    at org.apache.calcite.rex.RexProgram.expandLocalRef(RexProgram.java:549)
    at org.apache.flink.table.planner.plan.rules.physical.batch.BatchPhysicalCorrelateRule.convertToCorrelate$1(BatchPhysicalCorrelateRule.scala:67)
    at org.apache.flink.table.planner.plan.rules.physical.batch.BatchPhysicalCorrelateRule.convert(BatchPhysicalCorrelateRule.scala:80){code}
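The NullPointerException above comes from _expandLocalRef_ being handed a null condition: the Calc that _ProjectCorrelateTransposeRule_ pushes in may carry a program with no condition at all. A minimal sketch of the guard pattern follows; it is a generic stand-in, not the actual fix, which would live in the Scala code of BatchPhysicalCorrelateRule and use Calcite's RexProgram API (paraphrased here).

```java
import java.util.function.Function;

/** Generic stand-in for the null-condition guard: only expand the
 *  condition when the program actually has one, instead of calling
 *  expandLocalRef(null) and hitting the NullPointerException. */
public class ConditionGuard {

    public static <T, R> R expandIfPresent(T condition, Function<T, R> expandLocalRef) {
        // A no-op projection pushed in by the transpose rule has no
        // condition, so expansion must be skipped rather than attempted.
        return condition == null ? null : expandLocalRef.apply(condition);
    }
}
```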
[jira] [Commented] (FLINK-32940) Support projection pushdown to table source for column projections through UDTF
[ https://issues.apache.org/jira/browse/FLINK-32940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17757996#comment-17757996 ]

Yunhong Zheng commented on FLINK-32940:

Sure, [~jark]. Please assign it to me. Thanks!
[jira] [Commented] (FLINK-32940) Support projection pushdown to table source for column projections through UDTF
[ https://issues.apache.org/jira/browse/FLINK-32940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17757993#comment-17757993 ]

Jark Wu commented on FLINK-32940:

[~337361...@qq.com] do you have time to help take a look?
[jira] [Commented] (FLINK-32940) Support projection pushdown to table source for column projections through UDTF
[ https://issues.apache.org/jira/browse/FLINK-32940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17757992#comment-17757992 ]

Jark Wu commented on FLINK-32940:

Hi [~vsowrirajan], thank you for reporting this problem. Which Flink version are you using?