Hi! You might want to use a custom trigger to achieve this.
Tumbling windows use EventTimeTrigger by default. Flink has another built-in trigger called CountTrigger, but it only fires once every X records and ignores event time completely. You might want to write your own trigger that combines the two, or more specifically, combines EventTimeTrigger#onEventTime and CountTrigger#onElement.

For more about custom triggers see
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/windows/#triggers

John Smith <mylearningemail2...@gmail.com> wrote on Fri, Sep 3, 2021 at 2:00 AM:

> Hi,
>
> Sorry if this has been answered previously but I couldn't find any answer
> for the question and would appreciate any help.
> Context:
> Let's say I have a log stream in Kafka where message values have an *id*
> field along with a few other fields. I want to count the number of messages
> for each id for a tumbling window of *ten minutes* and if the count for
> any id in the given window is higher than 5, I want to write the message
> into the sink topic. However, I don't want to wait until the end of the 10
> minute window to emit the result and want to immediately emit the result
> when the count is more than 5 for an id in the window. For example, if I
> see 6 messages in the first minute for an id, I want to trigger a write
> with the count of 6 in the sink topic immediately and not wait the whole 10
> minutes.
> The following code does the aggregation but emits the results at the end
> of the window. How can I trigger the emitting result earlier?
>
> final Table resultTable = sourceTable
>         .select( $("id")
>         , $("key")
>
>         .window(Tumble.over(lit(10).minutes()).on($("timestamp")).as("w") )
>         .groupBy($("w"), $("id"))
>         .select($("w").start().as("WindowStart"), $("id"),
> $("key").count().as("count"))
>         ;
>
> resultTable.execute().print();
>
>
> Thanks in advance!
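
To make the suggestion in the reply above more concrete, here is a rough, dependency-free sketch of the decision logic such a combined trigger could use. It is not Flink code: the class name, the `Result` enum and the method signatures are stand-ins I made up so that it runs without Flink on the classpath. In a real job the two methods would roughly be the bodies of Trigger#onElement and Trigger#onEventTime, the counter would live in Flink's per-key, per-window state, and the end-of-window timer would be registered through TriggerContext#registerEventTimeTimer. It also assumes the job is written against the DataStream window API, since that is where custom triggers are plugged in.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Dependency-free model of a "count OR end-of-window" trigger.
 * Hypothetical names throughout; this only illustrates the firing logic.
 */
final class CountOrEventTimeTriggerSketch {

    /** Stand-in for the CONTINUE / FIRE outcomes of a Flink trigger. */
    enum Result { CONTINUE, FIRE }

    private final long threshold;

    // Elements seen so far per window, keyed by the window's max timestamp.
    // In Flink this would be state scoped to the current key and window.
    private final Map<Long, Long> counts = new HashMap<>();

    CountOrEventTimeTriggerSketch(long threshold) {
        this.threshold = threshold;
    }

    /** Count-based part: fire once the window holds more than `threshold` elements. */
    Result onElement(long windowMaxTimestamp) {
        long count = counts.merge(windowMaxTimestamp, 1L, Long::sum);
        return count > threshold ? Result.FIRE : Result.CONTINUE;
    }

    /** Event-time part: fire when the end-of-window timer goes off, then drop the counter. */
    Result onEventTime(long timerTimestamp, long windowMaxTimestamp) {
        if (timerTimestamp != windowMaxTimestamp) {
            return Result.CONTINUE; // a timer for some other point in time
        }
        counts.remove(windowMaxTimestamp);
        return Result.FIRE;
    }

    public static void main(String[] args) {
        CountOrEventTimeTriggerSketch trigger = new CountOrEventTimeTriggerSketch(5);
        long windowMax = 10 * 60 * 1000L - 1; // last timestamp of the window [0, 10 min)

        // Six elements arrive for one id well before the window ends.
        for (int i = 1; i <= 6; i++) {
            System.out.println("element " + i + ": " + trigger.onElement(windowMax));
        }
        // The watermark later passes the end of the window.
        System.out.println("end of window: " + trigger.onEventTime(windowMax, windowMax));
    }
}
```

With a threshold of 5, the first five elements yield CONTINUE and the sixth yields FIRE, so the count of 6 could be emitted right away. Note two consequences of this particular sketch: unlike CountTrigger it does not reset the counter, so every further element in the same window fires again with an updated count, and the end-of-window timer fires regardless of the count, so the window function or a downstream filter would still have to drop results with a count of 5 or less.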