Thanks for reporting this issue. I've pulled in Jark and Kurt, who might be able to help you with this problem.
Cheers, Till On Sat, Oct 12, 2019 at 5:42 PM hzp <hz...@qq.com> wrote: > Hi all, > > I'm using flink sql to join a temporal table in a subquery, but it raises > java.lang.NullPointerException when execute. > > Orders is a table source, and Rates is a temporal table > > Here are my sqls: > // works > SELECT o_amount * r_amount AS amount > FROM Orders, LATERAL TABLE (Rates(o_proctime)) > WHERE r_currency = o_currency > > // sql raise exception > SELECT o_amount * r_amount AS amount > FROM (SELECT * FROM Orders) as Orders, LATERAL TABLE (Rates(o_proctime)) > WHERE r_currency = o_currency > > The error stack: > Exception in thread "main" java.lang.NullPointerException > at > org.apache.flink.table.planner.calcite.FlinkRelBuilder$.of(FlinkRelBuilder.scala:167) > at > org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:98) > at > org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319) > at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560) > at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419) > at > org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:256) > at > org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127) > at > org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215) > at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63) > at > 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at scala.collection.immutable.Range.foreach(Range.scala:160) > at > scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > 
at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:166) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:88) > at > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:78) > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:212) > at > org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:77) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnvironmentImpl.java:281) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnvironmentImpl.java:276) > at cn.easyops.flink_sql.Test.main(Test.java:159) > > > Here is the complete test code, hope anyone can help, thanks! 
> > package test.flinksql; > > import java.util.Random; > > import org.apache.flink.api.common.typeinfo.TypeInformation; > import org.apache.flink.api.java.tuple.Tuple2; > import org.apache.flink.streaming.api.TimeCharacteristic; > import org.apache.flink.streaming.api.datastream.DataStream; > import org.apache.flink.streaming.api.datastream.DataStreamSink; > import > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.streaming.api.functions.sink.SinkFunction; > import org.apache.flink.streaming.api.functions.source.SourceFunction; > import org.apache.flink.table.api.EnvironmentSettings; > import org.apache.flink.table.api.Table; > import org.apache.flink.table.api.TableSchema; > import org.apache.flink.table.api.Types; > import org.apache.flink.table.api.java.StreamTableEnvironment; > import org.apache.flink.table.functions.TemporalTableFunction; > import org.apache.flink.table.sinks.RetractStreamTableSink; > import org.apache.flink.table.sinks.TableSink; > import org.apache.flink.table.sources.DefinedProctimeAttribute; > import org.apache.flink.table.sources.StreamTableSource; > import org.apache.flink.types.Row; > > public class TemporalTableFunctionTest { > > public static void main(String[] args) throws Exception { > > StreamExecutionEnvironment sEnv = > StreamExecutionEnvironment.getExecutionEnvironment(); > sEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); > > EnvironmentSettings bsSettings = > EnvironmentSettings.newInstance() > .useBlinkPlanner() > .inStreamingMode() > .build(); > StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv, > bsSettings); > > tEnv.registerTableSource("RatesHistory", new FooSource(new > String[] {"r_currency", "r_amount", "r_proctime"})); > Table ratesHistory = tEnv.sqlQuery("SELECT * FROM RatesHistory"); > TemporalTableFunction rates = > ratesHistory.createTemporalTableFunction("r_proctime", "r_currency"); > tEnv.registerFunction("Rates", rates); > > 
tEnv.registerTableSource("Orders", new FooSource(new String[] > {"o_currency", "o_amount", "o_proctime"})); > > tEnv.registerTableSink("OutSink", new SysoSink()); > > // works > Table prices = tEnv.sqlQuery( > " SELECT \r\n" + > " o_amount * r_amount AS amount \r\n" + > " FROM Orders \r\n" + > " , LATERAL TABLE (Rates(o_proctime)) \r\n" + > " WHERE r_currency = o_currency "); > > // Raise NullPointerException > //Table prices = tEnv.sqlQuery( > // " SELECT \r\n" + > // " o_amount * r_amount AS amount \r\n" + > // " FROM (SELECT * FROM Orders) as O > \r\n" + > // " , LATERAL TABLE (Rates(o_proctime)) \r\n" + > // " WHERE r_currency = o_currency "); > > prices.insertInto("OutSink"); > > sEnv.execute(); > } > > > public static class SysoSink implements RetractStreamTableSink<Row> { > @Override > public String[] getFieldNames() { > return new String[] {"out"}; > } > @Override > public TypeInformation<?>[] getFieldTypes() { > return new TypeInformation[] {Types.LONG()}; > } > @Override > public TableSink<Tuple2<Boolean, Row>> configure(String[] > fieldNames, TypeInformation<?>[] fieldTypes) { > return this; > } > @Override > public void emitDataStream(DataStream<Tuple2<Boolean, Row>> > dataStream) { > consumeDataStream(dataStream); > } > @Override > public TypeInformation<Row> getRecordType() { > return Types.ROW(getFieldNames(), getFieldTypes()); > } > @Override > public DataStreamSink<Tuple2<Boolean, Row>> > consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) { > return dataStream.addSink(new SysoSinkFunction<Tuple2<Boolean, > Row>>()); > } > } > > @SuppressWarnings("serial") > public static class SysoSinkFunction<T> implements SinkFunction<T> { > @Override > public void invoke(T value) throws Exception { > System.out.println(value); > } > } > > public static class FooSource implements StreamTableSource<Row>, > DefinedProctimeAttribute { > > String[] fieldNames; > > > public FooSource(String[] fieldNames) { > this.fieldNames = fieldNames; > } > > @Override 
> public TableSchema getTableSchema() { > return new TableSchema(fieldNames, new TypeInformation[] > {Types.STRING(), Types.LONG(), Types.SQL_TIMESTAMP()}); > } > > @Override > public TypeInformation<Row> getReturnType() { > return Types.ROW(fieldNames, new TypeInformation[] > {Types.STRING(), Types.LONG(), Types.SQL_TIMESTAMP()}); > } > > @Override > public DataStream<Row> getDataStream(StreamExecutionEnvironment > execEnv) { > return execEnv.addSource(new SourceFunction<Row>() { > > @Override > public void run(SourceContext<Row> ctx) throws Exception { > Random random = new Random(); > > while (true) { > > Row row = new Row(3); > row.setField(0, "Euro" + random.nextLong() % 3); > row.setField(1, random.nextLong() % 200 ); > row.setField(2, new > java.sql.Timestamp(System.currentTimeMillis())); > ctx.collect(row); > Thread.sleep(100); > } > > } > > @Override > public void cancel() { > System.out.println("cancelling > ----------------------------------------------"); > > } > }, getReturnType()); > } > > @Override > public String getProctimeAttribute() { > return fieldNames[2]; > } > } > > } > >