source是kafka,有一个rowtime定义:

            .field("rowtime", DataTypes.TIMESTAMP(0))
            .rowtime(Rowtime()
                .timestamps_from_field("actionTime")
                .watermarks_periodic_bounded(60000)
            )

有两个sink,第一个sink是直接把kafa的数据保存到postgres。
第二个sink是定义一个1小时的tumble window,然后定义了一个udf,udf里面去查询第一个sink保存的数据。
    st_env.scan("source") \
         .window(Tumble.over("1.hour").on("rowtime").alias("hourlywindow"))
\
         .group_by("hourlywindow") \
         .select("udf(...)")
         ...


现在的问题是:第二个sink的tumble window触发的时候,数据库里面的数据已经保存了下一个小时的数据了。

有什么办法让tumble window在一个小时结束后马上触发?现在观察的是需要下一个小时的数据来了,才能触发上一个小时的窗口。




--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复