lincoln.lee created FLINK-7942: ---------------------------------- Summary: NPE when apply FilterJoinRule Key: FLINK-7942 URL: https://issues.apache.org/jira/browse/FLINK-7942 Project: Flink Issue Type: Bug Reporter: lincoln.lee Assignee: lincoln.lee
Test case *testFilterRule1* fails due to a NPE {code} java.lang.RuntimeException: Error while applying rule FilterJoinRule:FilterJoinRule:filter, args [rel#148:LogicalFilter.NONE(input=rel#146:Subset#12.NONE,condition=>=(AS(org$apache$flink$table$api$batch$table$Merger$$773bb962ee701f47b08bc74058c46bb3($1, AS(org$apache$flink$table$api$batch$table$Merger$$773bb962ee701f47b08bc74058c46bb3($1, $3), 'c0')), 'c1'), 0)), rel#145:LogicalJoin.NONE(left=rel#143:Subset#10.NONE,right=rel#144:Subset#11.NONE,condition==($0, $2),joinType=left)] at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:236) at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650) at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368) at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:270) at org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:347) at org.apache.flink.table.utils.BatchTableTestUtil.verifyTable(TableTestBase.scala:186) at org.apache.flink.table.api.batch.table.FilterRuleITCase.testFilterRule1(FilterRuleTest.scala:63) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51) at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237) at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) Caused by: java.lang.NullPointerException at org.apache.calcite.plan.Strong.isNull(Strong.java:110) at org.apache.calcite.plan.Strong.anyNull(Strong.java:166) at org.apache.calcite.plan.Strong.isNull(Strong.java:114) at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:99) at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:84) at org.apache.calcite.plan.RelOptUtil.simplifyJoin(RelOptUtil.java:2354) at org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:149) at org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:348) at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212) {code} but *testFilterRule2* works which has the same query written in SQL. {code} class FilterRuleTest extends TableTestBase { @Test def testFilterRule1(): Unit = { val util = batchTestUtil() val t1 = util.addTable[(String, Int, Int)]('a, 'b, 'c) val t2 = util.addTable[(String, Int, Int)]('d, 'e, 'f) val results = t1 .leftOuterJoin(t2, 'b === 'e) .select('c, Merger('c, 'f) as 'c0) .select(Merger('c, 'c0) as 'c1) .where('c1 >= 0) val expected = unaryNode( "DataSetCalc", binaryNode( "DataSetJoin", unaryNode( "DataSetCalc", batchTableNode(0), term("select", "b", "c") ), unaryNode( "DataSetCalc", batchTableNode(1), term("select", "e", "f") ), term("where", "=(b, e)"), term("join", "b", "c", "e", "f"), term("joinType", "LeftOuterJoin") ), term("select", "Merger$(c, Merger$(c, f)) AS c1"), term("where", ">=(Merger$(c, Merger$(c, f)), 0)") ) util.verifyTable(results, expected) } @Test def testFilterRule2(): Unit = { val util = batchTestUtil() util.addTable[(String, Int, Int)]("T1", 'a, 'b, 'c) util.addTable[(String, Int, Int)]("T2", 'd, 'e, 'f) util.tableEnv.registerFunction("udf_test", Merger) val sql = s""" |select c1 |from ( | select udf_test(c, c0) as c1 | from ( | select c, udf_test(b, c) as c0 | from | (select a, b, c | from T1 | left outer join T2 | on T1.b = T2.e | ) tmp | ) tmp1 |) tmp2 |where c1 >= 0 """.stripMargin val results = util.tableEnv.sqlQuery(sql) val expected = "DataSetCalc(select=[udf_test(c, udf_test(b, c)) AS c1]) \n" + "DataSetJoin(where=[=(b, e)], join=[b, c, e], joinType=[LeftOuterJoin])\n" + "DataSetCalc(select=[b, c], where=[>=(udf_test(c, udf_test(b, c)), 0)])\n" + "DataSetScan(table=[[_DataSetTable_0]])\n" + "DataSetCalc(select=[e])\n" + "DataSetScan(table=[[_DataSetTable_1]])" util.verifyTable(results, expected) } } object Merger extends ScalarFunction { def eval(f0: Int, f1: Int): Int = { f0 + f1 } } {code} A simple way to fix this is to change the calcite class {code} org.apache.calcite.plan.Strong{code} add an additional entry to the EnumMap in createPolicyMap method: {code}map.put(SqlKind.AS, Policy.AS_IS);{code} Either copy to Flink package and modify it or using reflection somewhere. I'm not sure if there exists other issues like this one since not all the types in SQLKind included in the Strong.MAP. [~fhueske] [~twalthr] any ideas? -- This message was sent by Atlassian JIRA (v6.4.14#64029)