I am hoping you guys can help me. I am stumped how to actually write to Kafka
using Kafka09JsonTableSink using the Table API. Here is my code below, I am
hoping you guys can shed some light on how this should be done. I don’t see any
methods for the actual write to Kafka. I am probably doing something stupid.
TIA.
Thanks!
Kenny
// run some SQL to filter results where a key is not null
String sql = "SELECT icao FROM flights WHERE icao is not null";
tableEnv.registerTableSource("flights", kafkaTableSource);
Table result = tableEnv.sql(sql);
// create a partition for the data going into kafka
FlinkFixedPartitioner partition = new FlinkFixedPartitioner();
// create new tablesink of JSON to kafka
KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
params.getRequired("write-topic"),
params.getProperties(),
partition);
result.writeToSink(tableEnv, kafkaTableSink); // Logically, I want to do this,
but no such method..