hanningning created FLINK-7852: ---------------------------------- Summary: An input of GenericTypeInfo<Row> cannot be converted to Table Key: FLINK-7852 URL: https://issues.apache.org/jira/browse/FLINK-7852 Project: Flink Issue Type: Bug Components: Table API & SQL Affects Versions: 1.3.2 Reporter: hanningning
Dear All: I'm starting to learn about Flink,and I have a question about Table API&SQL as follows. It will be much appreciated to get your help ASAP. I tried to convert a stream into a table. The initial data type of this stream is String, and I converted the String type to Row through the map method, then converted this Row type DataStream to a Table, but I got a error, the error details is following: =================The error msg======================================= Exception in thread "main" org.apache.flink.table.api.TableException: An input of GenericTypeInfo<Row> cannot be converted to Table. Please specify the type of the input with a RowTypeInfo. at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:620) at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:398) at org.apache.flink.table.api.java.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:88) at com.xiaoju.manhattan.fbi.data.calc.test.TableDynamicRowTest.main(TableDynamicRowTest.java:85) In addition, My code as below: ========================My Code================================== public static void main(String[] args) throws Exception { StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); environment.getConfig().disableSysoutLogging(); StreamTableEnvironment tableEnvironment = TableEnvironment.getTableEnvironment(environment); DataStream<String> dataStream = environment.addSource(new SourceFunction<String>() { private String str1 = "{\"name\":\"name-value\",\"age\":\"28\",\"sex\":\"1\"}"; private long count = 0L; private volatile boolean isRunning = true; @Override public void run(SourceContext<String> ctx) throws Exception { while (isRunning && count<2){ synchronized (ctx.getCheckpointLock()){ ctx.collect(str1); count++; } } } @Override public void cancel() { isRunning = false; } }); DataStream<JsonNode> dataStreamJson = dataStream.map(new MapFunction<String, JsonNode>() { @Override public JsonNode map(String s) throws Exception { ObjectMapper objectMapper = new ObjectMapper(); JsonNode node = objectMapper.readTree(s); return node; } }); DataStream<Row> dataStreamRow = dataStreamJson.map(new MapFunction<JsonNode, Row>() { @Override public Row map(JsonNode jsonNode) throws Exception { int pos = 0; Row row = new Row(jsonNode.size()); Iterator<String> iterator = jsonNode.fieldNames(); while (iterator.hasNext()){ String key = iterator.next(); row.setField(pos,jsonNode.get(key).asText()); pos++; } return row; } }); dataStreamRow.addSink(new SinkFunction<Row>() { @Override public void invoke(Row value) throws Exception { System.out.println(value.getField(0)); } }); Table myTable = tableEnvironment.fromDataStream(dataStreamRow); Table result = myTable.select("f0"); DataStream<String> dataStreamResult = tableEnvironment.toAppendStream(result,String.class); dataStreamResult.print(); environment.execute(); } Waiting for your earlier reply, thanks. Best Wishes, Han -- This message was sent by Atlassian JIRA (v6.4.14#64029)