I read messages from Kafka, keyBy tableName, and then write the list of
messages for each window to the database with batchUpdate:
keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
    .aggregate(new ListAggregate())
    .addSink(new TemplateMySQLSink());

It seems that the aggregate function is triggered for every incoming record,
but I want it to be triggered only once per window. How can I implement this?
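
To make the behaviour concrete, here is a minimal plain-Java sketch of how I
understand the AggregateFunction contract: add() runs once per record, while
getResult() runs once when the window fires, so the sink should see one list
per key per window. This is only a model, not Flink code. The Aggregate
interface, runWindow, and the call counters are hypothetical names made up for
illustration (merge() is left out), and the body of ListAggregate is an
assumption, since the real class is not shown here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class WindowAggregateSketch {

    /** Hypothetical stand-in with the same shape as Flink's AggregateFunction (merge omitted). */
    public interface Aggregate<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
    }

    /** Assumed implementation: collects every record of a window into one list. */
    public static class ListAggregate implements Aggregate<String, List<String>, List<String>> {
        public int addCalls = 0;        // counts per-record invocations
        public int getResultCalls = 0;  // counts per-window invocations

        @Override
        public List<String> createAccumulator() {
            return new ArrayList<>();
        }

        @Override
        public List<String> add(String value, List<String> accumulator) {
            addCalls++;
            accumulator.add(value);
            return accumulator;
        }

        @Override
        public List<String> getResult(List<String> accumulator) {
            getResultCalls++;
            return accumulator;
        }
    }

    /** Models one window for one key: add() per record, getResult() once at the end. */
    public static <IN, ACC, OUT> OUT runWindow(Aggregate<IN, ACC, OUT> fn, List<IN> records) {
        ACC acc = fn.createAccumulator();
        for (IN record : records) {
            acc = fn.add(record, acc);
        }
        return fn.getResult(acc);
    }

    public static void main(String[] args) {
        ListAggregate agg = new ListAggregate();
        List<String> batch = runWindow(agg, Arrays.asList("row1", "row2", "row3"));
        // prints: add() calls: 3, getResult() calls: 1, batch: [row1, row2, row3]
        System.out.println("add() calls: " + agg.addCalls
                + ", getResult() calls: " + agg.getResultCalls
                + ", batch: " + batch);
    }
}
```

If that model is right, the per-record calls I am seeing would be add(), and
the batchUpdate in the sink would still run once per window. If extra work is
needed at window end, chaining a ProcessWindowFunction via
aggregate(aggregateFunction, processWindowFunction) might be an option.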
Thanks,
Lei


wangl...@geekplus.com
