[ 
https://issues.apache.org/jira/browse/FLINK-21345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-21345:
-----------------------------------
    Labels: pull-request-available stale-assigned  (was: pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue is assigned but has not 
received an update in 14, so it has been labeled "stale-assigned".
If you are still working on the issue, please remove the label and add a 
comment updating the community on your progress.  If this issue is waiting on 
feedback, please consider this a reminder to the committer/reviewer. Flink is a 
very active project, and so we appreciate your patience.
If you are no longer working on the issue, please unassign yourself so someone 
else may work on it. If the "warning_label" label is not removed in 7 days, the 
issue will be automatically unassigned.


> NullPointerException 
> LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-21345
>                 URL: https://issues.apache.org/jira/browse/FLINK-21345
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.12.1
>         Environment: Planner: BlinkPlanner
> Flink Version: 1.12.1_2.11
> Java Version: 1.8
> OS: mac os
>            Reporter: lynn1.zhang
>            Assignee: lynn1.zhang
>            Priority: Minor
>              Labels: pull-request-available, stale-assigned
>             Fix For: 1.14.0
>
>         Attachments: image-2021-02-10-16-00-45-553.png
>
>
> First Step: Create 2 Source Tables as below:
> {code:java}
> CREATE TABLE test_streaming(
>  vid BIGINT,
>  ts BIGINT,
>  proc AS proctime()
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'test-streaming',
>  'properties.bootstrap.servers' = '127.0.0.1:9092',
>  'scan.startup.mode' = 'latest-offset',
>  'format' = 'json'
> );
> CREATE TABLE test_streaming2(
>  vid BIGINT,
>  ts BIGINT,
>  proc AS proctime()
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'test-streaming2',
>  'properties.bootstrap.servers' = '127.0.0.1:9092',
>  'scan.startup.mode' = 'latest-offset',
>  'format' = 'json'
> );
> {code}
> Second Step: Create a TEMPORARY Table Function, function name:dim, key:vid, 
> timestamp:proctime()
> Third Step: test_streaming union all  test_streaming2 join dim like below:
> {code:java}
> SELECT r.vid,d.name,timestamp_from_long(r.ts)
> FROM (
>     SELECT * FROM test_streaming UNION ALL SELECT * FROM test_streaming2
> ) AS r,
>     LATERAL TABLE (dim(r.proc)) AS d
> WHERE r.vid = d.vid;
> {code}
> Exception Detail: (if only use test-streaming or test-streaming2 join 
> temporary table function, the program run ok)
> {code:java}
> Exception in thread "main" java.lang.NullPointerException
>       at 
> org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.getRelOptSchema(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:157)
>       at 
> org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:99)
>       at 
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
>       at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
>       at 
> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
>       at 
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
>       at 
> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>       at 
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
>       at 
> org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
>       at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
>       at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:742)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>       at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
>       at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
>       at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
>       at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
>       at scala.collection.immutable.Range.foreach(Range.scala:166)
>       at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
>       at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>       at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
>       at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:155)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:742)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>       at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:155)
>       at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57){code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to