Hi all,
I'm using flink sql to join a temporal table in a subquery, but it raises java.lang.NullPointerException when execute. Orders is a table source, and Rates is a temporal table Here are my sqls: // works SELECT o_amount * r_amount AS amount FROM Orders, LATERAL TABLE (Rates(o_proctime)) WHERE r_currency = o_currency // sql raise exception SELECT o_amount * r_amount AS amount FROM (SELECT * FROM Orders) as Orders, LATERAL TABLE (Rates(o_proctime)) WHERE r_currency = o_currency The error stack: Exception in thread "main" java.lang.NullPointerException at org.apache.flink.table.planner.calcite.FlinkRelBuilder$.of(FlinkRelBuilder.scala:167) at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:98) at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319) at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560) at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419) at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:256) at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127) at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215) at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202) at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69) at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.immutable.Range.foreach(Range.scala:160) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:166) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:88) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:78) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:212) at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:77) at org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnvironmentImpl.java:281) at org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnvironmentImpl.java:276) at cn.easyops.flink_sql.Test.main(Test.java:159) Here is the complete test code, hope anyone can help, thanks! package test.flinksql; import java.util.Random; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.Types; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.table.functions.TemporalTableFunction; import org.apache.flink.table.sinks.RetractStreamTableSink; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.sources.DefinedProctimeAttribute; import org.apache.flink.table.sources.StreamTableSource; import org.apache.flink.types.Row; public class TemporalTableFunctionTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment(); sEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv, bsSettings); tEnv.registerTableSource("RatesHistory", new FooSource(new String[] {"r_currency", "r_amount", "r_proctime"})); Table ratesHistory = tEnv.sqlQuery("SELECT * FROM RatesHistory"); TemporalTableFunction rates = ratesHistory.createTemporalTableFunction("r_proctime", "r_currency"); tEnv.registerFunction("Rates", rates); tEnv.registerTableSource("Orders", new FooSource(new String[] {"o_currency", "o_amount", "o_proctime"})); tEnv.registerTableSink("OutSink", new SysoSink()); // works Table prices = tEnv.sqlQuery( " SELECT \r\n" + " o_amount * r_amount AS amount \r\n" + " FROM Orders \r\n" + " , LATERAL TABLE (Rates(o_proctime)) \r\n" + " WHERE r_currency = o_currency "); // Raise NullPointerException //Table prices = tEnv.sqlQuery( // " SELECT \r\n" + // " o_amount * r_amount AS amount \r\n" + // " FROM (SELECT * FROM Orders) as O \r\n" + // " , LATERAL TABLE (Rates(o_proctime)) \r\n" + // " WHERE r_currency = o_currency "); prices.insertInto("OutSink"); sEnv.execute(); } public static class SysoSink implements RetractStreamTableSink<Row> { @Override public String[] getFieldNames() { return new String[] {"out"}; } @Override public TypeInformation<?>[] getFieldTypes() { return new TypeInformation[] {Types.LONG()}; } @Override public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) { return this; } @Override public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) { consumeDataStream(dataStream); } @Override public TypeInformation<Row> getRecordType() { return Types.ROW(getFieldNames(), getFieldTypes()); } @Override public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) { return dataStream.addSink(new SysoSinkFunction<Tuple2<Boolean, Row>>()); } } @SuppressWarnings("serial") public static class SysoSinkFunction<T> implements SinkFunction<T> { @Override public void invoke(T value) throws Exception { System.out.println(value); } } public static class FooSource implements StreamTableSource<Row>, DefinedProctimeAttribute { String[] fieldNames; public FooSource(String[] fieldNames) { this.fieldNames = fieldNames; } @Override public TableSchema getTableSchema() { return new TableSchema(fieldNames, new TypeInformation[] {Types.STRING(), Types.LONG(), Types.SQL_TIMESTAMP()}); } @Override public TypeInformation<Row> getReturnType() { return Types.ROW(fieldNames, new TypeInformation[] {Types.STRING(), Types.LONG(), Types.SQL_TIMESTAMP()}); } @Override public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) { return execEnv.addSource(new SourceFunction<Row>() { @Override public void run(SourceContext<Row> ctx) throws Exception { Random random = new Random(); while (true) { Row row = new Row(3); row.setField(0, "Euro" + random.nextLong() % 3); row.setField(1, random.nextLong() % 200 ); row.setField(2, new java.sql.Timestamp(System.currentTimeMillis())); ctx.collect(row); Thread.sleep(100); } } @Override public void cancel() { System.out.println("cancelling ----------------------------------------------"); } }, getReturnType()); } @Override public String getProctimeAttribute() { return fieldNames[2]; } } }