lincoln.lee created FLINK-7942:
----------------------------------

             Summary: NPE when apply FilterJoinRule
                 Key: FLINK-7942
                 URL: https://issues.apache.org/jira/browse/FLINK-7942
             Project: Flink
          Issue Type: Bug
            Reporter: lincoln.lee
            Assignee: lincoln.lee


Test case *testFilterRule1* fails due to an NPE:
{code}
java.lang.RuntimeException: Error while applying rule 
FilterJoinRule:FilterJoinRule:filter, args 
[rel#148:LogicalFilter.NONE(input=rel#146:Subset#12.NONE,condition=>=(AS(org$apache$flink$table$api$batch$table$Merger$$773bb962ee701f47b08bc74058c46bb3($1,
 
AS(org$apache$flink$table$api$batch$table$Merger$$773bb962ee701f47b08bc74058c46bb3($1,
 $3), 'c0')), 'c1'), 0)), 
rel#145:LogicalJoin.NONE(left=rel#143:Subset#10.NONE,right=rel#144:Subset#11.NONE,condition==($0,
 $2),joinType=left)]

        at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:236)
        at 
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650)
        at 
org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
        at 
org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:270)
        at 
org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:347)
        at 
org.apache.flink.table.utils.BatchTableTestUtil.verifyTable(TableTestBase.scala:186)
        at 
org.apache.flink.table.api.batch.table.FilterRuleITCase.testFilterRule1(FilterRuleTest.scala:63)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
        at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
        at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at 
org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
        at org.junit.rules.RunRules.evaluate(RunRules.java:20)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
        at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
        at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
        at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
        at 
com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
        at 
com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
        at 
com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.lang.NullPointerException
        at org.apache.calcite.plan.Strong.isNull(Strong.java:110)
        at org.apache.calcite.plan.Strong.anyNull(Strong.java:166)
        at org.apache.calcite.plan.Strong.isNull(Strong.java:114)
        at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:99)
        at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:84)
        at org.apache.calcite.plan.RelOptUtil.simplifyJoin(RelOptUtil.java:2354)
        at 
org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:149)
        at 
org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:348)
        at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
{code}
but *testFilterRule2*, which has the same query written in SQL, works.

{code}
/**
  * Plan tests for a filter placed on top of a left outer join whose projection
  * calls the scalar function `Merger`.
  *
  * Both tests express the scenario once through the Table API and once through
  * SQL, and compare the optimized batch plan against an expected plan.
  */
class FilterRuleTest extends TableTestBase {

  /**
    * Table API variant: left outer join on `b === e`, two stacked projections
    * that nest `Merger` calls, then a filter on the derived column `c1`.
    *
    * Reported to fail with a NullPointerException while FilterJoinRule is applied.
    */
  @Test
  def testFilterRule1(): Unit = {
    val util = batchTestUtil()
    val t1 = util.addTable[(String, Int, Int)]('a, 'b, 'c)
    val t2 = util.addTable[(String, Int, Int)]('d, 'e, 'f)
    // The filter references c1, which is an alias of an alias (c0) over Merger calls.
    val results = t1
      .leftOuterJoin(t2, 'b === 'e)
      .select('c, Merger('c, 'f) as 'c0)
      .select(Merger('c, 'c0) as 'c1)
      .where('c1 >= 0)

    // Expected plan: a Calc (projection + filter) over the join of two projecting Calcs.
    val expected = unaryNode(
      "DataSetCalc",
      binaryNode(
        "DataSetJoin",
        unaryNode(
          "DataSetCalc",
          batchTableNode(0),
          term("select", "b", "c")
        ),
        unaryNode(
          "DataSetCalc",
          batchTableNode(1),
          term("select", "e", "f")
        ),
        term("where", "=(b, e)"),
        term("join", "b", "c", "e", "f"),
        term("joinType", "LeftOuterJoin")
      ),
      term("select", "Merger$(c, Merger$(c, f)) AS c1"),
      term("where", ">=(Merger$(c, Merger$(c, f)), 0)")
    )

    util.verifyTable(results, expected)
  }

  /**
    * SQL variant: registers `Merger` as `udf_test` and runs the nested-subquery
    * form of the query. Reported to pass.
    */
  @Test
  def testFilterRule2(): Unit = {
    val util = batchTestUtil()
    util.addTable[(String, Int, Int)]("T1", 'a, 'b, 'c)
    util.addTable[(String, Int, Int)]("T2", 'd, 'e, 'f)
    util.tableEnv.registerFunction("udf_test", Merger)

    val sql =
      s"""
         |select c1
         |from (
         |  select udf_test(c, c0) as c1
         |  from (
         |    select c, udf_test(b, c) as c0
         |      from
         |      (select a, b, c
         |        from T1
         |        left outer join T2
         |        on T1.b = T2.e
         |      ) tmp
         |  ) tmp1
         |) tmp2
         |where c1 >= 0
       """.stripMargin

    val results = util.tableEnv.sqlQuery(sql)
    // Each plan line is one complete string literal; the literals must not be
    // broken across source lines, otherwise the concatenation does not compile.
    val expected = "DataSetCalc(select=[udf_test(c, udf_test(b, c)) AS c1]) \n" +
      "DataSetJoin(where=[=(b, e)], join=[b, c, e], joinType=[LeftOuterJoin])\n" +
      "DataSetCalc(select=[b, c], where=[>=(udf_test(c, udf_test(b, c)), 0)])\n" +
      "DataSetScan(table=[[_DataSetTable_0]])\n" +
      "DataSetCalc(select=[e])\n" +
      "DataSetScan(table=[[_DataSetTable_1]])"
    util.verifyTable(results, expected)
  }
}

object Merger extends ScalarFunction {
  /** Returns the sum of the two integer arguments. */
  def eval(f0: Int, f1: Int): Int = f0 + f1
}
{code}

A simple way to fix this is to change the Calcite class
{code}org.apache.calcite.plan.Strong{code}
and add an additional entry to the EnumMap in its createPolicyMap method:
{code}map.put(SqlKind.AS, Policy.AS_IS);{code}
This could be done either by copying the class into a Flink package and modifying it, or by using reflection somewhere.

I'm not sure whether other issues like this one exist, since not all the kinds
in SqlKind are included in Strong.MAP.
[~fhueske]  [~twalthr] any ideas?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to