And the answer is to change .windowedBy(TimeWindows.of(Duration.ofMillis(5000))) and specify the grace period: windowedBy(TimeWindows.of(Duration.ofMillis(5000)).grace(Duration.ofMillis(100)))
On Wed, Apr 15, 2020 at 4:34 PM Liam Clarke <liam.cla...@adscale.co.nz> wrote: > Okay, doing some debugging it looks like I'm seeing this behaviour because > it's picking up a grace duration of 86,395,000 ms in > KTableImpl.buildSuppress, which would happen to be 5000 millis (my window > size) off 24 hours, so I've got some clues! > > On Wed, Apr 15, 2020 at 3:43 PM Liam Clarke <liam.cla...@adscale.co.nz> > wrote: > >> Hi all, >> >> I have a case where I want to consume from a topic, count the number of >> certain ids in a given time period X, and emit a new record to a different >> topic after that same time period X has elapsed containing the aggregated >> value. >> >> I'm using suppress with Suppressed.untilWindowCloses, but nothing is ever >> emitted, nor is my peek placed after the suppress ever being hit. >> My code is in the below Gist - I've hardcoded the durations for 5 seconds >> after testing purposes: >> https://gist.github.com/LiamClarkeNZ/24121ccf0f09e4530749cbd92633fa46 >> >> I'm assuming I've misunderstood something drastically, and would greatly >> appreciate a pointer on where I may have gone wrong. I'm wondering if I >> need a larger retention on the persistent store? >> >> I understand that events have to arrive in order for windows to close, so >> I've sent events after the window has expired to attempt to move the window >> on, and my first peek (before the suppression) is emitting as I do: >> >> 1. 2020-04-15T03:36:48.569Z e2442bef-72bf-4424-b94e-7e4743e03c5e - 1 >> 1. 2020-04-15T03:37:11.682Z e2442bef-72bf-4424-b94e-7e4743e03c5e - 1 >> 1. 2020-04-15T03:39:18.882Z aqgzftnvyn - 1 >> >> >> Any guidance greatfully appreciated. >> >> Kind regards, >> >> Liam Clarke >> >