[ https://issues.apache.org/jira/browse/FLINK-21345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17282291#comment-17282291 ]
lynn1.zhang edited comment on FLINK-21345 at 2/10/21, 8:51 AM:
---------------------------------------------------------------

[~jark] [~Leonard Xu]

!image-2021-02-10-16-00-45-553.png!

I added code to the getRelOptSchema method in LogicalCorrelateToJoinFromTemporalTableFunctionRule, and it seems to run OK. Can I create a merge request to fix the issue?


was (Author: zicat):
[~jark] [~Leonard Xu]

!image-2021-02-10-16-00-45-553.png!

I add the code of the method getRelOptSchema in LogicalCorrelateToJoinFromTemporalTableFunctionRule, It seems run ok. Can I create a merge request to fix the issue.

> NullPointerException LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-21345
>                 URL: https://issues.apache.org/jira/browse/FLINK-21345
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.12.1
>         Environment: Planner: BlinkPlanner
> Flink Version: 1.12.1_2.11
> Java Version: 1.8
> OS: mac os
>            Reporter: lynn1.zhang
>            Priority: Major
>         Attachments: image-2021-02-10-16-00-45-553.png
>
>
> First Step: Create two source tables as below:
> {code:java}
> CREATE TABLE test_streaming(
>   vid BIGINT,
>   ts BIGINT,
>   proc AS proctime()
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'test-streaming',
>   'properties.bootstrap.servers' = '127.0.0.1:9092',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'json'
> );
> CREATE TABLE test_streaming2(
>   vid BIGINT,
>   ts BIGINT,
>   proc AS proctime()
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'test-streaming2',
>   'properties.bootstrap.servers' = '127.0.0.1:9092',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'json'
> );
> {code}
> Second Step: Create a temporal table function (function name: dim, key: vid, timestamp: proctime()).
> Third Step: Join the UNION ALL of test_streaming and test_streaming2 with dim, as below:
> {code:java}
> SELECT r.vid,d.name,timestamp_from_long(r.ts)
> FROM (
>   SELECT * FROM test_streaming UNION ALL SELECT * FROM test_streaming2
> ) AS r,
> LATERAL TABLE (dim(r.proc)) AS d
> WHERE r.vid = d.vid;
> {code}
> Exception detail (if only test-streaming or test-streaming2 is joined with the temporal table function, the program runs OK):
> {code:java}
> Exception in thread "main" java.lang.NullPointerException
>     at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.getRelOptSchema(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157)
>     at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:99)
>     at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
>     at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
>     at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
>     at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
>     at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>     at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
>     at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:742)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
>     at scala.collection.immutable.Range.foreach(Range.scala:166)
>     at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:742)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57){code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)