Re: Streams, Kafka windows

2020-01-14 Thread Sachin Mittal
You can try converting the final resulting stream to a table.
Check this page for more info:
https://docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an-aggregation-step

This way the table will always contain the latest (single) record for a given
key.
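To make the "latest record per key" behaviour concrete, here is a minimal plain-Java model. It is not Kafka Streams code, and the class and method names are hypothetical. It replays a stream in order and keeps only the newest value per key, which is the upsert semantics a KTable applies. Note that for the windowed stream in the question the key is a Windowed<String> (key plus window), so such a table would keep one entry per key per window, not one per key.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of KStream-to-KTable semantics; this is NOT Kafka Streams code.
// Each record is treated as an upsert, so only the latest value per key survives.
public class LatestPerKey {

    // Replays records in arrival order; a later record for a key overwrites the earlier one.
    public static Map<String, String> toTable(List<Map.Entry<String, String>> stream) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Map.Entry<String, String> rec : stream) {
            table.put(rec.getKey(), rec.getValue());
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> stream = List.of(
                Map.entry("user-1", "v1"),
                Map.entry("user-2", "v1"),
                Map.entry("user-1", "v2"));
        System.out.println(toTable(stream)); // prints {user-1=v2, user-2=v1}
    }
}
```

Three input records collapse to two table rows because the second "user-1" record replaces the first.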

Sachin




On Tue, Jan 14, 2020 at 10:11 PM Viktor Markvardt <viktor.markva...@gmail.com> wrote:



Streams, Kafka windows

2020-01-14 Thread Viktor Markvardt
Hi,

My name is Viktor. I'm currently working with Kafka Streams and have
several questions that I cannot find answers to in the official docs.

1. Why doesn't suppress work with hopping windows, and how can I make it
work?

Here is the code:

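import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

// source is a KStream<String, String>; outputTopic is a String topic name
KStream<Windowed<String>, String> finalStream = source
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(10)))
    .reduce((aggValue, newValue) -> newValue,
        Materialized.with(Serdes.String(), Serdes.String()))
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream();

finalStream.print(Printed.toSysOut());
finalStream.to(outputTopic);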

After I run the code above, the output stream is empty. There are no
errors or exceptions.
NOTE: With a tumbling window the code works as expected.
Am I simply using it incorrectly?

2. Why are there duplicates in the output stream when I use hopping windows
(without suppress)?
For example, I send one record to the input KStream with a hopping window
(duration=30s, advanceBy=2s) but get two identical records (duplicates) in
the output KStream.
Is that expected behavior? If so, how can I filter out or switch off these
duplicates?

3. My main goal is to solve this problem:
I have a KStream of events, and events can be repeated (duplicates).
In the output KStream I would like to receive only unique events for the
last 24 hours (window size), with windows advancing every hour
(advanceBy = 1 hour).
Could you recommend any code examples or docs, please?
I have already read the official docs and examples, but they were not
enough for me to fully understand how to achieve this.
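For question 3, one possible approach is sketched below in plain Java. It is not Kafka Streams code and not the approach from this thread; the class and method names are hypothetical. It swaps hopping windows for time-bounded deduplication: an event is forwarded only if its ID has not been forwarded within the previous 24 hours. With 24-hour windows advancing every hour, each event would land in 24 overlapping windows and could be emitted once per window, which is probably not what "unique events" means here. In Kafka Streams the same logic would typically live in a custom Transformer backed by a state store; that wiring is not shown.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of time-bounded deduplication; this is NOT Kafka Streams code.
// An event is forwarded only if its ID was not forwarded within the last windowMs.
public class TimeBoundedDedup {
    private final long windowMs;
    // Event ID -> timestamp at which that ID was last forwarded.
    private final Map<String, Long> lastForwarded = new HashMap<>();

    public TimeBoundedDedup(long windowMs) {
        this.windowMs = windowMs;
    }

    // Returns true if the event should be forwarded, false if it is a duplicate.
    public boolean accept(String eventId, long timestampMs) {
        // Forget IDs forwarded windowMs or more ago, so state stays bounded
        // (similar in spirit to a state store's retention period).
        long cutoffMs = timestampMs - windowMs;
        lastForwarded.values().removeIf(ts -> ts <= cutoffMs);
        if (lastForwarded.containsKey(eventId)) {
            return false; // same ID already forwarded within the window
        }
        lastForwarded.put(eventId, timestampMs);
        return true;
    }

    // Hypothetical helper: filters (eventId, timestampMs) pairs in arrival order.
    public static List<String> dedup(List<Map.Entry<String, Long>> events, long windowMs) {
        TimeBoundedDedup d = new TimeBoundedDedup(windowMs);
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Long> e : events) {
            if (d.accept(e.getKey(), e.getValue())) {
                out.add(e.getKey());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        List<Map.Entry<String, Long>> events = List.of(
                Map.entry("a", 0L),
                Map.entry("b", hour),
                Map.entry("a", 2 * hour),   // repeat of "a" within 24h: dropped
                Map.entry("a", 25 * hour)); // 25h after the first "a": forwarded again
        System.out.println(dedup(events, 24 * hour)); // prints [a, b, a]
    }
}
```

The eviction here is a linear scan, kept simple for illustration; a real store would expire entries by time instead.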

Best regards,
Viktor Markvardt