Hi Han,

Generally, Flink is a strongly typed system. I think the easiest way to handle a dynamic schema is to read your JSON as a plain String column. You can then implement your own ScalarFunction (or, in this case, also a TableFunction) [1] and use any JSON parsing library inside that function to preprocess the records into a common representation.
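
For example, here is a rough (untested) sketch using Jackson -- the function name "jsonField" and the column name "payload" are only illustrative:

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.table.functions.ScalarFunction;

    // Extracts one field from a JSON string; returns null if the field is
    // missing or the JSON cannot be parsed.
    public class JsonField extends ScalarFunction {

        private transient ObjectMapper mapper;

        public String eval(String json, String fieldName) {
            if (mapper == null) {
                mapper = new ObjectMapper();
            }
            try {
                JsonNode node = mapper.readTree(json).get(fieldName);
                return node == null ? null : node.asText();
            } catch (Exception e) {
                return null;
            }
        }
    }

Assuming you register your Kafka source with a single String column (here called "payload") that holds the raw JSON, you could then write something like:

    tableEnvironment.registerFunction("jsonField", new JsonField());
    Table result = tableEnvironment.sql(
            "SELECT jsonField(payload, 'a') FROM kafkaSource");

This way the field names only need to be known at query time, not when the TableSource is registered.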

I hope this helps.

Regards,
Timo


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/udfs.html


On 10/23/17 at 5:16 AM, Han wrote:
Dear All,
My name is Han. I am very interested in your advanced Flink system and I am currently learning it. I am writing to your group about a question of mine. I tried to use the Table API & SQL and register a TableSource via KafkaJsonTableSource, and I have to say it works very well; my code is below. My question is about TypeInformation: the first parameter of Types.ROW is an array of Strings with the names of the fields in Kafka, and the second parameter gives the type of each field. In the code below, new String[]{"a","b","c"} is the first parameter and new TypeInformation<?>[]{Types.STRING(),Types.STRING(),Types.STRING()} is the second parameter. If I do not know the names of the fields in Kafka before registering the TableSource, in other words, if the fields in Kafka are dynamic, how can I solve this problem?
TypeInformation<Row> typeInfo = Types.ROW(
        new String[]{"a", "b", "c"},
        new TypeInformation<?>[]{Types.STRING(), Types.STRING(), Types.STRING()}
);
KafkaJsonTableSource kafkaTableSource = new Kafka010JsonTableSource(eltResultTopic, inputProperties, typeInfo);
kafkaTableSource.setFailOnMissingField(false);
tableEnvironment.registerTableSource("kafkaSource", kafkaTableSource);
Table sqlResult = tableEnvironment.sql("select a from kafkaSource");
DataStream<String> dataStream = tableEnvironment.toAppendStream(sqlResult, String.class);
dataStream.print();
environment.execute();
Looking forward to your reply. Thanks.
Best Wishes,
Han






