Re: Streams, Kafka windows
Setting a new record for procrastination, I've just created this ticket: https://issues.apache.org/jira/browse/KAFKA-10058

On Sat, Jan 18, 2020, at 22:03, John Roesler wrote:
> Good idea! I’ll make a note to do it when I’m at a computer.
Re: Streams, Kafka windows
Good idea! I’ll make a note to do it when I’m at a computer.

On Sat, Jan 18, 2020, at 21:51, Guozhang Wang wrote:
> Hey John,
>
> Since this is a common question and I've seen many users asking about
> window semantics like this, could you file a JIRA ticket for creating a
> wiki page like Join Semantics
> (https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics)
> to summarize the windowing operations like this too?
>
> Guozhang
Re: Streams, Kafka windows
Hey John,

Since this is a common question and I've seen many users asking about window semantics like this, could you file a JIRA ticket for creating a wiki page like Join Semantics (https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics) to summarize the windowing operations like this too?

Guozhang

On Sat, Jan 18, 2020 at 3:22 PM John Roesler wrote:
> Glad it helped!
> -John
Re: Streams, Kafka windows
Glad it helped!
-John

On Sat, Jan 18, 2020, at 12:27, Viktor Markvardt wrote:
> Hi John,
>
> Thank you for your assistance!
> Your example helped me a lot, and I understand kafka-streams more clearly now.
> Have a nice weekend :)
>
> Best regards,
> Viktor Markvardt
Re: Streams, Kafka windows
Hi John,

Thank you for your assistance!
Your example helped me a lot, and I understand kafka-streams more clearly now.
Have a nice weekend :)

Best regards,
Viktor Markvardt

On Thu, Jan 16, 2020 at 19:29, John Roesler wrote:
> Hi Viktor,
>
> I’m starting to wonder what exactly “duplicate” means in this context. Can
> you elaborate?
>
> In case it helps, with your window definition, if I send a record with
> timestamp 20, it would actually belong to three different windows:
> [0,30)
> [10,40)
> [20,50)
>
> Because of this, you would (correctly) see three output records for that
> one input, but the outputs wouldn’t properly be “duplicates”, because
> they’d have different keys:
>
> Input:
> Key1: Val1 @ timestamp:20
>
> Output:
> Windowed<Key1, [0,30)>: 1
> Windowed<Key1, [10,40)>: 1
> Windowed<Key1, [20,50)>: 1
>
> Any chance that explains your observation?
>
> Thanks,
> John
Re: Streams, Kafka windows
Hi Viktor,

I’m starting to wonder what exactly “duplicate” means in this context. Can you elaborate?

In case it helps, with your window definition, if I send a record with timestamp 20, it would actually belong to three different windows:
[0,30)
[10,40)
[20,50)

Because of this, you would (correctly) see three output records for that one input, but the outputs wouldn’t properly be “duplicates”, because they’d have different keys:

Input:
Key1: Val1 @ timestamp:20

Output:
Windowed<Key1, [0,30)>: 1
Windowed<Key1, [10,40)>: 1
Windowed<Key1, [20,50)>: 1

Any chance that explains your observation?

Thanks,
John

On Thu, Jan 16, 2020, at 10:18, Viktor Markvardt wrote:
> Hi John,
>
> Thanks for answering my questions!
> I'm observing behavior that I can't understand.
> The code works, but when the delay between records is larger than the
> window duration, I receive duplicate records.
> With the code below I received duplicate records in the output KStream.
> The number of duplicate records is always 3. If I change duration/advanceBy,
> the number of duplicate records changes as well.
> Do you have any idea why duplicate records are received in the output
> KStream?
>
> KStream<Windowed<String>, Long> windowedStream = source
>     .groupByKey()
>     .windowedBy(TimeWindows.of(Duration.ofSeconds(30))
>         .grace(Duration.ZERO)
>         .advanceBy(Duration.ofSeconds(10)))
>     .count()
>     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
>     .toStream();
>
> Best regards,
> Viktor Markvardt
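Regarding John's point that one record at timestamp 20 falls into [0,30), [10,40) and [20,50): the window assignment can be reproduced with a few lines of plain Java. This is a hypothetical sketch, not Kafka Streams code; the class `HoppingWindowSketch` and its method are made up for illustration, and it assumes windows are aligned to the epoch (every window start is a multiple of `advance`) with no window starting before 0.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kafka Streams: lists the hopping windows
// [start, end) that a single record timestamp falls into. Assumes windows are
// aligned to the epoch (every start is a multiple of `advance`) and that no
// window starts before 0.
final class HoppingWindowSketch {

    // Half-open time window [start, end).
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + "," + end + ")";
        }
    }

    static List<Window> windowsFor(long timestamp, long size, long advance) {
        if (size <= 0 || advance <= 0 || advance > size) {
            throw new IllegalArgumentException("require 0 < advance <= size");
        }
        List<Window> windows = new ArrayList<>();
        // Earliest epoch-aligned start whose window still contains `timestamp`
        // (the first multiple of `advance` above timestamp - size), clamped at 0.
        long start = (Math.max(0, timestamp - size + advance) / advance) * advance;
        // Every later start up to and including `timestamp` also contains it.
        for (; start <= timestamp; start += advance) {
            windows.add(new Window(start, start + size));
        }
        return windows;
    }
}
```

`HoppingWindowSketch.windowsFor(20, 30, 10)` returns `[[0,30), [10,40), [20,50)]`, one result per window and therefore one output record per window. With `advance` equal to `size` (a tumbling window) it returns a single window, and with duration=30s and advanceBy=2s from the original question 2, `windowsFor(100, 30, 2)` returns 15 windows. When `size` is a multiple of `advance` and the timestamp is at least `size - advance`, each record lands in exactly size/advance windows, which would explain why the number of "duplicates" changes with duration/advanceBy.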
Re: Streams, Kafka windows
Hi John,

Thanks for answering my questions!
I'm observing behavior that I can't understand. The code works, but when the delay between records is larger than the window duration, I receive duplicate records.
With the code below, I receive duplicate records in the output KStream. The number of duplicates is always 3. If I change the duration/advanceBy, the number of duplicates changes as well.
Do you have any idea why duplicate records appear in the output KStream?

    KStream<Windowed<String>, Long> windowedStream = source
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofSeconds(30))
                               .grace(Duration.ZERO)
                               .advanceBy(Duration.ofSeconds(10)))
        .count()
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream();

Best regards,
Viktor Markvardt

On Thu, Jan 16, 2020 at 04:59, John Roesler wrote:
> ...
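The constant count of 3 is consistent with how hopping windows are assigned: with a 30 s size and a 10 s advance, every record falls into 30 / 10 = 3 overlapping windows, and each of those windows produces its own final result under a different windowed key. The plain-Java sketch below illustrates that assignment rule under the assumption that windows are aligned to multiples of the advance starting at time 0. It is not Kafka's `TimeWindows` class, and the names `HoppingWindowMath` and `windowsContaining` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration (not Kafka's TimeWindows class) of hopping-window
// assignment: a record belongs to every window [start, start + size) whose
// start is a non-negative multiple of `advance` and that contains its timestamp.
final class HoppingWindowMath {
    private HoppingWindowMath() {}

    /** Returns {start, endExclusive} pairs for all windows containing the timestamp. */
    static List<long[]> windowsContaining(long timestamp, long size, long advance) {
        List<long[]> windows = new ArrayList<>();
        // Earliest aligned start whose window still reaches the timestamp,
        // clamped at 0 because windows never start before time 0.
        long start = (Math.max(0, timestamp - size + advance) / advance) * advance;
        for (; start <= timestamp; start += advance) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }
}
```

For the 30 s / 10 s definition, a record at timestamp 20 lands in [0,30), [10,40) and [20,50). Once timestamps are at least `size - advance`, and assuming the size is a multiple of the advance, each record falls into exactly `size / advance` windows, which would explain why changing the duration or advanceBy changes the number of "duplicates". When records are more than one window size apart, each of those windows holds only that one record, so the three final counts look identical apart from their window.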
Re: Streams, Kafka windows
Hi Viktor,

I'm not sure why you get two identical outputs in response to a single record. Regardless, since you say that you want a single, final result for the window and you expect multiple inputs to the windows, you need Suppression.

My guess is that you just sent one record to try it out and didn't see any output? This is expected. Just as the window boundaries are defined by the timestamps of the records, not by the current system time, suppression is governed by the timestamps of the records. I.e., a thirty-second window is not actually closed until you see a new record with a timestamp thirty seconds past the window's start.

Maybe try sending a sequence of updates with incrementing timestamps. If the first record has timestamp T, then you should see an output when you pass in a record with timestamp T+30.

Important note: there is a built-in grace period that delays the output of final results after the window ends. For complicated reasons, the default is 24 hours! So you would actually not see an output until you send a record with timestamp T+30+(24 hours)! I strongly recommend you set the grace period on TimeWindows to zero for your testing. You can increase it later if you want to tolerate some late-arriving records.

Thanks,
-John

On Tue, Jan 14, 2020, at 10:41, Viktor Markvardt wrote:
> ...
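To make the timing concrete, here is a simplified single-key simulation in plain Java of `count()` followed by `suppress(untilWindowCloses(...))` on tumbling windows. It is not Kafka Streams code. It assumes non-negative timestamps, one key and one partition, no caching, stream time equal to the largest timestamp seen so far, and that a window [start, end) becomes final once stream time reaches end + grace. The class name `SuppressionSimulator` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified single-key simulation (not Kafka Streams code) of
// count() + suppress(untilWindowCloses(...)) over tumbling windows.
// Assumptions: one key and partition, no caching, stream time = largest
// timestamp seen, and a window [start, end) is final once streamTime >= end + grace.
final class SuppressionSimulator {
    private final long size;
    private final long grace;
    private long streamTime = Long.MIN_VALUE;
    // Counts of windows that are still open, ordered by window start.
    private final TreeMap<Long, Long> openWindows = new TreeMap<>();

    SuppressionSimulator(long size, long grace) {
        this.size = size;
        this.grace = grace;
    }

    /** Processes one record and returns the final results it causes to be emitted. */
    List<String> process(long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        long start = (timestamp / size) * size;
        // A record for a window that has already closed would be dropped as late.
        if (start + size + grace > streamTime) {
            openWindows.merge(start, 1L, Long::sum);
        }
        List<String> emitted = new ArrayList<>();
        // Emit (and forget) every window whose end + grace is now behind stream time.
        while (!openWindows.isEmpty()
                && openWindows.firstKey() + size + grace <= streamTime) {
            Map.Entry<Long, Long> closed = openWindows.pollFirstEntry();
            long s = closed.getKey();
            emitted.add("[" + s + "," + (s + size) + ")=" + closed.getValue());
        }
        return emitted;
    }
}
```

Using seconds as the unit: with a grace of 0, records at 0 and 10 produce nothing, and the record at 30 releases "[0,30)=2". With a 24-hour grace (86400 s), the same window is only released by a record at 30 + 86400, which is the T+30+(24 hours) case described above. Wall-clock time plays no role in either case.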
Re: Streams, Kafka windows
You can try converting the final resulting stream to a table. See this page for more info:
https://docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an-aggregation-step
This way, the table will always contain the latest (single) record for a given key.

Sachin

On Tue, Jan 14, 2020 at 10:11 PM Viktor Markvardt <viktor.markva...@gmail.com> wrote:
> ...
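The suggestion relies on table semantics: a table keeps only the latest value per key, so repeated updates to the same key overwrite each other instead of accumulating. The plain-Java sketch below illustrates only that upsert behavior. It is not the KTable API, and both the class name `LatestValueTable` and the string encoding of windowed keys such as "Key1@[0,30)" are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java illustration (not the KTable API) of table semantics: the table
// holds only the latest value per key, so repeated updates overwrite each other.
final class LatestValueTable<K, V> {
    private final Map<K, V> rows = new HashMap<>();

    /** Applies one update from the stream; a null value deletes the row (tombstone). */
    void apply(K key, V value) {
        if (value == null) {
            rows.remove(key);
        } else {
            rows.put(key, value);
        }
    }

    V get(K key) {
        return rows.get(key);
    }

    int size() {
        return rows.size();
    }
}
```

One caveat: after windowing, the key includes the window, so the per-window results of a hopping window have different keys. Two updates to "Key1@[0,30)" collapse into one row, but "Key1@[10,40)" remains a separate row.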
Streams, Kafka windows
Hi,

My name is Viktor. I'm currently working with Kafka Streams and have several questions about Kafka that I cannot find answered in the official docs.

1. Why doesn't the suppress functionality work with hopping windows? How can I make it work?

Example code:

    KStream<Windowed<String>, String> finalStream = source
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(10)))
        .reduce((aggValue, newValue) -> newValue,
                Materialized.with(Serdes.String(), Serdes.String()))
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream();

    finalStream.print(Printed.toSysOut());
    finalStream.to(outputTopic);

After I run the code above, the output stream is empty. There were no errors or exceptions.
NOTE: With a tumbling window, the code works as expected.
Maybe I'm simply using it incorrectly?

2. Why are there duplicates in the output stream with hopping windows (without suppress)?
E.g., I send one record to the input KStream with a hopping window (duration=30s, advanceBy=2s) but get two identical records (duplicates) in the output KStream.
Is that expected behavior? If so, how can I filter out or switch off these duplicates?

3. Mainly, I'm trying to solve this problem:
I have a KStream of events, and events can be repeated (duplicates). In the output KStream, I would like to receive only unique events for the last 24 hours (window duration), with a 1-hour advance (window advanceBy).
Could you recommend any code examples or docs?
I have already read the official docs and examples, but they were not enough for me to fully understand how to achieve this.

Best regards,
Viktor Markvardt
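For question 3, one common alternative to hopping windows is time-bounded deduplication: remember each event id together with the time it was first forwarded, and drop repeats until a retention period (here 24 hours) has passed. The plain-Java sketch below shows only that logic. In Kafka Streams it would typically live in a state store accessed from a processor, which is not shown here. The class name `TimeBoundedDeduplicator`, the use of a string event id, and evicting on every call are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch (not Kafka Streams code) of time-bounded deduplication:
// forward an event id at most once per `retention` units of event time,
// measured from the time it was first forwarded.
final class TimeBoundedDeduplicator {
    private final long retention;
    private final Map<String, Long> firstForwardedAt = new HashMap<>();
    private long streamTime = Long.MIN_VALUE;

    TimeBoundedDeduplicator(long retention) {
        this.retention = retention;
    }

    /** Returns true if the event should be forwarded, false if it is a repeat. */
    boolean accept(String eventId, long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        // Forget ids whose retention period has elapsed (O(n) per call, for simplicity).
        firstForwardedAt.values().removeIf(seenAt -> streamTime - seenAt >= retention);
        if (firstForwardedAt.containsKey(eventId)) {
            return false;  // seen within the retention period: drop the duplicate
        }
        firstForwardedAt.put(eventId, timestamp);
        return true;
    }
}
```

Eviction here is driven by the largest timestamp seen, so a very late repeat of an id that has already been evicted would be forwarded again. A real implementation would need to decide how to treat such late records.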