Hi John,

Thanks for answering my questions!

I am seeing behavior that I cannot explain. The code works, but when the delay between records is larger than the window duration, I receive duplicate records in the output KStream. With the code below, the number of duplicates is always 3. If I change the duration or advanceBy, the number of duplicates changes as well. Do you have any idea why duplicate records appear in the output KStream?

    KStream<Windowed<String>, Long> windowedStream = source
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofSeconds(30))
            .grace(Duration.ZERO)
            .advanceBy(Duration.ofSeconds(10)))
        .count()
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream();

Best regards,
Viktor Markvardt

On Thu, Jan 16, 2020 at 04:59, John Roesler <vvcep...@apache.org> wrote:

> Hi Viktor,
>
> I’m not sure why you get two identical outputs in response to a single
> record. Regardless, since you say that you want to get a single, final
> result for the window and you expect multiple inputs to the windows, you
> need Suppression.
>
> My guess is that you just sent one record to try it out and didn’t see any
> output? This is expected. Just as the window boundaries are defined by the
> timestamps of the records, not by the current system time, suppression is
> governed by the timestamps of the records. I.e., a thirty-second window is
> not actually closed until you see a new record with a timestamp thirty
> seconds later.
>
> Maybe try just sending a sequence of updates with incrementing
> timestamps. If the first record has timestamp T, then you should see an
> output when you pass in a record with timestamp T+30.
>
> Important note: there is a built-in grace period that delays the output of
> final results after the window ends. For complicated reasons, the default
> is 24 hours! So you would actually not see an output until you send a
> record with timestamp T+30+(24 hours)! I strongly recommend you set the
> grace period on TimeWindows to zero for your testing. You can increase it
> later if you want to tolerate some late-arriving records.
>
> Thanks,
> -John
>
> On Tue, Jan 14, 2020, at 10:41, Viktor Markvardt wrote:
> > Hi,
> >
> > My name is Viktor. I'm currently working with Kafka Streams and have
> > several questions about Kafka that I cannot find answers to in the
> > official docs.
> >
> > 1. Why does the suppress functionality not work with hopping windows?
> > How can I make it work?
> >
> > Example of the code:
> >
> >     KStream<Windowed<String>, String> finalStream = source
> >         .groupByKey()
> >         .windowedBy(TimeWindows.of(Duration.ofSeconds(30))
> >             .advanceBy(Duration.ofSeconds(10)))
> >         .reduce((aggValue, newValue) -> newValue,
> >             Materialized.with(Serdes.String(), Serdes.String()))
> >         .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> >         .toStream();
> >
> >     finalStream.print(Printed.toSysOut());
> >     finalStream.to(outputTopic);
> >
> > After I run the code above, the output stream is empty. There were no
> > errors/exceptions.
> > NOTE: With a tumbling window the code works as expected.
> > Maybe I am simply using it incorrectly?
> >
> > 2. Why are there duplicates in the output stream with hopping windows
> > (without suppress)?
> > E.g., I send one record to the input KStream with a hopping window
> > (duration=30s, advanceBy=2s) but get two identical records (duplicates)
> > in the output KStream.
> > Is that expected behavior? If so, how can I filter out or switch off
> > these duplicates?
> >
> > 3. Mainly I'm trying to solve this problem:
> > I have a KStream of events, and events can be repeated (duplicates).
> > In the output KStream I would like to receive only unique events for the
> > last 24 hours (window duration) with a 1-hour window overlap (window
> > advanceBy).
> > Could you please recommend any code examples or docs?
> > I have already read the official docs and examples, but they were not
> > enough to fully understand how I can achieve this.
> >
> > Best regards,
> > Viktor Markvardt
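
A note on the count of 3, offered as a possible explanation rather than a confirmed diagnosis: with hopping windows, each record falls into size / advanceBy overlapping windows, so a 30-second window advancing by 10 seconds places every record in 3 windows. If the next record arrives more than a window duration later, all three windows close at once and suppress() emits one final result per window. Those results carry the same record key and count but different window start times, which may be what shows up as duplicates. The sketch below is plain Java with no Kafka dependency; the class and method names (HoppingWindowSketch, windowStartsFor) are hypothetical and only illustrate the window arithmetic, assuming windows are aligned to the epoch and the window end is exclusive.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of the Kafka Streams API.
final class HoppingWindowSketch {

    // Returns the start times (in ms) of every hopping window that contains
    // timestampMs, assuming epoch-aligned windows of the form [start, start + sizeMs).
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        if (timestampMs < 0 || sizeMs <= 0 || advanceMs <= 0 || advanceMs > sizeMs) {
            throw new IllegalArgumentException(
                "need timestampMs >= 0 and 0 < advanceMs <= sizeMs");
        }
        List<Long> starts = new ArrayList<>();
        // Earliest aligned window start whose window still covers the timestamp.
        long start = (Math.max(0L, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        // Every later aligned start up to the timestamp itself also covers it.
        while (start <= timestampMs) {
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        // A record at t = 65 s with size = 30 s and advanceBy = 10 s.
        System.out.println(windowStartsFor(65_000L, 30_000L, 10_000L));
        // prints [40000, 50000, 60000]
    }
}
```

With size = 30 s and advanceBy = 2 s the same method returns 15 window starts, which would be consistent with the number of duplicates changing when the duration or advanceBy changes.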