[ https://issues.apache.org/jira/browse/FLINK-7942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16235700#comment-16235700 ]
Fabian Hueske commented on FLINK-7942: -------------------------------------- Julian's response on CALCITE-2023 suggests generating {{RexNode}} expressions without {{AS}} nodes. Maybe we can fix this in the Table API {{Expression}} -> {{RexNode}} translation. > NPE when apply FilterJoinRule > ----------------------------- > > Key: FLINK-7942 > URL: https://issues.apache.org/jira/browse/FLINK-7942 > Project: Flink > Issue Type: Bug > Components: Table API & SQL > Reporter: lincoln.lee > Assignee: lincoln.lee > Priority: Major > > Test case *testFilterRule1* fails due to a NPE > {code} > java.lang.RuntimeException: Error while applying rule > FilterJoinRule:FilterJoinRule:filter, args > [rel#148:LogicalFilter.NONE(input=rel#146:Subset#12.NONE,condition=>=(AS(org$apache$flink$table$api$batch$table$Merger$$773bb962ee701f47b08bc74058c46bb3($1, > > AS(org$apache$flink$table$api$batch$table$Merger$$773bb962ee701f47b08bc74058c46bb3($1, > $3), 'c0')), 'c1'), 0)), > rel#145:LogicalJoin.NONE(left=rel#143:Subset#10.NONE,right=rel#144:Subset#11.NONE,condition==($0, > $2),joinType=left)] > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:236) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650) > at > org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368) > at > org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:270) > at > org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:347) > at > org.apache.flink.table.utils.BatchTableTestUtil.verifyTable(TableTestBase.scala:186) > testFilterRule1(FilterRuleTest.scala:63) > Caused by: java.lang.NullPointerException > at org.apache.calcite.plan.Strong.isNull(Strong.java:110) > at org.apache.calcite.plan.Strong.anyNull(Strong.java:166) > at org.apache.calcite.plan.Strong.isNull(Strong.java:114) > at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:99) > at 
org.apache.calcite.plan.Strong.isNotTrue(Strong.java:84) > at org.apache.calcite.plan.RelOptUtil.simplifyJoin(RelOptUtil.java:2354) > at > org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:149) > at > org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:348) > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212) > {code} > but *testFilterRule2* works which has the same query written in SQL. > {code} > class FilterRuleTest extends TableTestBase { > @Test > def testFilterRule1(): Unit = { > val util = batchTestUtil() > val t1 = util.addTable[(String, Int, Int)]('a, 'b, 'c) > val t2 = util.addTable[(String, Int, Int)]('d, 'e, 'f) > val results = t1 > .leftOuterJoin(t2, 'b === 'e) > .select('c, Merger('c, 'f) as 'c0) > .select(Merger('c, 'c0) as 'c1) > .where('c1 >= 0) > val expected = unaryNode( > "DataSetCalc", > binaryNode( > "DataSetJoin", > unaryNode( > "DataSetCalc", > batchTableNode(0), > term("select", "b", "c") > ), > unaryNode( > "DataSetCalc", > batchTableNode(1), > term("select", "e", "f") > ), > term("where", "=(b, e)"), > term("join", "b", "c", "e", "f"), > term("joinType", "LeftOuterJoin") > ), > term("select", "Merger$(c, Merger$(c, f)) AS c1"), > term("where", ">=(Merger$(c, Merger$(c, f)), 0)") > ) > util.verifyTable(results, expected) > } > @Test > def testFilterRule2(): Unit = { > val util = batchTestUtil() > util.addTable[(String, Int, Int)]("T1", 'a, 'b, 'c) > util.addTable[(String, Int, Int)]("T2", 'd, 'e, 'f) > util.tableEnv.registerFunction("udf_test", Merger) > val sql = > s""" > |select c1 > |from ( > | select udf_test(c, c0) as c1 > | from ( > | select c, udf_test(b, c) as c0 > | from > | (select a, b, c > | from T1 > | left outer join T2 > | on T1.b = T2.e > | ) tmp > | ) tmp1 > |) tmp2 > |where c1 >= 0 > """.stripMargin > val results = util.tableEnv.sqlQuery(sql) > val expected = "DataSetCalc(select=[udf_test(c, udf_test(b, c)) AS c1]) > \n" + > 
"DataSetJoin(where=[=(b, e)], join=[b, c, e], > joinType=[LeftOuterJoin])\n" + > "DataSetCalc(select=[b, c], where=[>=(udf_test(c, udf_test(b, c)), > 0)])\n" + > "DataSetScan(table=[[_DataSetTable_0]])\n" + > "DataSetCalc(select=[e])\n" + > "DataSetScan(table=[[_DataSetTable_1]])" > util.verifyTable(results, expected) > } > } > object Merger extends ScalarFunction { > def eval(f0: Int, f1: Int): Int = { > f0 + f1 > } > } > {code} > A simple way to fix this is to change the Calcite class {code} > org.apache.calcite.plan.Strong{code} > and add an additional entry to the EnumMap in the createPolicyMap method: > {code}map.put(SqlKind.AS, Policy.AS_IS);{code} > Either copy the class into a Flink package and modify it, or use reflection somewhere. > I'm not sure whether there are other issues like this one, since not all the > types in SqlKind are included in Strong.MAP. > @[~fhueske] @[~twalthr] any ideas? -- This message was sent by Atlassian JIRA (v6.4.14#64029)