Dear All,
     My name is Han. I'm very interested in your advanced Flink system, and I'm 
learning it.
     I'm writing to your group for communicating about my personal question. I 
tried to use Table API&SQL and register a TableSource by the 
KafkaJsonTableSource method, I have to say it works very well. My code as 
follows.
And my question is about TypeInformation. The first parameter returns an array 
of String types and it contains fields in the kafka, the second parameter 
corresponds to the type of each field,i.e the below code,new 
String[]{"a","b","c"} is the the first parameter,new 
TypeInformation<?>[]{Types.STRING(),Types.STRING(),Types.STRING()} is the 
second parameter.
If I didn't know the name of the field in the kafka before registering 
TableSource, in other words,
the fields in the kafka are dynamic, how to solve this problem?

TypeInformation<Row> typeInfo = Types.ROW(
        new String[]{"a","b","c"},
        new TypeInformation<?>[]{Types.STRING(),Types.STRING(),Types.STRING()}
);
KafkaJsonTableSource kafkaTableSource = new 
Kafka010JsonTableSource(eltResultTopic,
        inputProperties, typeInfo);
kafkaTableSource.setFailOnMissingField(false);

tableEnvironment.registerTableSource("kafkaSource",kafkaTableSource);
Table sqlResult = tableEnvironment.sql("select a from kafkaSource");
DataStream<String> dataStream = 
tableEnvironment.toAppendStream(sqlResult,String.class);
dataStream.print();

environment.execute();
Waiting for your earlier reply. Thanks.
Best Wishes,
Han

Reply via email to