I understand now. The commit triggers the output of the window data, whether or not the window is complete.
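For reference, the two settings at play here are the commit interval and the record cache introduced in KIP-63. A minimal config sketch (the values are illustrative, not taken from this thread):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
// how often offsets are committed and the de-duplication cache is
// flushed; a smaller interval pushes more intermediate window results
// downstream
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
// the cache that de-duplicates per-key updates before forwarding them;
// setting it to 0 disables caching, so every single update is emitted
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);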
For example, if I use .print() as you suggest:

[KSTREAM-AGGREGATE-0000000003]: [kafka@1483631920000] , (9<-null)
[KSTREAM-AGGREGATE-0000000003]: [kafka@1483631925000] , (5<-null)
[KSTREAM-AGGREGATE-0000000003]: [kafka@1483631925000] , (9<-null)
[KSTREAM-AGGREGATE-0000000003]: [kafka@1483631930000] , (2<-null)

The second line is an intermediate result of the third line. I suppose this is fine if we are storing the count together with the time window, but not if we are trying to keep a total count of each word. I'm guessing the only solution is a handcrafted one using the lower-level API, as suggested in the Stack Overflow post (a sketch of that approach follows below).

I have another question, concerning how the Counts KTable data is stored. If I understand correctly, on restart the process will re-create the state of the KTable by reading the Counts topic (wordcount-lambda-example-Counts-changelog) from the beginning. Over time, wouldn't that be a lot of data? Or is there some mechanism to read only from a position near the end?
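To make that concrete, here is a minimal sketch of such a handcrafted solution (the class name, the "counts-store" store name, and the 5-second interval are assumptions for illustration; the store must be registered on the TopologyBuilder and connected to this processor): counts are buffered in a state store and forwarded only from punctuate(), so each word is emitted once per interval rather than on every commit. Note that punctuate() fires on stream time, so it only runs while new records keep arriving.

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class FinalCountProcessor implements Processor<String, String> {

    private ProcessorContext context;
    private KeyValueStore<String, Long> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        this.context = context;
        // hypothetical store name; the store is attached to this processor
        // when the topology is built
        this.store = (KeyValueStore<String, Long>) context.getStateStore("counts-store");
        context.schedule(5000L); // punctuate every 5 seconds of stream time
    }

    @Override
    public void process(final String key, final String word) {
        final Long count = store.get(word);
        store.put(word, count == null ? 1L : count + 1);
    }

    @Override
    public void punctuate(final long timestamp) {
        // emit one final count per word for this interval, then reset
        final List<String> emitted = new ArrayList<>();
        try (final KeyValueIterator<String, Long> iter = store.all()) {
            while (iter.hasNext()) {
                final KeyValue<String, Long> entry = iter.next();
                context.forward(entry.key, entry.value);
                emitted.add(entry.key);
            }
        }
        for (final String word : emitted) {
            store.delete(word); // reset for the next interval
        }
        context.commit(); // ask for a commit so offsets and output move together
    }

    @Override
    public void close() {}
}

Using a persistent store here would also give the changelog-backed recovery asked about above.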
On Wed, Jan 4, 2017 at 7:35 PM Matthias J. Sax <matth...@confluent.io> wrote:

> There is no such thing as a final window aggregate, and you might see
> intermediate results -- thus the counts do not add up.
>
> Please have a look here:
>
> http://stackoverflow.com/questions/38935904/how-to-send-final-kafka-streams-aggregation-result-of-a-time-windowed-ktable/38945277#38945277
>
> and here:
>
> http://docs.confluent.io/current/streams/developer-guide.html#memory-management
>
> On each commit, the current intermediate result will be flushed from the
> de-duplication cache -- thus, for a smaller commit interval you see more
> intermediate results, and thus it seems to be more off.
>
> In .toStream((k, v) -> k.key()) you get rid of the window-id -- if you
> keep it, you can see which result records belong to the same window. The
> simplest way for testing would be to use .print() instead of .toStream()
> to see the key as window-id plus record-key.
>
>
> -Matthias
>
>
> On 1/4/17 2:09 PM, Benjamin Black wrote:
> > I'm hoping the DSL will do what I want :) Currently the example is
> > continuously adding instead of bucketing, so if I modify it by adding a
> > window to the count function:
> >
> > .groupBy((key, word) -> word)
> > .count(TimeWindows.of(5000L), "Counts")
> > .toStream((k, v) -> k.key());
> >
> > then I do see bucketing happening. However, it isn't accurate. For
> > example, I type "kafka" into the console as 20 sentences, but the
> > output I get is:
> >
> > kafka 4
> > kafka 9
> > kafka 2
> > kafka 7
> >
> > which equals 22. What am I doing wrong? What is the relationship
> > between the commit interval and the time window? The smaller I make
> > the commit interval, the less accurate it becomes.
> >
> >
> > On Wed, Jan 4, 2017 at 3:53 PM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> Do you know about Kafka Streams? Its DSL gives you exactly what you
> >> want to do.
> >>
> >> Check out the documentation and the WordCount example:
> >>
> >> http://docs.confluent.io/current/streams/index.html
> >>
> >> https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java
> >>
> >>
> >> Let us know if you have further questions.
> >>
> >>
> >> -Matthias
> >>
> >> On 1/4/17 12:48 PM, Benjamin Black wrote:
> >>> Hello,
> >>>
> >>> I'm looking for guidance on how to approach a counting problem. We want
> >>> to consume a stream of data that consists of IDs and generate an output
> >>> of the aggregated counts with a window size of X seconds, using
> >>> processing time and a hopping time window. For example, using a window
> >>> size of 1 second: if we get IDs 1, 2, 2, 2 in the 1st second, then the
> >>> output would be 1=1, 2=3. If we get IDs 1, 3, 3 in the 2nd second, then
> >>> the output would be 1=1, 3=2. The aggregated counts will then be turned
> >>> into increment commands to a cache and a database.
> >>>
> >>> Obviously we will need some state to be stored during the count of a
> >>> window, but we only need to keep it for the time period of the window
> >>> (i.e. a second). I was thinking this could be achieved by using a
> >>> persistent store, where the counts are reset during the punctuate and
> >>> the store topic uses log compaction. Alternatively, we could simply have
> >>> an in-memory store that is reset during the punctuate. My concern with
> >>> the in-memory store is that I don't know when the input topic offset is
> >>> committed or when the output data is written, and therefore we could
> >>> lose data. Ultimately, at the end of the second, the input offset and
> >>> the output data should be written at the same time, reducing the
> >>> likelihood of lost data. We would rather lose data than have duplicate
> >>> counts. What is the correct approach? Is there a better way of tackling
> >>> the problem?
> >>>
> >>> I have put together some code, but it doesn't do exactly what I expect.
> >>> I'm happy to share if it helps.
> >>>
> >>> Thanks,
> >>> Ben
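For completeness, a rough DSL sketch of what the original question describes -- counting IDs in 1-second windows over processing time. The topic name and config values are made up; the processing-time semantics come from the wall-clock timestamp extractor, which ignores the timestamps embedded in the records:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

public class IdCountExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "id-count-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        // processing time: window by wall-clock time, not record time
        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                WallclockTimestampExtractor.class.getName());

        final KStreamBuilder builder = new KStreamBuilder();
        final KStream<String, String> ids = builder.stream("ids-topic");
        ids.groupBy((key, id) -> id)                  // count per ID
           .count(TimeWindows.of(1000L), "IdCounts")  // 1-second windows
           .toStream()
           .print();                                  // windowed key plus count

        new KafkaStreams(builder, props).start();
    }
}

Note that a window whose advance equals its size, as here, is effectively tumbling; a true hopping window would be TimeWindows.of(1000L).advanceBy(500L). And as discussed above, expect intermediate results per window unless you de-duplicate downstream or use something like the Processor sketch earlier in the thread.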