source是kafka,有一个rowtime定义: .field("rowtime", DataTypes.TIMESTAMP(0)) .rowtime(Rowtime() .timestamps_from_field("actionTime") .watermarks_periodic_bounded(60000) )
有两个sink,第一个sink是直接把kafa的数据保存到postgres。 第二个sink是定义一个1小时的tumble window,然后定义了一个udf,udf里面去查询第一个sink保存的数据。 st_env.scan("source") \ .window(Tumble.over("1.hour").on("rowtime").alias("hourlywindow")) \ .group_by("hourlywindow") \ .select("udf(...)") ... 现在的问题是:第二个sink的tumble window触发的时候,数据库里面的数据已经保存了下一个小时的数据了。 有什么办法让tumble window在一个小时结束后马上触发?现在观察的是需要下一个小时的数据来了,才能触发上一个小时的窗口。 -- Sent from: http://apache-flink.147419.n8.nabble.com/