Thanks for reporting this issue. I've pulled in Jark and Kurt who might
help you with this problem.

Cheers,
Till

On Sat, Oct 12, 2019 at 5:42 PM hzp <hz...@qq.com> wrote:

> Hi all,
>
> I'm using flink sql to join a temporal table in a subquery, but it raises
> java.lang.NullPointerException when execute.
>
> Orders is a table source, and Rates is a temporal table
>
> Here are my sqls:
> // works
> SELECT o_amount * r_amount AS amount
> FROM Orders, LATERAL TABLE (Rates(o_proctime))
> WHERE r_currency = o_currency
>
> // sql raise exception
> SELECT o_amount * r_amount AS amount
> FROM (SELECT * FROM Orders) as Orders, LATERAL TABLE (Rates(o_proctime))
> WHERE r_currency = o_currency
>
> The error stack:
> Exception in thread "main" java.lang.NullPointerException
> at
> org.apache.flink.table.planner.calcite.FlinkRelBuilder$.of(FlinkRelBuilder.scala:167)
> at
> org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:98)
> at
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
> at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
> at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
> at
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:256)
> at
> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
> at
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
> at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.immutable.Range.foreach(Range.scala:160)
> at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:166)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:88)
> at
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:78)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:212)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:77)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnvironmentImpl.java:281)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnvironmentImpl.java:276)
> at cn.easyops.flink_sql.Test.main(Test.java:159)
>
>
> Here is the complete test code, hope anyone can help, thanks!
>
> package test.flinksql;
>
> import java.util.Random;
>
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.DataStreamSink;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableSchema;
> import org.apache.flink.table.api.Types;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.table.functions.TemporalTableFunction;
> import org.apache.flink.table.sinks.RetractStreamTableSink;
> import org.apache.flink.table.sinks.TableSink;
> import org.apache.flink.table.sources.DefinedProctimeAttribute;
> import org.apache.flink.table.sources.StreamTableSource;
> import org.apache.flink.types.Row;
>
> public class TemporalTableFunctionTest {
>
>     public static void main(String[] args) throws Exception {
>
>         StreamExecutionEnvironment sEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
>         sEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>
>         EnvironmentSettings bsSettings =
>             EnvironmentSettings.newInstance()
>                 .useBlinkPlanner()
>                 .inStreamingMode()
>             .build();
>         StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv,
> bsSettings);
>
>         tEnv.registerTableSource("RatesHistory", new FooSource(new
> String[] {"r_currency", "r_amount", "r_proctime"}));
>         Table ratesHistory = tEnv.sqlQuery("SELECT * FROM RatesHistory");
>         TemporalTableFunction rates =
> ratesHistory.createTemporalTableFunction("r_proctime", "r_currency");
>         tEnv.registerFunction("Rates", rates);
>
>         tEnv.registerTableSource("Orders", new FooSource(new String[]
> {"o_currency", "o_amount", "o_proctime"}));
>
>         tEnv.registerTableSink("OutSink", new SysoSink());
>
>         // works
>         Table prices = tEnv.sqlQuery(
>                 "  SELECT                                     \r\n" +
>                 "    o_amount * r_amount AS amount            \r\n" +
>                 "  FROM Orders                                \r\n" +
>                 "     , LATERAL TABLE (Rates(o_proctime))     \r\n" +
>                 "  WHERE r_currency = o_currency              ");
>
>         // Raise NullPointerException
>         //Table prices = tEnv.sqlQuery(
>         //        "  SELECT                                     \r\n" +
>         //        "    o_amount * r_amount AS amount            \r\n" +
>         //        "  FROM (SELECT * FROM Orders) as O
>           \r\n" +
>         //        "     , LATERAL TABLE (Rates(o_proctime))     \r\n" +
>         //        "  WHERE r_currency = o_currency              ");
>
>         prices.insertInto("OutSink");
>
>         sEnv.execute();
>     }
>
>
>     public static class SysoSink implements RetractStreamTableSink<Row> {
>         @Override
>         public String[] getFieldNames() {
>             return new String[] {"out"};
>         }
>         @Override
>         public TypeInformation<?>[] getFieldTypes() {
>             return new TypeInformation[] {Types.LONG()};
>         }
>         @Override
>         public TableSink<Tuple2<Boolean, Row>> configure(String[]
> fieldNames, TypeInformation<?>[] fieldTypes) {
>             return this;
>         }
>         @Override
>         public void emitDataStream(DataStream<Tuple2<Boolean, Row>>
> dataStream) {
>             consumeDataStream(dataStream);
>         }
>         @Override
>         public TypeInformation<Row> getRecordType() {
>             return Types.ROW(getFieldNames(), getFieldTypes());
>         }
>         @Override
>         public DataStreamSink<Tuple2<Boolean, Row>>
> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
>             return dataStream.addSink(new SysoSinkFunction<Tuple2<Boolean,
> Row>>());
>         }
>     }
>
>     @SuppressWarnings("serial")
>     public static class SysoSinkFunction<T> implements SinkFunction<T> {
>         @Override
>         public void invoke(T value) throws Exception {
>             System.out.println(value);
>         }
>     }
>
>     public static class FooSource implements StreamTableSource<Row>,
> DefinedProctimeAttribute {
>
>         String[] fieldNames;
>
>
>         public FooSource(String[] fieldNames) {
>             this.fieldNames = fieldNames;
>         }
>
>         @Override
>         public TableSchema getTableSchema() {
>             return new TableSchema(fieldNames, new TypeInformation[]
> {Types.STRING(), Types.LONG(), Types.SQL_TIMESTAMP()});
>         }
>
>         @Override
>         public TypeInformation<Row> getReturnType() {
>             return Types.ROW(fieldNames, new TypeInformation[]
> {Types.STRING(), Types.LONG(), Types.SQL_TIMESTAMP()});
>         }
>
>         @Override
>         public DataStream<Row> getDataStream(StreamExecutionEnvironment
> execEnv) {
>             return execEnv.addSource(new SourceFunction<Row>() {
>
>                 @Override
>                 public void run(SourceContext<Row> ctx) throws Exception {
>                     Random random = new Random();
>
>                     while (true) {
>
>                         Row row = new Row(3);
>                         row.setField(0, "Euro" + random.nextLong() % 3);
>                         row.setField(1, random.nextLong() % 200 );
>                         row.setField(2, new
> java.sql.Timestamp(System.currentTimeMillis()));
>                         ctx.collect(row);
>                         Thread.sleep(100);
>                     }
>
>                 }
>
>                 @Override
>                 public void cancel() {
>                     System.out.println("cancelling
> ----------------------------------------------");
>
>                 }
>             }, getReturnType());
>         }
>
>         @Override
>         public String getProctimeAttribute() {
>             return fieldNames[2];
>         }
>     }
>
> }
>
>

Reply via email to