Hi John,

I agree with Caizhi that you will probably need a custom window trigger.
One addition: you need to convert the Table to a DataStream first, and then
set the custom trigger on the DataStream window, because as far as I know
the Table API does not support custom windows yet. For details on how to
convert, see [1].
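
To make the idea of combining the two triggers concrete, here is a minimal
plain-Java sketch of the decision logic such a trigger could follow. It is a
model only, not Flink code: the class CountOrEventTimeTriggerModel and its
local TriggerResult enum are hypothetical names that merely mirror Flink's
Trigger and TriggerResult, and one instance stands for the state of a single
id in a single window. The threshold of 5 and the 10 minute window are taken
from the question.

```java
// Plain-Java model of a "count OR event time" trigger for ONE (id, window) pair.
// Assumption: names mirror Flink's Trigger/TriggerResult for readability only;
// nothing here calls the Flink API.
final class CountOrEventTimeTriggerModel {

    enum TriggerResult { CONTINUE, FIRE }

    private final long threshold;    // fire early once the count exceeds this value
    private final long windowMaxTs;  // last timestamp that still belongs to the window
    private long count = 0;          // elements seen so far for this id in this window

    CountOrEventTimeTriggerModel(long threshold, long windowMaxTs) {
        this.threshold = threshold;
        this.windowMaxTs = windowMaxTs;
    }

    // Count-based part: called once per incoming element.
    TriggerResult onElement() {
        count++;
        return count > threshold ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    // Event-time part: called when an event-time timer goes off.
    // Fires only for the timer registered at the end of the window.
    TriggerResult onEventTime(long timerTimestamp) {
        return timerTimestamp == windowMaxTs ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    long count() {
        return count;
    }

    public static void main(String[] args) {
        long tenMinutesMs = 10 * 60 * 1000L;
        CountOrEventTimeTriggerModel trigger =
                new CountOrEventTimeTriggerModel(5, tenMinutesMs - 1);

        // Seven elements arrive well before the window ends.
        for (int i = 1; i <= 7; i++) {
            System.out.println("element " + i + " -> " + trigger.onElement());
        }
        // The end-of-window timer fires.
        System.out.println("window end -> " + trigger.onEventTime(tenMinutesMs - 1));
    }
}
```

Two caveats about this sketch. Unlike Flink's CountTrigger, it does not reset
the count after firing, so every element after the sixth fires again with the
updated count. And the end-of-window firing happens for every id, including
those with 5 or fewer messages, so a filter after the window would still be
needed if only counts above 5 should reach the sink.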

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/data_stream_api/#datastream-api-integration

Best,
Guowei


On Fri, Sep 3, 2021 at 10:28 AM Caizhi Weng <tsreape...@gmail.com> wrote:

> Hi!
>
> You might want to use your custom trigger to achieve this.
>
> Tumble windows are using EventTimeTrigger by default. Flink has another
> built-in trigger called CountTrigger but it only fires for every X records,
> ignoring the event time completely. You might want to create your own
> trigger to combine the two, or more specifically, combine
> EventTimeTrigger#onEventTime and CountTrigger#onElement.
>
> For more about custom triggers see
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/windows/#triggers
>
> On Fri, Sep 3, 2021 at 2:00 AM John Smith <mylearningemail2...@gmail.com> wrote:
>
>> Hi,
>>
>> Sorry if this has been answered previously but I couldn't find any answer
>> for the question and would appreciate any help.
>> Context:
>> Let's say I have a log stream in Kafka where message values have an *id*
>> field along with a few other fields. I want to count the number of messages
>> for each id for a tumbling window of *ten minutes* and if the count for
>> any id in the given window is higher than 5, I want to write the message
>> into the sink topic. However, I don't want to wait until the end of the 10
>> minute window to emit the result and want to immediately emit the result
>> when the count is more than 5 for an id in the window. For example, if I
>> see 6 messages in the first minute for an id, I want to trigger a write
>> with the count of 6 in the sink topic immediately and not wait the whole 10
>> minutes.
>> The following code does the aggregation but emits the results only at the
>> end of the window. How can I emit a result earlier?
>>
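>> import static org.apache.flink.table.api.Expressions.$;
>> import static org.apache.flink.table.api.Expressions.lit;
>> import org.apache.flink.table.api.Table;
>> import org.apache.flink.table.api.Tumble;
>>
>> final Table resultTable = sourceTable
>>         // "timestamp" is kept here because the window below is defined on it
>>         .select($("id"), $("key"), $("timestamp"))
>>         // 10-minute tumbling event-time window
>>         .window(Tumble.over(lit(10).minutes()).on($("timestamp")).as("w"))
>>         .groupBy($("w"), $("id"))
>>         // one row per id and window, emitted when the window closes
>>         .select($("w").start().as("WindowStart"), $("id"),
>>                 $("key").count().as("count"));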
>>
>> resultTable.execute().print();
>>
>>
>> Thanks in advance!
>>
>>
