Re: Streams, Kafka windows

2020-05-28 Thread John Roesler
Setting a new record for procrastination, I've just created this ticket:
https://issues.apache.org/jira/browse/KAFKA-10058

Re: Streams, Kafka windows

2020-01-18 Thread John Roesler
Good idea! I’ll make a note to do it when I’m at a computer. 

On Sat, Jan 18, 2020, at 21:51, Guozhang Wang wrote:
> Hey John,
> 
> Since this is a common question and I've seen many users asking about
> window semantics like this, could you file a JIRA ticket for creating a
> wiki page like Join Semantics (
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics)
> to summarize the windowing operations like this too?
> 
> Guozhang
> 
> On Sat, Jan 18, 2020 at 3:22 PM John Roesler  wrote:
> 
> > Glad it helped!
> > -John
> >
> > On Sat, Jan 18, 2020, at 12:27, Viktor Markvardt wrote:
> > > Hi John,
> > >
> > > Thank you for your assistance!
> > > Your example very help me and I understood kafka-streams more clearly
> > now.
> > > Have a nice weekend :)
> > >
> > > Best regards,
> > > Viktor Markvardt
> > >
> > > чт, 16 янв. 2020 г. в 19:29, John Roesler :
> > >
> > > > Hi Viktor,
> > > >
> > > > I’m starting to wonder what exactly “duplicate” means in this context.
> > Can
> > > > you elaborate?
> > > >
> > > > In case it helps, with your window definition, if I send a record with
> > > > timestamp 20, it would actually belong to three different windows:
> > > > [0,30)
> > > > [10,40)
> > > > [20,50)
> > > >
> > > > Because of this, you would (correctly) see three output records for
> > that
> > > > one input, but the outputs wouldn’t be “duplicates” properly, because
> > > > they’d have different keys:
> > > >
> > > > Input:
> > > > Key1: Val1 @ timestamp:20
> > > >
> > > > Output:
> > > > Windowed: 1
> > > > Windowed: 1
> > > > Windowed: 1
> > > >
> > > > Any chance that explains your observation?
> > > >
> > > > Thanks,
> > > > John
> > > >
> > > >
> > > >
> > > > On Thu, Jan 16, 2020, at 10:18, Viktor Markvardt wrote:
> > > > > Hi John,
> > > > >
> > > > > Thanks for answering my questions!
> > > > > I observe behavior which I can not understand.
> > > > > The code is working, but when delay between records larger then
> > window
> > > > > duration I receive duplicated records.
> > > > > With the code below I received duplicated records in the output
> > kstream.
> > > > > Count of duplicate records is always 3. If I change
> > duration/advanceBy
> > > > > count of duplicated records is changing also.
> > > > > Do you have any ideas why duplicated records are received in the
> > output
> > > > > kstream?
> > > > >
> > > > > KStream windowedStream = source
> > > > > .groupByKey()
> > > > >
> > > > >
> > > >
> > .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ZERO).advanceBy(Duration.ofSeconds(10)))
> > > > > .count()
> > > > >
> > > > >
> > > >
> > .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > > > > .toStream();
> > > > >
> > > > >
> > > > > Best regards,
> > > > > Viktor Markvardt
> > > > >
> > > > > чт, 16 янв. 2020 г. в 04:59, John Roesler :
> > > > >
> > > > > > Hi Viktor,
> > > > > >
> > > > > > I’m not sure why you get two identical outputs in response to a
> > single
> > > > > > record. Regardless, since you say that you want to get a single,
> > final
> > > > > > result for the window and you expect multiple inputs to the
> > windows,
> > > > you
> > > > > > need Suppression.
> > > > > >
> > > > > > My guess is that you just sent one record to try it out and didn’t
> > see
> > > > any
> > > > > > output? This is expected. Just as the window boundaries are
> > defined by
> > > > the
> > > > > > time stamps of the records, not by the current system time,
> > > > suppression is
> > > > > > governed by the timestamp of the records. I.e., a thirty-second
> > window
> > > > is
> > > > > > not actually closed until you see a new record with a timestamp
> > thirty
> > > > > > seconds later.
> > > > > >
> > > > > >  Maybe try just sending a sequence of updates with incrementing
> > > > > > timestamps. If the first record has timestamp T, then you should
> > see an
> > > > > > output when you pass in a record with timestamp T+30.
> > > > > >
> > > > > > Important note: there is a built-in grace period that delays the
> > > > output of
> > > > > > final results after the window ends. For complicated reasons, the
> > > > default
> > > > > > is 24 hours! So you would actually not see an output until you
> > send a
> > > > > > record with timestamp T+30+(24 hours) ! I strongly recommend you
> > set
> > > > the
> > > > > > grace period on TimeWindows to zero for your testing. You can
> > increase
> > > > it
> > > > > > later if you want to tolerate some late-arriving records.
> > > > > >
> > > > > > Thanks,
> > > > > > -John
> > > > > >
> > > > > > On Tue, Jan 14, 2020, at 10:41, Viktor Markvardt wrote:
> > > > > > > Hi,
> > > > > > >
> > > > > > > My name is Viktor. I'm currently working with Kafka streams and
> > have
> > > > > > > several questions about Kafka and I can not find answers in the
> > > > official
> > > > > > > docs.
> > > > > > >
> > > > > > > 1. Why suppress functionality does not work with Hopping 

Re: Streams, Kafka windows

2020-01-18 Thread Guozhang Wang
Hey John,

Since this is a common question and I've seen many users asking about
window semantics like this, could you file a JIRA ticket for creating a
wiki page like Join Semantics (
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics)
to summarize the windowing operations like this too?

Guozhang

Re: Streams, Kafka windows

2020-01-18 Thread John Roesler
Glad it helped!
-John

Re: Streams, Kafka windows

2020-01-18 Thread Viktor Markvardt
Hi John,

Thank you for your assistance!
Your example helped me a lot, and I understand kafka-streams more clearly now.
Have a nice weekend :)

Best regards,
Viktor Markvardt

Re: Streams, Kafka windows

2020-01-16 Thread John Roesler
Hi Viktor,

I’m starting to wonder what exactly “duplicate” means in this context. Can you 
elaborate?

In case it helps, with your window definition, if I send a record with 
timestamp 20, it would actually belong to three different windows:
[0,30)
[10,40)
[20,50)

Because of this, you would (correctly) see three output records for that one
input, but the outputs wouldn’t really be “duplicates”, because they’d have
different keys:

Input:
Key1: Val1 @ timestamp:20

Output:
Windowed<Key1, [0,30)>: 1
Windowed<Key1, [10,40)>: 1
Windowed<Key1, [20,50)>: 1
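To make the window assignment above concrete, here is a small plain-Java sketch. It assumes hopping windows aligned to timestamp 0 and non-negative timestamps, and follows the arithmetic as I understand `TimeWindows` to apply it: a record belongs to every window whose start is a multiple of advanceBy, is not after the record's timestamp, and is less than one window size before it. `HoppingWindowMath` is a hypothetical name, and the printed format is only illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: models hopping-window assignment; it is not the Kafka Streams API.
class HoppingWindowMath {

    // Returns each window containing the timestamp as {startInclusive, endExclusive}.
    // Any consistent time unit works (seconds here, milliseconds in Kafka Streams).
    static List<long[]> windowsFor(long timestamp, long size, long advance) {
        // Earliest window start that still covers the timestamp, aligned down to the advance
        // and clamped at 0 so no negative window starts are produced.
        long start = (Math.max(0, timestamp - size + advance) / advance) * advance;
        List<long[]> windows = new ArrayList<>();
        for (; start <= timestamp; start += advance) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // Window definition from this thread: size 30, advanceBy 10; one record at timestamp 20.
        for (long[] w : windowsFor(20, 30, 10)) {
            System.out.println("Key1 in [" + w[0] + "," + w[1] + ")");
        }
    }
}
```

With size 30 and advance 10 this prints the three windows [0,30), [10,40), and [20,50); with advance equal to size (a tumbling window) the same record falls into exactly one window.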

Any chance that explains your observation?

Thanks,
John

Re: Streams, Kafka windows

2020-01-16 Thread Viktor Markvardt
Hi John,

Thanks for answering my questions!
I'm observing behavior that I can't understand.
The code works, but when the delay between records is larger than the window
duration, I receive duplicated records.
With the code below, I receive duplicated records in the output KStream.
The number of duplicate records is always 3. If I change duration/advanceBy,
the number of duplicated records changes too.
Do you have any idea why duplicated records appear in the output
KStream?

KStream windowedStream = source
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ZERO).advanceBy(Duration.ofSeconds(10)))
    .count()
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream();


Best regards,
Viktor Markvardt
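The observation in this message can be reproduced with a tiny single-partition model of the topology above, written in plain Java with no Kafka dependency. It assumes, as described in this thread, that stream time is the highest record timestamp seen so far, that a window closes once stream time reaches its end plus the grace period, and that suppression then releases exactly one final count per windowed key. `SuppressedHoppingCountModel` is a hypothetical name; this is a simplification, not the Kafka Streams implementation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of
//   groupByKey().windowedBy(hopping windows).count()
//       .suppress(Suppressed.untilWindowCloses(...))
// It only mimics the stream-time rules discussed in this thread.
class SuppressedHoppingCountModel {
    private final long size;
    private final long advance;
    private final long grace;
    private long streamTime = Long.MIN_VALUE; // highest record timestamp seen so far
    // Open windows in creation order: label -> {windowEnd, count}.
    private final Map<String, long[]> open = new LinkedHashMap<>();
    final List<String> emitted = new ArrayList<>();

    SuppressedHoppingCountModel(long size, long advance, long grace) {
        this.size = size;
        this.advance = advance;
        this.grace = grace;
    }

    void process(String key, long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        // Count the record in every hopping window that contains its timestamp.
        long start = (Math.max(0, timestamp - size + advance) / advance) * advance;
        for (; start <= timestamp; start += advance) {
            long end = start + size;
            if (end + grace <= streamTime) {
                continue; // window already closed: a late record is dropped for it
            }
            String label = key + "@[" + start + "," + end + ")";
            open.computeIfAbsent(label, l -> new long[] {end, 0})[1]++;
        }
        // Release one final result per window once stream time reaches end + grace.
        Iterator<Map.Entry<String, long[]>> it = open.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, long[]> entry = it.next();
            if (entry.getValue()[0] + grace <= streamTime) {
                emitted.add(entry.getKey() + " -> " + entry.getValue()[1]);
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        // Setup from this message, in seconds: size 30, advanceBy 10, grace 0.
        SuppressedHoppingCountModel model = new SuppressedHoppingCountModel(30, 10, 0);
        model.process("Key1", 20); // nothing emitted yet: all three windows are still open
        model.process("Key1", 60); // gap larger than the window: all three windows close
        model.emitted.forEach(System.out::println);
    }
}
```

Feeding Key1 at 20 and then at 60 closes all three windows of the first record at once, so three final results with count 1 come out. They have different windowed keys, so they are not true duplicates. With a large grace (for example 86,400 seconds) the same two records produce no output at all.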

On Thu, Jan 16, 2020 at 04:59, John Roesler wrote:

> Hi Viktor,
>
> I’m not sure why you get two identical outputs in response to a single
> record. Regardless, since you say that you want to get a single, final
> result for the window and you expect multiple inputs to the windows, you
> need Suppression.
>
> My guess is that you just sent one record to try it out and didn’t see any
> output? This is expected. Just as the window boundaries are defined by the
> timestamps of the records, not by the current system time, suppression is
> governed by the timestamps of the records. I.e., a thirty-second window is
> not actually closed until you see a new record with a timestamp thirty
> seconds later.
>
> Maybe try sending a sequence of updates with increasing
> timestamps. If the first record has timestamp T, then you should see an
> output when you pass in a record with timestamp T+30.
>
> Important note: there is a built-in grace period that delays the output of
> final results after the window ends. For complicated reasons, the default
> is 24 hours! So you would actually not see an output until you send a
> record with timestamp T+30+(24 hours)! I strongly recommend you set the
> grace period on TimeWindows to zero for your testing. You can increase it
> later if you want to tolerate some late-arriving records.
>
> Thanks,
> -John


Re: Streams, Kafka windows

2020-01-15 Thread John Roesler
Hi Viktor,

I’m not sure why you get two identical outputs in response to a single record. 
Regardless, since you say that you want to get a single, final result for the 
window and you expect multiple inputs to the windows, you need Suppression.

My guess is that you just sent one record to try it out and didn’t see any 
output? This is expected. Just as the window boundaries are defined by the 
timestamps of the records, not by the current system time, suppression is governed 
by the timestamp of the records. I.e., a thirty-second window is not actually 
closed until you see a new record with a timestamp thirty seconds later.

Maybe try just sending a sequence of updates with incrementing timestamps. If 
the first record has timestamp T, then you should see an output when you pass 
in a record with timestamp T+30. 
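A minimal sketch of this stream-time behavior, assuming a tumbling 30-second window for simplicity and the close rule windowEnd + grace <= streamTime, which is my understanding of `Suppressed.untilWindowCloses` rather than a quote of the Kafka source. `StreamTimeSuppressionSketch` is a hypothetical plain-Java class; wall-clock time never enters the model, which is why a single test record produces no output.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Plain-Java model (NOT Kafka Streams code) of "emit only when the window closes"
// for tumbling windows. Time advances only with record timestamps ("stream time").
// Assumed close rule: a window is final once windowEnd + grace <= streamTime.
class StreamTimeSuppressionSketch {
    private final long sizeMs;
    private final long graceMs;
    private long streamTime = Long.MIN_VALUE;                  // highest timestamp seen so far
    private final TreeSet<Long> pendingEnds = new TreeSet<>(); // ends of windows awaiting close

    StreamTimeSuppressionSketch(long sizeMs, long graceMs) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    // Feeds one record and returns the ends of the windows emitted as a result.
    List<Long> onRecord(long ts) {
        streamTime = Math.max(streamTime, ts);
        long windowEnd = (ts / sizeMs) * sizeMs + sizeMs;
        if (windowEnd + graceMs > streamTime) {
            pendingEnds.add(windowEnd);   // window still open: the record counts
        }                                 // otherwise the record is too late and dropped
        List<Long> emitted = new ArrayList<>();
        while (!pendingEnds.isEmpty() && pendingEnds.first() + graceMs <= streamTime) {
            emitted.add(pendingEnds.pollFirst());
        }
        return emitted;
    }

    public static void main(String[] args) {
        StreamTimeSuppressionSketch s = new StreamTimeSuppressionSketch(30_000L, 0L);
        for (long ts : new long[] {0L, 10_000L, 29_999L, 30_000L}) {
            // Only the record at 30 s moves stream time to the end of [0, 30 s).
            System.out.println("ts=" + ts + " -> emitted " + s.onRecord(ts));
        }
    }
}
```

With zero grace, the first emission appears only on the record at 30 s; constructing the sketch with a 24-hour grace pushes that emission out to a record at 30 s + 24 h.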

Important note: there is a built-in grace period that delays the output of 
final results after the window ends. For complicated reasons, the default is 24 
hours! So you would actually not see an output until you send a record with 
timestamp T+30+(24 hours)! I strongly recommend you set the grace period on 
TimeWindows to zero for your testing. You can increase it later if you want to 
tolerate some late-arriving records. 

Thanks,
-John

On Tue, Jan 14, 2020, at 10:41, Viktor Markvardt wrote:
> Hi,
> 
> My name is Viktor. I'm currently working with Kafka Streams and have
> several questions about it that I cannot find answers to in the official
> docs.
> 
> 1. Why does suppress not work with hopping windows? How can I
> make it work?
>
> Example code:
> 
> KStream finalStream = source
>     .groupByKey()
>     .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(10)))
>     .reduce((aggValue, newValue) -> newValue,
>             Materialized.with(Serdes.String(), Serdes.String()))
>     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
>     .toStream();
>
> finalStream.print(Printed.toSysOut());
> finalStream.to(outputTopic);
> 
> After I run the code above, the output stream is empty. There were no
> errors or exceptions.
> NOTE: with a tumbling window, the code works as expected.
> Maybe I am simply using it incorrectly?
> 
> 2. Why are there duplicates in the output stream with hopping windows
> (without suppress)?
> E.g., I send one record to the input KStream with a hopping window
> (duration=30s, advanceBy=2s), but get two identical records (duplicates)
> in the output KStream.
> Is that expected behavior? If so, how can I filter out or switch off
> these duplicates?
> 
> 3. Mainly, I'm trying to solve this problem:
> I have a KStream of events, and events can be repeated (duplicates).
> In the output KStream, I would like to receive only unique events for the
> last 24 hours (window duration), with windows advancing every hour (window
> advanceBy).
> Could you recommend any code examples or docs, please?
> I have already read the official docs and examples, but they were not
> enough for me to fully understand how to achieve this.
> 
> Best regards,
> Viktor Markvardt
>


Re: Streams, Kafka windows

2020-01-14 Thread Sachin Mittal
You can try to convert the final resultant stream to table.
Check this page for more info:
https://docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an-aggregation-step

This way table would always contain the latest (single) record for a given
key.
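The table view in miniature: read as a table, a stream becomes a sequence of upserts by record key, so repeated records for a key collapse into one row holding the latest value. This is a plain-Java sketch (hypothetical class `TableViewSketch` backed by a `LinkedHashMap`), not the Kafka Streams API; treating a `null` value as a delete mirrors KTable tombstone semantics.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java model (NOT Kafka Streams code) of interpreting a stream as a table:
// each record upserts its key, so only the latest value per key remains.
class TableViewSketch {
    private final Map<String, String> table = new LinkedHashMap<>();

    void apply(String key, String value) {
        if (value == null) {
            table.remove(key);      // tombstone: delete the row for this key
        } else {
            table.put(key, value);  // upsert: a newer value replaces the older one
        }
    }

    Map<String, String> snapshot() {
        return new LinkedHashMap<>(table);
    }

    public static void main(String[] args) {
        TableViewSketch t = new TableViewSketch();
        t.apply("event-1", "v1");
        t.apply("event-1", "v1");   // repeated record: still a single row
        t.apply("event-2", "v2");
        t.apply("event-1", "v3");   // later update wins
        System.out.println(t.snapshot());  // prints {event-1=v3, event-2=v2}
    }
}
```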

Sachin



