Hi!

You might want to use a custom trigger to achieve this.

Tumbling event-time windows use EventTimeTrigger by default, which fires only
when the watermark passes the end of the window. Flink has another built-in
trigger, CountTrigger, but it fires every X records and ignores event time
completely. You can write your own trigger that combines the two, or more
specifically, combines EventTimeTrigger#onEventTime and
CountTrigger#onElement.
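
To make the combined firing rule concrete, here is a minimal plain-Java sketch
of the decision logic only. It has no Flink dependency: the class
CountOrEventTimeTriggerModel, its Decision enum and its constructor parameters
are hypothetical names made up for illustration, not Flink API. In a real job
the same two methods would live in a class extending Flink's Trigger, with the
counter kept in the trigger's partitioned state rather than in a field. This
sketch also assumes you want every element past the threshold to fire again
with the updated count, which may not be what you need.

```java
/**
 * Plain-Java model of a "count OR event time" firing rule.
 * NOT Flink API: all names here are hypothetical and for illustration only.
 */
final class CountOrEventTimeTriggerModel {

    /** Stand-in for the two trigger outcomes this sketch cares about. */
    enum Decision { CONTINUE, FIRE }

    private final long threshold;          // fire early once count exceeds this
    private final long windowMaxTimestamp; // last timestamp that belongs to the window
    private long count = 0;                // elements seen so far for this key and window

    CountOrEventTimeTriggerModel(long threshold, long windowMaxTimestamp) {
        this.threshold = threshold;
        this.windowMaxTimestamp = windowMaxTimestamp;
    }

    /** Count-based half (the CountTrigger#onElement idea): fire once count > threshold. */
    Decision onElement() {
        count++;
        return count > threshold ? Decision.FIRE : Decision.CONTINUE;
    }

    /** Time-based half (the EventTimeTrigger#onEventTime idea): fire at the window end. */
    Decision onEventTime(long timerTimestamp) {
        return timerTimestamp == windowMaxTimestamp ? Decision.FIRE : Decision.CONTINUE;
    }

    public static void main(String[] args) {
        // Model a 10-minute window [0, 600000) with an early-fire threshold of 5.
        CountOrEventTimeTriggerModel trigger =
                new CountOrEventTimeTriggerModel(5, 599_999L);
        for (int i = 1; i <= 7; i++) {
            System.out.println("element " + i + " -> " + trigger.onElement());
        }
        System.out.println("timer at window end -> " + trigger.onEventTime(599_999L));
    }
}
```

Returning a plain "fire" decision (rather than fire-and-purge) is what lets the
window keep its contents, so a later firing can report a higher count for the
same id.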

For more about custom triggers, see
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/windows/#triggers

John Smith <mylearningemail2...@gmail.com> wrote on Fri, Sep 3, 2021 at 2:00 AM:

> Hi,
>
> Sorry if this has been answered previously but I couldn't find any answer
> for the question and would appreciate any help.
> Context:
> Let's say I have a log stream in Kafka where message values have an *id*
> field along with a few other fields. I want to count the number of messages
> for each id for a tumbling window of *ten minutes* and if the count for
> any id in the given window is higher than 5, I want to write the message
> into the sink topic. However, I don't want to wait until the end of the 10
> minute window to emit the result and want to immediately emit the result
> when the count is more than 5 for an id in the window. For example, if I
> see 6 messages in the first minute for an id, I want to trigger a write
> with the count of 6 in the sink topic immediately and not wait the whole 10
> minutes.
> The following code does the aggregation but emits the results at the end
> of the window. How can I emit the result earlier?
>
> final Table resultTable = sourceTable
>                 // keep "timestamp" in the projection: the window below needs it
>                 .select($("id"), $("key"), $("timestamp"))
>                 .window(Tumble.over(lit(10).minutes()).on($("timestamp")).as("w"))
>                 .groupBy($("w"), $("id"))
>                 .select($("w").start().as("WindowStart"), $("id"), $("key").count().as("count"));
>
> resultTable.execute().print();
>
>
> Thanks in advance!
>
>
