Hi all,

I'm using Flink SQL to join a subquery with a temporal table, but it throws a 
java.lang.NullPointerException when the query is executed.


Orders is a table source, and Rates is a temporal table function.




Here are my SQL queries:

// works
SELECT o_amount * r_amount AS amount
FROM Orders, LATERAL TABLE (Rates(o_proctime))
WHERE r_currency = o_currency



// raises the exception
SELECT o_amount * r_amount AS amount
FROM (SELECT * FROM Orders) as Orders, LATERAL TABLE (Rates(o_proctime))
WHERE r_currency = o_currency




The error stack: 
Exception in thread "main" java.lang.NullPointerException
        at 
org.apache.flink.table.planner.calcite.FlinkRelBuilder$.of(FlinkRelBuilder.scala:167)
        at 
org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:98)
        at 
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
        at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
        at 
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
        at 
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:256)
        at 
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
        at 
org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
        at 
org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.immutable.Range.foreach(Range.scala:160)
        at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
        at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:166)
        at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:88)
        at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:78)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:212)
        at 
org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:77)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnvironmentImpl.java:281)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnvironmentImpl.java:276)
        at cn.easyops.flink_sql.Test.main(Test.java:159)





Here is the complete test code. I hope someone can help. Thanks!


package test.flinksql;


import java.util.Random;


import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.DefinedProctimeAttribute;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;


public class TemporalTableFunctionTest {


    public static void main(String[] args) throws Exception {


        StreamExecutionEnvironment sEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
        sEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);


        EnvironmentSettings bsSettings = 
            EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
            .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv, 
bsSettings);
        
        tEnv.registerTableSource("RatesHistory", new FooSource(new String[] 
{"r_currency", "r_amount", "r_proctime"}));
        Table ratesHistory = tEnv.sqlQuery("SELECT * FROM RatesHistory");
        TemporalTableFunction rates = 
ratesHistory.createTemporalTableFunction("r_proctime", "r_currency");
        tEnv.registerFunction("Rates", rates);
        
        tEnv.registerTableSource("Orders", new FooSource(new String[] 
{"o_currency", "o_amount", "o_proctime"}));
        
        tEnv.registerTableSink("OutSink", new SysoSink());


        // works
        Table prices = tEnv.sqlQuery(
                "  SELECT                                     \r\n" +
                "    o_amount * r_amount AS amount            \r\n" +
                "  FROM Orders                                \r\n" +
                "     , LATERAL TABLE (Rates(o_proctime))     \r\n" +
                "  WHERE r_currency = o_currency              ");


        // Raise NullPointerException
        //Table prices = tEnv.sqlQuery(
        //        "  SELECT                                     \r\n" +
        //        "    o_amount * r_amount AS amount            \r\n" +
        //        "  FROM (SELECT * FROM Orders) as O                           
     \r\n" +
        //        "     , LATERAL TABLE (Rates(o_proctime))     \r\n" +
        //        "  WHERE r_currency = o_currency              ");
        
        prices.insertInto("OutSink");
        
        sEnv.execute();
    }
    


    /**
     * Retract-stream table sink with a single {@code LONG} column named {@code out}.
     *
     * <p>Every {@code (flag, row)} record of the incoming stream is handed to a
     * {@link SysoSinkFunction}.
     */
    public static class SysoSink implements RetractStreamTableSink<Row> {

        /** Returns the names of the sink's columns: just {@code "out"}. */
        @Override
        public String[] getFieldNames() {
            String[] names = {"out"};
            return names;
        }

        /** Returns the types of the sink's columns: a single {@code LONG}. */
        @Override
        public TypeInformation<?>[] getFieldTypes() {
            TypeInformation<?>[] types = new TypeInformation[] {Types.LONG()};
            return types;
        }

        /** Ignores the requested schema and returns this same sink instance. */
        @Override
        public TableSink<Tuple2<Boolean, Row>> configure(
                String[] fieldNames, TypeInformation<?>[] fieldTypes) {
            return this;
        }

        /** Delegates to {@link #consumeDataStream(DataStream)} and discards its result. */
        @Override
        public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
            consumeDataStream(dataStream);
        }

        /** Returns the row type built from {@link #getFieldNames()} and {@link #getFieldTypes()}. */
        @Override
        public TypeInformation<Row> getRecordType() {
            String[] names = getFieldNames();
            TypeInformation<?>[] types = getFieldTypes();
            return Types.ROW(names, types);
        }

        /** Attaches a new {@link SysoSinkFunction} to the stream and returns the resulting sink. */
        @Override
        public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(
                DataStream<Tuple2<Boolean, Row>> dataStream) {
            SysoSinkFunction<Tuple2<Boolean, Row>> printer =
                    new SysoSinkFunction<Tuple2<Boolean, Row>>();
            return dataStream.addSink(printer);
        }
    }
    
    /**
     * Sink function that prints each element it receives to standard output.
     *
     * @param <T> type of the elements being printed
     */
    @SuppressWarnings("serial")
    public static class SysoSinkFunction<T> implements SinkFunction<T> {

        /**
         * Prints {@code value} on its own line via {@code System.out}.
         *
         * @param value the element to print
         */
        @Override
        public void invoke(T value) throws Exception {
            final java.io.PrintStream stdout = System.out;
            stdout.println(value);
        }
    }
    
    public static class FooSource implements StreamTableSource<Row>, 
DefinedProctimeAttribute {
        
        String[] fieldNames;
        
        
        public FooSource(String[] fieldNames) {
            this.fieldNames = fieldNames;
        }


        @Override
        public TableSchema getTableSchema() {
            return new TableSchema(fieldNames, new TypeInformation[] 
{Types.STRING(), Types.LONG(), Types.SQL_TIMESTAMP()});
        }
        
        @Override
        public TypeInformation<Row> getReturnType() {
            return Types.ROW(fieldNames, new TypeInformation[] {Types.STRING(), 
Types.LONG(), Types.SQL_TIMESTAMP()});
        }


        @Override
        public DataStream<Row> getDataStream(StreamExecutionEnvironment 
execEnv) {
            return execEnv.addSource(new SourceFunction<Row>() {


                @Override
                public void run(SourceContext<Row> ctx) throws Exception {
                    Random random = new Random();
                    
                    while (true) {


                        Row row = new Row(3);
                        row.setField(0, "Euro" + random.nextLong() % 3);
                        row.setField(1, random.nextLong() % 200 );
                        row.setField(2, new 
java.sql.Timestamp(System.currentTimeMillis()));
                        ctx.collect(row);
                        Thread.sleep(100);
                    }
                    
                }


                @Override
                public void cancel() {
                    System.out.println("cancelling 
----------------------------------------------");
                    
                }
            }, getReturnType());
        }


        @Override
        public String getProctimeAttribute() {
            return fieldNames[2];
        }
    }


}

Reply via email to