Thanks Guowei and Caizhi.
As Guowei noted, I am using the Table API, and it seems that it does not
support custom triggers at the moment. Is there a plan to support custom
triggers in the Table API/SQL too?
Also, if I follow Guowei's suggestion, should I use DataStream for the other
parts of the aggregate computation too, or is there a way to create a
GroupedWindowedTable from the DataStream?
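
To make the suggested trigger concrete, here is a minimal plain-Java model of the firing rule described in the quoted replies: fire early once the count for an id exceeds a threshold, and fire again when the watermark reaches the end of the window. It is only a sketch and does not use Flink classes. `EarlyFireModel`, `Decision`, `onWatermark` and `countFor` are hypothetical names made up for illustration. In a real job the same logic would presumably live in a custom Trigger applied after converting the Table to a DataStream.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model (not Flink code) of a "count or window end" firing rule. */
class EarlyFireModel {

    /** Hypothetical stand-in for a trigger result; only two outcomes are modelled. */
    enum Decision { CONTINUE, FIRE }

    private final long windowEndMillis; // exclusive end of the tumbling window
    private final long threshold;       // fire early once count > threshold
    private final Map<String, Long> counts = new HashMap<>();

    EarlyFireModel(long windowEndMillis, long threshold) {
        this.windowEndMillis = windowEndMillis;
        this.threshold = threshold;
    }

    /** Plays the role of the count-based check: count the record, fire above the threshold. */
    Decision onElement(String id) {
        long count = counts.merge(id, 1L, Long::sum);
        return count > threshold ? Decision.FIRE : Decision.CONTINUE;
    }

    /** Plays the role of the event-time check: fire once the watermark reaches the last timestamp of the window. */
    Decision onWatermark(long watermarkMillis) {
        return watermarkMillis >= windowEndMillis - 1 ? Decision.FIRE : Decision.CONTINUE;
    }

    /** Current count for an id within this window. */
    long countFor(String id) {
        return counts.getOrDefault(id, 0L);
    }

    public static void main(String[] args) {
        // One 10-minute window starting at 0, threshold of 5 as in the question.
        EarlyFireModel window = new EarlyFireModel(600_000L, 5);
        for (int i = 1; i <= 7; i++) {
            Decision d = window.onElement("a");
            System.out.println("record " + i + " -> " + d + " (count=" + window.countFor("a") + ")");
        }
        System.out.println("watermark at window end -> " + window.onWatermark(599_999L));
    }
}
```

Note that with this rule every record after the sixth fires again with an updated count, so the sink would need to tolerate repeated results for the same id and window.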


On Thu, Sep 2, 2021 at 9:24 PM Guowei Ma <> wrote:

> Hi, John
> I agree with Caizhi that you might need to customize a window trigger. One
> small addition: you need to convert the Table to a DataStream first. Then
> you can customize the trigger of the window, because as far as I know, the
> Table API does not support custom windows yet. For details on how to
> convert, you can refer to [1]
> [1]
> Best,
> Guowei
> On Fri, Sep 3, 2021 at 10:28 AM Caizhi Weng <> wrote:
>> Hi!
>> You might want to use a custom trigger to achieve this.
>> Tumbling windows use EventTimeTrigger by default. Flink has another
>> built-in trigger called CountTrigger, but it only fires every X records
>> and ignores event time completely. You might want to create your own
>> trigger that combines the two, or more specifically, combines
>> EventTimeTrigger#onEventTime and CountTrigger#onElement.
>> For more about custom triggers see
>> On Fri, Sep 3, 2021 at 2:00 AM John Smith <> wrote:
>>> Hi,
>>> Sorry if this has been answered previously but I couldn't find any
>>> answer for the question and would appreciate any help.
>>> Context:
>>> Let's say I have a log stream in Kafka where message values have an *id*
>>> field along with a few other fields. I want to count the number of messages
>>> for each id for a tumbling window of *ten minutes* and if the count for
>>> any id in the given window is higher than 5, I want to write the message
>>> into the sink topic. However, I don't want to wait until the end of the 10
>>> minute window to emit the result and want to immediately emit the result
>>> when the count is more than 5 for an id in the window. For example, if I
>>> see 6 messages in the first minute for an id, I want to trigger a write
>>> with the count of 6 in the sink topic immediately and not wait the whole 10
>>> minutes.
>>> The following code does the aggregation but emits the results only at
>>> the end of the window. How can I emit the result earlier?
>>> // Needs static imports of Expressions.$ and Expressions.lit.
>>> final Table resultTable = sourceTable
>>>         // Keep the time attribute so the window can be defined on it.
>>>         .select($("id"), $("key"), $("timestamp"))
>>>         .window(Tumble.over(lit(10).minutes()).on($("timestamp")).as("w"))
>>>         .groupBy($("w"), $("id"))
>>>         .select($("w").start().as("WindowStart"), $("id"),
>>>                 $("key").count().as("count"));
>>> resultTable.execute().print();
>>> Thanks in advance!
