lynn1.zhang created FLINK-21345:
---
Summary: NullPointerException
LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157
Key: FLINK-21345
URL: https://issues.apache.org/jira/browse/FLINK-21345
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.12.1
Environment: Planner: BlinkPlanner
Flink Version: 1.12.1_2.11
Java Version: 1.8
OS: mac os
Reporter: lynn1.zhang
First Step: Create 2 Source Tables as below:
{code:java}
CREATE TABLE test_streaming(
vid BIGINT,
ts BIGINT,
proc AS proctime()
) WITH (
'connector' = 'kafka',
'topic' = 'test-streaming',
'properties.bootstrap.servers' = '127.0.0.1:9092',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
CREATE TABLE test_streaming2(
vid BIGINT,
ts BIGINT,
proc AS proctime()
) WITH (
'connector' = 'kafka',
'topic' = 'test-streaming2',
'properties.bootstrap.servers' = '127.0.0.1:9092',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
{code}
Second Step: Create a TEMPORARY Table Function, function name:dim, key:vid,
timestamp:proctime()
Third Step, test_streaming union all test_streaming2 join dim like below:
{code:java}
SELECT r.vid,d.name,timestamp_from_long(r.ts)
FROM (
SELECT * FROM test_streaming UNION ALL SELECT * FROM test_streaming2
) AS r,
LATERAL TABLE (dim(r.proc)) AS d
WHERE r.vid = d.vid;
{code}
Exception Detail:(if not union all, the program run ok)
{code:java}
Exception in thread "main" java.lang.NullPointerException
at
org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.getRelOptSchema(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157)
at
org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:99)
at
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
at
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
at
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
at
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
at
org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
at
org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
at
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
at
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
at
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
at
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
at scala.collection.immutable.Range.foreach(Range.scala:166)
at
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
at