Hi,

Sorry if this has been answered before, but I couldn't find an answer to
this question and would appreciate any help.
Context:
Let's say I have a log stream in Kafka where message values have an *id*
field along with a few other fields. I want to count the number of messages
for each id in a tumbling window of *ten minutes*, and if the count for any
id in a given window is higher than 5, I want to write the result to the
sink topic. However, I don't want to wait until the end of the 10-minute
window to emit it: I want to emit the result as soon as the count for an id
in the window exceeds 5. For example, if I see 6 messages for an id in the
first minute, I want to immediately write a record with the count of 6 to
the sink topic instead of waiting the whole 10 minutes.
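To make the desired behaviour concrete, here is a minimal plain-Java sketch of the semantics (not Flink code, and not a proposed solution). It assumes epoch-aligned ten-minute windows, events processed in arrival order with no lateness handling, an in-memory map that never evicts old windows, and that a result is emitted for every message once its window count exceeds 5. The names EarlyWindowCount, Event and process are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class EarlyWindowCount {
    static final long WINDOW_MS = 10 * 60 * 1000L; // ten-minute tumbling window
    static final long THRESHOLD = 5;               // emit once the count exceeds 5

    /** Hypothetical log record holding only the fields the count needs. */
    static final class Event {
        final String id;
        final long timestampMs;

        Event(String id, long timestampMs) {
            this.id = id;
            this.timestampMs = timestampMs;
        }
    }

    /** Returns one line per emitted result, in arrival order. */
    static List<String> process(List<Event> events) {
        Map<String, Long> counts = new HashMap<>(); // (windowStart, id) -> count so far
        List<String> emitted = new ArrayList<>();
        for (Event e : events) {
            // Assign the event to its epoch-aligned tumbling window.
            long windowStart = e.timestampMs - Math.floorMod(e.timestampMs, WINDOW_MS);
            String key = windowStart + "/" + e.id;
            long count = counts.merge(key, 1L, Long::sum);
            // Emit immediately instead of waiting for the window to close.
            if (count > THRESHOLD) {
                emitted.add("windowStart=" + windowStart + " id=" + e.id + " count=" + count);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        for (int i = 0; i < 6; i++) {
            events.add(new Event("a", i * 5_000L)); // six messages for "a" in the first minute
        }
        events.add(new Event("b", 1_000L));         // "b" stays below the threshold
        process(events).forEach(System.out::println);
    }
}
```

With six messages for id "a" in the first minute, this prints a single line, windowStart=0 id=a count=6, at the sixth message rather than at the end of the window.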
The following code does the aggregation, but it emits the results only at
the end of the window. How can I make it emit results earlier?

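import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;

// Count messages per id in ten-minute tumbling windows.
// "timestamp" is assumed to be a time attribute; it must stay in the
// projection because the window is defined on it.
final Table resultTable = sourceTable
        .select($("id"), $("key"), $("timestamp"))
        .window(Tumble.over(lit(10).minutes()).on($("timestamp")).as("w"))
        .groupBy($("w"), $("id"))
        .select(
                $("w").start().as("WindowStart"),
                $("id"),
                $("key").count().as("count"));

resultTable.execute().print();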


Thanks in advance!
