Hi,

I found the problem in your implementation. The Table API program is correct. However, the DataStream that you construct in your TableSource has the wrong type. Whenever you use the Row type, you need to specify the type information explicitly, either by implementing ResultTypeQueryable in your source function or, as in your case, by supplying the type information as the second parameter of addSource:


DataStream<Row> dataStream = execEnv.addSource(new SourceFunction<Row>() {
    @Override
    public void run(SourceContext<Row> ctx) throws Exception {
        // emit your rows here via ctx.collect(...)
    }

    @Override
    public void cancel() {
    }
}, Types.ROW(Types.STRING(), Types.STRING(), Types.STRING())); // explicit row type: three String fields


Otherwise your SourceFunction will have a generic black-box type whose fields cannot be accessed by the Table API.
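
For the other option mentioned above, implementing ResultTypeQueryable, a minimal sketch could look like the following. The class name ThreeStringRowSource and the emitted sample record are made up for illustration, and the three String fields are only assumed from the snippet above; your real source will differ.

```java
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.types.Row;

// Hypothetical source; the name and the emitted record are illustrative only.
public class ThreeStringRowSource implements SourceFunction<Row>, ResultTypeQueryable<Row> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Row> ctx) throws Exception {
        // Placeholder: emits a single made-up record and returns.
        // A real source would typically loop while `running` is true.
        if (running) {
            ctx.collect(Row.of("a", "b", "c"));
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public TypeInformation<Row> getProducedType() {
        // Declares the row layout explicitly: three String fields.
        return new RowTypeInfo(
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO);
    }
}
```

With such a class, execEnv.addSource(new ThreeStringRowSource()) should not need the second parameter, because the source reports its own type.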

Regards,
Timo



On 10/23/17 at 1:01 PM, ?????? wrote:
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnvironment = TableEnvironment.getTableEnvironment(environment);
tableEnvironment.registerTableSource("myTable", new MyTableSource());
String sql = "select f0 from myTable";
Table sqlResult = tableEnvironment.sql(sql);
DataStream<Tuple2<Boolean, String>> result = tableEnvironment.toRetractStream(sqlResult, String.class);
result.print();
environment.execute();

