Hi John,

I agree with Caizhi that you will probably need a custom window trigger. One addition: you need to convert the Table to a DataStream first, and then define the window with your custom trigger on the DataStream, because as far as I know the Table API does not support custom windows yet. For details on the conversion, see [1].
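The firing rule such a custom trigger would implement can be sketched in plain Java, independent of Flink. This is only a model under stated assumptions: the class `EarlyFireModel`, its method names, and the `-1` return convention are hypothetical and are not Flink API; the ten-minute window and the threshold of 5 come from John's question. In a real job, the counting would live in `Trigger#onElement` (returning `TriggerResult.FIRE` once the count exceeds the threshold), and the end-of-window handling in `Trigger#onEventTime` and `Trigger#clear`.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of a "fire early above a count" rule; hypothetical, not Flink API. */
public class EarlyFireModel {
    static final long WINDOW_MS = 10 * 60 * 1000L; // ten-minute tumbling window
    static final long THRESHOLD = 5;               // emit once the count exceeds this

    // Running count per "id@windowStart"; stands in for per-key, per-window trigger state.
    private final Map<String, Long> counts = new HashMap<>();

    /** Start of the tumbling window containing the timestamp (assumes timestampMs >= 0). */
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_MS);
    }

    /**
     * Analogue of Trigger#onElement: counts the record and returns the count to emit,
     * or -1 while the count is still at or below the threshold.
     */
    long onElement(String id, long timestampMs) {
        long count = counts.merge(id + "@" + windowStart(timestampMs), 1L, Long::sum);
        return count > THRESHOLD ? count : -1;
    }

    /**
     * Analogue of the end-of-window path: drops state for every window whose last
     * timestamp (end - 1) is at or before the watermark. Returns the number dropped.
     */
    int onWatermark(long watermarkMs) {
        int before = counts.size();
        counts.keySet().removeIf(k -> startOf(k) + WINDOW_MS - 1 <= watermarkMs);
        return before - counts.size();
    }

    // Parses the window start back out of an "id@windowStart" key.
    private static long startOf(String key) {
        return Long.parseLong(key.substring(key.lastIndexOf('@') + 1));
    }
}
```

Feeding six records for one id within the first minute makes the sixth `onElement` call return 6, which is the point where the real trigger would fire instead of waiting for the window to close. Later records in the same window keep returning the updated count, so a downstream consumer should expect several results per id and window.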
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/data_stream_api/#datastream-api-integration

Best,
Guowei

On Fri, Sep 3, 2021 at 10:28 AM Caizhi Weng <tsreape...@gmail.com> wrote:

> Hi!
>
> You might want to use your own custom trigger to achieve this.
>
> Tumble windows use EventTimeTrigger by default. Flink has another
> built-in trigger called CountTrigger, but it only fires every X records,
> ignoring the event time completely. You might want to create your own
> trigger to combine the two, or more specifically, combine
> EventTimeTrigger#onEventTime and CountTrigger#onElement.
>
> For more about custom triggers see
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/windows/#triggers
>
> John Smith <mylearningemail2...@gmail.com> wrote on Fri, Sep 3, 2021 at 2:00 AM:
>
>> Hi,
>>
>> Sorry if this has been answered previously but I couldn't find any answer
>> for the question and would appreciate any help.
>> Context:
>> Let's say I have a log stream in Kafka where message values have an *id*
>> field along with a few other fields. I want to count the number of messages
>> for each id for a tumbling window of *ten minutes* and if the count for
>> any id in the given window is higher than 5, I want to write the message
>> into the sink topic. However, I don't want to wait until the end of the 10
>> minute window to emit the result and want to immediately emit the result
>> when the count is more than 5 for an id in the window. For example, if I
>> see 6 messages in the first minute for an id, I want to trigger a write
>> with the count of 6 in the sink topic immediately and not wait the whole 10
>> minutes.
>> The following code does the aggregation but emits the results at the end
>> of the window. How can I trigger the emitting result earlier?
>>
>> final Table resultTable = sourceTable
>>     // "timestamp" is kept in the projection on the assumption that the
>>     // window below needs it; the archived message is cut off at this point.
>>     .select($("id"), $("key"), $("timestamp"))
>>     .window(Tumble.over(lit(10).minutes()).on($("timestamp")).as("w"))
>>     .groupBy($("w"), $("id"))
>>     .select($("w").start().as("WindowStart"), $("id"),
>>         $("key").count().as("count"));
>>
>> resultTable.execute().print();
>>
>>
>> Thanks in advance!