Thanks, Guowei and Caizhi. As Guowei noted, I am using the Table API, which does not seem to support custom triggers at the moment. Is there a plan to support custom triggers in the Table API/SQL as well? Also, if I follow Guowei's suggestion, should I use the DataStream API for the rest of the aggregate computation too, or is there a way to create a GroupedWindowedTable from the DataStream?
Thanks,

On Thu, Sep 2, 2021 at 9:24 PM Guowei Ma <guowei....@gmail.com> wrote:

> Hi, John
>
> I agree with Caizhi that you might need to customize a window trigger.
> One small addition: you need to convert the Table to a DataStream first.
> Then you can customize the trigger of the window, because as far as I
> know, the Table API does not support custom windows yet. For details on
> how to convert, you can refer to [1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/data_stream_api/#datastream-api-integration
>
> Best,
> Guowei
>
>
> On Fri, Sep 3, 2021 at 10:28 AM Caizhi Weng <tsreape...@gmail.com> wrote:
>
>> Hi!
>>
>> You might want to use your own custom trigger to achieve this.
>>
>> Tumble windows use EventTimeTrigger by default. Flink has another
>> built-in trigger called CountTrigger, but it only fires for every X
>> records, ignoring the event time completely. You might want to create
>> your own trigger that combines the two, or more specifically, combines
>> EventTimeTrigger#onEventTime and CountTrigger#onElement.
>>
>> For more about custom triggers, see
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/windows/#triggers
>>
>> John Smith <mylearningemail2...@gmail.com> wrote on Fri, Sep 3, 2021 at 2:00 AM:
>>
>>> Hi,
>>>
>>> Sorry if this has been answered previously, but I couldn't find an
>>> answer to this question and would appreciate any help.
>>>
>>> Context:
>>> Let's say I have a log stream in Kafka where message values have an *id*
>>> field along with a few other fields. I want to count the number of
>>> messages for each id over a tumbling window of *ten minutes*, and if the
>>> count for any id in the given window is higher than 5, I want to write
>>> the message into the sink topic. However, I don't want to wait until the
>>> end of the 10 minute window to emit the result; I want to emit the
>>> result immediately when the count for an id in the window exceeds 5.
>>> For example, if I see 6 messages in the first minute for an id, I want
>>> to trigger a write with the count of 6 in the sink topic immediately and
>>> not wait the whole 10 minutes.
>>>
>>> The following code does the aggregation but emits the results at the
>>> end of the window. How can I trigger emitting the result earlier?
>>>
>>> final Table resultTable = sourceTable
>>>     // "timestamp" is kept here because the window below is defined on it
>>>     .select($("id"), $("key"), $("timestamp"))
>>>     .window(Tumble.over(lit(10).minutes()).on($("timestamp")).as("w"))
>>>     .groupBy($("w"), $("id"))
>>>     .select($("w").start().as("WindowStart"), $("id"),
>>>         $("key").count().as("count"));
>>>
>>> resultTable.execute().print();
>>>
>>>
>>> Thanks in advance!
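
To make Caizhi's suggestion a bit more concrete, here is a minimal, library-free sketch of the decision logic that such a combined trigger might use. It is plain Java and not Flink API: `EarlyFireModel`, `Decision`, and every method name are hypothetical, chosen only to mirror the roles of CountTrigger#onElement and EventTimeTrigger#onEventTime. It assumes non-negative millisecond timestamps and fires again on every element once the count for an id exceeds the threshold.

```java
import java.util.HashMap;
import java.util.Map;

/** Library-free model of a "count threshold OR end of window" trigger. Not Flink API. */
final class EarlyFireModel {

    /** Hypothetical stand-in for a trigger decision, defined here so the sketch compiles alone. */
    enum Decision { CONTINUE, FIRE }

    private final long windowSizeMs; // tumbling window size, e.g. 10 minutes
    private final long threshold;    // fire early once the count is greater than this

    // One counter per (window start, id). In a real job this would be
    // per-key, per-window state managed by the framework, not a HashMap.
    private final Map<String, Long> counts = new HashMap<>();

    EarlyFireModel(long windowSizeMs, long threshold) {
        this.windowSizeMs = windowSizeMs;
        this.threshold = threshold;
    }

    /** Start of the tumbling window containing the timestamp (assumes ts >= 0). */
    long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    private String key(String id, long windowStartMs) {
        return windowStartMs + "|" + id;
    }

    /** Role of onElement: count the record and fire early above the threshold. */
    Decision onElement(String id, long timestampMs) {
        long count = counts.merge(key(id, windowStart(timestampMs)), 1L, Long::sum);
        return count > threshold ? Decision.FIRE : Decision.CONTINUE;
    }

    /** Role of onEventTime: the window has ended, so fire once and drop the counter. */
    Decision onWindowEnd(String id, long windowStartMs) {
        counts.remove(key(id, windowStartMs));
        return Decision.FIRE;
    }

    /** Current count for the id in the window containing the timestamp. */
    long count(String id, long timestampMs) {
        return counts.getOrDefault(key(id, windowStart(timestampMs)), 0L);
    }

    public static void main(String[] args) {
        EarlyFireModel model = new EarlyFireModel(10 * 60 * 1000L, 5L);
        for (int i = 1; i <= 6; i++) {
            // Six messages for the same id within the first few seconds of a window.
            System.out.println("message " + i + " -> " + model.onElement("id-1", i * 1_000L));
        }
        System.out.println("window end -> " + model.onWindowEnd("id-1", 0L));
    }
}
```

In a real job this logic would sit in a custom Trigger on the DataStream side, with the counter held in the trigger's per-window state. A downstream filter on the count would probably still be needed, since the end-of-window firing in this sketch happens regardless of the count.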