Hi John,

Thanks for answering my questions!
I observe behavior which I can not understand.
The code is working, but when delay between records larger then window
duration I receive duplicated records.
With the code below I received duplicated records in the output kstream.
Count of duplicate records is always 3. If I change duration/advanceBy
count of duplicated records is changing also.
Do you have any ideas why duplicated records are received in the output
kstream?

KStream<String, String> windowedStream = source
    .groupByKey()
    
.windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ZERO).advanceBy(Duration.ofSeconds(10)))
    .count()
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream();


Best regards,
Viktor Markvardt

чт, 16 янв. 2020 г. в 04:59, John Roesler <vvcep...@apache.org>:

> Hi Viktor,
>
> I’m not sure why you get two identical outputs in response to a single
> record. Regardless, since you say that you want to get a single, final
> result for the window and you expect multiple inputs to the windows, you
> need Suppression.
>
> My guess is that you just sent one record to try it out and didn’t see any
> output? This is expected. Just as the window boundaries are defined by the
> time stamps of the records, not by the current system time, suppression is
> governed by the timestamp of the records. I.e., a thirty-second window is
> not actually closed until you see a new record with a timestamp thirty
> seconds later.
>
>  Maybe try just sending a sequence of updates with incrementing
> timestamps. If the first record has timestamp T, then you should see an
> output when you pass in a record with timestamp T+30.
>
> Important note: there is a built-in grace period that delays the output of
> final results after the window ends. For complicated reasons, the default
> is 24 hours! So you would actually not see an output until you send a
> record with timestamp T+30+(24 hours) ! I strongly recommend you set the
> grace period on TimeWindows to zero for your testing. You can increase it
> later if you want to tolerate some late-arriving records.
>
> Thanks,
> -John
>
> On Tue, Jan 14, 2020, at 10:41, Viktor Markvardt wrote:
> > Hi,
> >
> > My name is Viktor. I'm currently working with Kafka streams and have
> > several questions about Kafka and I can not find answers in the official
> > docs.
> >
> > 1. Why suppress functionality does not work with Hopping windows? How to
> > make it work?
> >
> > Example of the code:
> >
> > KStream<String, String> finalStream = source
> >                 .groupByKey()
> >
> >
> .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(10)))
> >                 .reduce((aggValue, newValue) -> newValue,
> > Materialized.with(Serdes.String(), Serdes.String()))
> >
> >
> .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> >                 .toStream();
> >
> > finalStream.print(Printed.toSysOut());
> > finalStream.to(outputTopic);
> >
> > After I run the code above - output stream is empty. There were no
> > errors/exceptions.
> > NOTE: With Tumbling Window the code working as expected.
> > Maybe I simply use it incorrectly?
> >
> > 2. Why with Hopping windows (without suppress) there are duplicates in
> the
> > output stream?
> > E.g., I send one record in the input kstream with Hopping window
> > (duration=30s, advanceBy=2s) but get two same records (duplicate) in the
> > output kstream.
> > Is that an expected behavior? If so, how can I filter/switch off these
> > duplicates?
> >
> > 3. Mainly I'm trying to solve this problem:
> > I have kstream with events inside and events can be repeated
> (duplicates).
> > In the output kstream I would like to receive only unique events for the
> > last 24 hours (window duration) with 1 hour window overlay (window
> > advanceBy).
> > Could you recommend me any examples of code or docs please?
> > I have already read official docs and examples but it was not enough to
> get
> > full understanding of how I can achieve this.
> >
> > Best regards,
> > Viktor Markvardt
> >
>

Reply via email to