Hi,
I found the problem in your implementation. The Table API program
is correct. However, the DataStream program that you construct in your
TableSource has the wrong type. Whenever you use a Row type, you need to
specify the type, either by implementing ResultTypeQueryable or, in your
case, by supplying the type information as the second parameter of addSource():
DataStream<Row> dataStream = execEnv.addSource(new SourceFunction<Row>() {
    @Override
    public void run(SourceContext<Row> ctx) throws Exception {
    }

    @Override
    public void cancel() {
    }
}, Types.ROW(Types.STRING(), Types.STRING(), Types.STRING()));
Otherwise, your SourceFunction will have a generic, black-box type whose
fields cannot be accessed by the Table API.
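For the other option, implementing ResultTypeQueryable, a minimal sketch might look like the following. The class name MyRowSource and the emitted values are made up for illustration, and the three String fields are only assumed to match your table schema; adjust them to your actual source.

```java
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.types.Row;

// Hypothetical source: the name and the emitted row are illustration only.
public class MyRowSource implements SourceFunction<Row>, ResultTypeQueryable<Row> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Row> ctx) throws Exception {
        // Emits a single three-field row and returns; a real source
        // would typically keep emitting while 'running' is true.
        if (running) {
            ctx.collect(Row.of("a", "b", "c"));
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public TypeInformation<Row> getProducedType() {
        // Declares the row layout explicitly (assumed: three String fields),
        // so the source does not fall back to a generic type.
        return new RowTypeInfo(
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO);
    }
}
```

With such a class, execEnv.addSource(new MyRowSource()) should pick up the row type from getProducedType(), so the second parameter is not needed.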
Regards,
Timo
On 10/23/17 at 1:01 PM, ?????? wrote:
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnvironment = TableEnvironment.getTableEnvironment(environment);
tableEnvironment.registerTableSource("myTable", new MyTableSource());
String sql = "select f0 from myTable";
Table sqlResult = tableEnvironment.sql(sql);
DataStream<Tuple2<Boolean, String>> result = tableEnvironment.toRetractStream(sqlResult, String.class);
result.print();
environment.execute();