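And the answer is to change

.windowedBy(TimeWindows.of(Duration.ofMillis(5000)))

so that it specifies the grace period explicitly:

.windowedBy(TimeWindows.of(Duration.ofMillis(5000)).grace(Duration.ofMillis(100)))

Without an explicit grace period, the window picks up the implicit grace
of 24 hours minus the window size, so suppress holds every result until
stream time has advanced that far.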
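
To make the arithmetic concrete, here is a minimal plain-Java sketch of why nothing was emitted. It does not call the Kafka Streams API; the class and method names (GraceSketch, implicitGrace, isClosed) are hypothetical, the 24-hour default retention is an assumption based on the 86,395,000 ms value seen in the debugger, and the "window end plus grace" rule is a simplified model of when untilWindowCloses releases a result.

```java
import java.time.Duration;

public class GraceSketch {
    // Assumption: default window retention of 24 hours, inferred from the
    // 86,395,000 ms grace observed in KTableImpl.buildSuppress.
    static final Duration DEFAULT_RETENTION = Duration.ofHours(24);

    // Implicit grace when none is configured: retention minus window size.
    static Duration implicitGrace(Duration windowSize) {
        return DEFAULT_RETENTION.minus(windowSize);
    }

    // Simplified model: a window counts as closed once stream time has
    // reached window end (start + size) plus the grace period.
    static boolean isClosed(long windowStartMs, Duration size, Duration grace, long streamTimeMs) {
        return streamTimeMs >= windowStartMs + size.toMillis() + grace.toMillis();
    }

    public static void main(String[] args) {
        Duration size = Duration.ofMillis(5000);

        // Matches the value seen while debugging.
        System.out.println(implicitGrace(size).toMillis());

        // A later event moves stream time to 25 s after the window started.
        long streamTimeMs = 25_000;

        // Implicit grace: the window is still open, so suppress emits nothing.
        System.out.println(isClosed(0, size, implicitGrace(size), streamTimeMs));

        // Explicit 100 ms grace: the window closed at 5,100 ms, so it emits.
        System.out.println(isClosed(0, size, Duration.ofMillis(100), streamTimeMs));
    }
}
```

Under this model, the events sent a few seconds or minutes apart never push stream time far enough to close a window with the implicit grace, which is consistent with the first peek firing and the second one staying silent.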

On Wed, Apr 15, 2020 at 4:34 PM Liam Clarke <liam.cla...@adscale.co.nz>
wrote:

> Okay, after some debugging it looks like I'm seeing this behaviour because
> it's picking up a grace duration of 86,395,000 ms in
> KTableImpl.buildSuppress, which happens to be 5000 ms (my window size)
> short of 24 hours, so I've got some clues!
>
> On Wed, Apr 15, 2020 at 3:43 PM Liam Clarke <liam.cla...@adscale.co.nz>
> wrote:
>
>> Hi all,
>>
>> I have a case where I want to consume from a topic, count the number of
>> certain ids in a given time period X, and emit a new record to a different
>> topic after that same time period X has elapsed containing the aggregated
>> value.
>>
>> I'm using suppress with Suppressed.untilWindowCloses, but nothing is ever
>> emitted, nor is my peek placed after the suppress ever being hit.
>> My code is in the Gist below; I've hardcoded the durations to 5 seconds
>> for testing purposes:
>> https://gist.github.com/LiamClarkeNZ/24121ccf0f09e4530749cbd92633fa46
>>
>> I'm assuming I've misunderstood something drastically, and would greatly
>> appreciate a pointer on where I may have gone wrong. I'm wondering if I
>> need a larger retention on the persistent store?
>>
>> I understand that events have to arrive in order for windows to close, so
>> I've sent events after the window has expired to attempt to move the window
>> on, and my first peek (before the suppression) is emitting as I do:
>>
>> 1. 2020-04-15T03:36:48.569Z e2442bef-72bf-4424-b94e-7e4743e03c5e - 1
>> 1. 2020-04-15T03:37:11.682Z e2442bef-72bf-4424-b94e-7e4743e03c5e - 1
>> 1. 2020-04-15T03:39:18.882Z aqgzftnvyn - 1
>>
>>
>> Any guidance gratefully appreciated.
>>
>> Kind regards,
>>
>> Liam Clarke
>>
>
