I read messages from Kafka, keyBy tableName, and then write the resulting message list to the database with batchUpdate:

keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
    .aggregate(new ListAggregate())
    .addSink(new TemplateMySQLSink());
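To make the behaviour concrete, here is a minimal plain-Java sketch with no Flink dependency. It assumes ListAggregate follows the usual incremental-aggregate shape (createAccumulator / add / getResult). The names WindowAggregateSketch, ListAggregateSketch and runOneWindow are hypothetical and only stand in for the real classes, which are not shown here. It simulates a single window: add() runs once per record, while getResult() runs once when the window closes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class WindowAggregateSketch {

    // Hypothetical stand-in for ListAggregate: collects the records of one window into a list.
    static class ListAggregateSketch {
        int addCalls = 0;     // how many times add() was invoked
        int resultCalls = 0;  // how many times getResult() was invoked

        List<String> createAccumulator() {
            return new ArrayList<>();
        }

        // Invoked for every incoming record of the window.
        List<String> add(String value, List<String> acc) {
            addCalls++;
            acc.add(value);
            return acc;
        }

        // Invoked once, when the window closes.
        List<String> getResult(List<String> acc) {
            resultCalls++;
            return acc;
        }
    }

    // Simulates one window: feed every record through add(), then emit a single result.
    static List<String> runOneWindow(ListAggregateSketch agg, List<String> records) {
        List<String> acc = agg.createAccumulator();
        for (String record : records) {
            acc = agg.add(record, acc);
        }
        return agg.getResult(acc);
    }

    public static void main(String[] args) {
        ListAggregateSketch agg = new ListAggregateSketch();
        List<String> batch = runOneWindow(agg, Arrays.asList("r1", "r2", "r3"));
        System.out.println("add() calls: " + agg.addCalls);
        System.out.println("getResult() calls: " + agg.resultCalls);
        System.out.println("records handed to the sink in one batch: " + batch.size());
    }
}
```

In this simulation the per-record calls happen only in add(), and the list handed downstream is produced once per window.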
It seems that the aggregate function is triggered for every incoming record, but I want it to trigger only once per window. How can I implement this?

Thanks,
Lei wangl...@geekplus.com