Hi, Sorry if this has been answered previously but I couldn't find any answer for the question and would appreciate any help. Context: Let's say I have a log stream in Kafka where message values have an *id* field along with a few other fields. I want to count the number of messages for each id for a tumbling window of* ten minutes *and if the count for any id in the given window is higher than 5, I want to write the message into the sink topic. However, I don't want to wait until the end of the 10 minute window to emit the result and want to immediately emit the result when the count is more than 5 for an id in the window. For example, if I see 6 messages in the first minute for an id, I want to trigger a write with the count of 6 in the sink topic immediately and not wait the whole 10 minutes. The following code does the aggregation but emits the results at the end of the window. How can I trigger the emitting result earlier?
final Table resultTable = sourceTable .select( $("id") , $("key") .window(Tumble.over(lit(10).minutes()).on($("timestamp")).as("w") ) .groupBy($("w"), $("id")) .select($("w").start().as("WindowStart"), $("id"), $("key").count().as("count")) ; resultTable.execute().print(); Thanks in advance!