lynn1.zhang created FLINK-21345:
-----------------------------------

             Summary: NullPointerException 
LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157
                 Key: FLINK-21345
                 URL: https://issues.apache.org/jira/browse/FLINK-21345
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.12.1
         Environment: Planner: BlinkPlanner
Flink Version: 1.12.1_2.11
Java Version: 1.8
OS: mac os
            Reporter: lynn1.zhang


First Step: Create 2 Source Tables as below:
{code:java}
CREATE TABLE test_streaming(
 vid BIGINT,
 ts BIGINT,
 proc AS proctime()
) WITH (
 'connector' = 'kafka',
 'topic' = 'test-streaming',
 'properties.bootstrap.servers' = '127.0.0.1:9092',
 'scan.startup.mode' = 'latest-offset',
 'format' = 'json'
);
CREATE TABLE test_streaming2(
 vid BIGINT,
 ts BIGINT,
 proc AS proctime()
) WITH (
 'connector' = 'kafka',
 'topic' = 'test-streaming2',
 'properties.bootstrap.servers' = '127.0.0.1:9092',
 'scan.startup.mode' = 'latest-offset',
 'format' = 'json'
);
{code}
Second Step: Create a TEMPORARY Table Function, function name:dim, key:vid, 
timestamp:proctime()

Third Step, test_streaming union all  test_streaming2 join dim like below:

 
{code:java}
SELECT r.vid,d.name,timestamp_from_long(r.ts)
FROM (
    SELECT * FROM test_streaming UNION ALL SELECT * FROM test_streaming2
) AS r,
    LATERAL TABLE (dim(r.proc)) AS d
WHERE r.vid = d.vid;
{code}
Exception Detail:(if not union all, the program run ok)

 
{code:java}
Exception in thread "main" java.lang.NullPointerException
        at 
org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.getRelOptSchema(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157)
        at 
org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:99)
        at 
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
        at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
        at 
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
        at 
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
        at 
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
        at 
org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
        at 
org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
        at scala.collection.Iterator$class.foreach(Iterator.scala:742)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
        at scala.collection.immutable.Range.foreach(Range.scala:166)
        at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
        at scala.collection.Iterator$class.foreach(Iterator.scala:742)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
        at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
        at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
        at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:286)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)
        at 
org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:98)Exception
 in thread "main" java.lang.NullPointerException
        at 
org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.getRelOptSchema(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157)
        at 
org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:99)
        at 
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
        at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
        at 
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
        at 
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
        at 
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
        at 
org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
        at 
org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
        at scala.collection.Iterator$class.foreach(Iterator.scala:742)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
        at scala.collection.immutable.Range.foreach(Range.scala:166)
        at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
        at scala.collection.Iterator$class.foreach(Iterator.scala:742)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
        at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
        at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
        at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:286)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)
        at 
org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:98){code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to