Hi Han,
generally, Flink is a strongly typed system. I think the easiest way to
handle a dynamic schema is to read your JSON as a plain String. You can
then implement your own ScalarFunction (or, in this case, also a
TableFunction) [1] and use any JSON parsing library inside that function
to preprocess the data into a common representation. A quick sketch of
this approach is below.
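A minimal sketch of such a function, assuming Jackson for the JSON
parsing and a FlinkKafkaConsumer010 with SimpleStringSchema to obtain
the raw JSON string (the names "JsonField", "payload", and "myTopic"
are purely illustrative):

import org.apache.flink.table.functions.ScalarFunction;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Extracts a top-level field from a JSON string; returns null if the
// field is missing or the input is not valid JSON.
public class JsonField extends ScalarFunction {
    private transient ObjectMapper mapper;

    public String eval(String json, String fieldName) {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        try {
            JsonNode node = mapper.readTree(json).get(fieldName);
            return node == null ? null : node.asText();
        } catch (Exception e) {
            return null;
        }
    }
}

You could then use it like this:

// Read each Kafka record as one raw String column and parse it lazily:
DataStream<String> raw = env.addSource(
    new FlinkKafkaConsumer010<>("myTopic", new SimpleStringSchema(), props));
tableEnv.registerDataStream("kafkaSource", raw, "payload");
tableEnv.registerFunction("jsonField", new JsonField());
Table result = tableEnv.sql(
    "SELECT jsonField(payload, 'a') FROM kafkaSource");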
I hope this helps.
Regards,
Timo
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/udfs.html
On 10/23/17 at 5:16 AM, Han wrote:
Dear All,
My name is Han. I'm very interested in your advanced Flink system, and
I'm currently learning it.
I'm writing to the group with a personal question. I tried to use the
Table API & SQL and registered a TableSource via KafkaJsonTableSource,
and I have to say it works very well. My code is as follows.
My question is about TypeInformation. The first parameter of Types.ROW
is an array of String field names matching the fields in the Kafka
messages, and the second parameter gives the type of each field. In the
code below, new String[]{"a","b","c"} is the first parameter and new
TypeInformation<?>[]{Types.STRING(), Types.STRING(), Types.STRING()} is
the second parameter.
If I don't know the field names in the Kafka messages before
registering the TableSource, in other words, if the fields in Kafka are
dynamic, how can I solve this problem?
TypeInformation<Row> typeInfo = Types.ROW(
    new String[]{"a", "b", "c"},
    new TypeInformation<?>[]{Types.STRING(), Types.STRING(), Types.STRING()});
KafkaJsonTableSource kafkaTableSource =
    new Kafka010JsonTableSource(eltResultTopic, inputProperties, typeInfo);
kafkaTableSource.setFailOnMissingField(false);
tableEnvironment.registerTableSource("kafkaSource", kafkaTableSource);
Table sqlResult = tableEnvironment.sql("select a from kafkaSource");
DataStream<String> dataStream =
    tableEnvironment.toAppendStream(sqlResult, String.class);
dataStream.print();
environment.execute();
Looking forward to your reply. Thanks.
Best Wishes,
Han