I understand now. Each commit triggers output of the window data,
whether or not the window is complete. For example, if I use .print() as
you suggest, I get:

[KSTREAM-AGGREGATE-0000000003]: [kafka@1483631920000] , (9<-null)
[KSTREAM-AGGREGATE-0000000003]: [kafka@1483631925000] , (5<-null)
[KSTREAM-AGGREGATE-0000000003]: [kafka@1483631925000] , (9<-null)
[KSTREAM-AGGREGATE-0000000003]: [kafka@1483631930000] , (2<-null)

The second line is an intermediate result for the same window as the third
line. I suppose this is fine if we are storing the count together with the
time window, but not if we are trying to do a total count of each word. I'm
guessing the only solution is a handcrafted one using the lower-level API,
as suggested in the Stack Overflow post.
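
To make the double counting concrete, here is a minimal sketch in plain Java
(no Kafka dependency) that replays the four updates printed above. The class
and method names (LatestPerWindow, Update, naiveTotal, totalPerKey) are made
up for illustration and are not part of the Kafka Streams API. It only shows
the idea of keeping the latest value per key and window; it is not how the
DSL or the Processor API would do it.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LatestPerWindow {

    /** One printed update: record key, window start (ms), count so far. */
    static final class Update {
        final String key;
        final long windowStart;
        final long count;

        Update(String key, long windowStart, long count) {
            this.key = key;
            this.windowStart = windowStart;
            this.count = count;
        }
    }

    /** The four updates from the .print() output above. */
    static List<Update> sampleUpdates() {
        return List.of(
            new Update("kafka", 1483631920000L, 9),
            new Update("kafka", 1483631925000L, 5),
            new Update("kafka", 1483631925000L, 9),
            new Update("kafka", 1483631930000L, 2));
    }

    /** Sums every emitted update, so intermediate results are counted twice. */
    static long naiveTotal(List<Update> updates) {
        long sum = 0;
        for (Update u : updates) {
            sum += u.count;
        }
        return sum;
    }

    /** Keeps only the latest count per (key, window), then sums per key. */
    static Map<String, Long> totalPerKey(List<Update> updates) {
        Map<String, Long> latest = new LinkedHashMap<>();
        for (Update u : updates) {
            // A later update for the same window overwrites the earlier one.
            latest.put(u.key + "@" + u.windowStart, u.count);
        }
        Map<String, Long> totals = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : latest.entrySet()) {
            String windowedKey = e.getKey();
            String key = windowedKey.substring(0, windowedKey.lastIndexOf('@'));
            totals.merge(key, e.getValue(), Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Update> updates = sampleUpdates();
        System.out.println("naive total:   " + naiveTotal(updates));
        System.out.println("per-key total: " + totalPerKey(updates));
    }
}
```

For these four updates the naive sum is 25, while the latest-per-window sum
for "kafka" is 20, because the intermediate value 5 is replaced by 9.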

I have another question concerning how the Counts KTable data is stored. If
I understand correctly, on restart the process will re-create the state of
the KTable by reading the Counts changelog topic
(wordcount-lambda-example-Counts-changelog) from the beginning. Over time,
wouldn't this be a lot of data? Or is there some mechanism that lets it read
only from a position near the end?

On Wed, Jan 4, 2017 at 7:35 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> There is no such thing as a final window aggregate and you might see
> intermediate results -- thus the counts do not add up.
>
> Please have a look here:
>
>
> http://stackoverflow.com/questions/38935904/how-to-send-final-kafka-streams-aggregation-result-of-a-time-windowed-ktable/38945277#38945277
>
> and here:
>
>
> http://docs.confluent.io/current/streams/developer-guide.html#memory-management
>
>
> On each commit, the current intermediate result will be flushed from the
> de-duplication cache -- thus, for a smaller commit interval you see more
> intermediate results and the counts seem to be further off.
>
> In .toStream((k, v) -> k.key()) you get rid of the window-id -- if you
> keep it, you can see which result records belong to the same window. The
> simplest way for testing would be to use .print() instead of .toStream()
> to see the key as window-id plus record-key.
>
>
> -Matthias
>
>
> On 1/4/17 2:09 PM, Benjamin Black wrote:
> > I'm hoping the DSL will do what I want :) Currently the example is
> > continuously adding instead of bucketing, so if I modify it by adding a
> > window to the count function:
> >
> > .groupBy((key, word) -> word)
> > .count(TimeWindows.of(5000L), "Counts")
> > .toStream((k, v) -> k.key());
> >
> > Then I do see bucketing happening. However, it isn't accurate. For example,
> > I type into the console "kafka" as 20 sentences, but the output I get is:
> >
> > kafka 4
> > kafka 9
> > kafka 2
> > kafka 7
> >
> > Which adds up to 22. What am I doing wrong? What is the relationship between
> > the commit interval and the time window? The smaller I make the commit
> > interval, the less accurate it becomes.
> >
> >
> > On Wed, Jan 4, 2017 at 3:53 PM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> Do you know about Kafka Streams? Its DSL gives you exactly what you
> >> want to do.
> >>
> >> Check out the documentation and WordCount example:
> >>
> >> http://docs.confluent.io/current/streams/index.html
> >>
> >>
> >> https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java
> >>
> >>
> >> Let us know if you have further questions.
> >>
> >>
> >> -Matthias
> >>
> >> On 1/4/17 12:48 PM, Benjamin Black wrote:
> >>> Hello,
> >>>
> >>> I'm looking for guidance on how to approach a counting problem. We want to
> >>> consume a stream of data that consists of IDs and generate an output of the
> >>> aggregated count with a window size of X seconds using processing time and
> >>> a hopping time window. For example, using a window size of 1 second, if we
> >>> get IDs 1, 2, 2, 2 in the 1st second, then the output would be 1=1, 2=3. If
> >>> we get IDs 1, 3, 3 in the 2nd second then the output would be 1=1, 3=2. The
> >>> aggregated count will then be turned into increment commands to a cache and
> >>> a database.
> >>>
> >>> Obviously we will need some state to be stored during the count of a
> >>> window, but we only need to keep it for the time period of the window (i.e.
> >>> a second). I was thinking this could be achieved by using a persistent
> >>> store, where the counts are reset during the punctuate and the store topic
> >>> uses log compaction. Alternatively, we could simply have an in-memory
> >>> store that is reset during the punctuate. My concern with the in-memory
> >>> store is that I don't know when the input topic offset is committed or when
> >>> the output data is written, and therefore we could lose data. Ultimately, at
> >>> the end of the second, the input offset and output data should be written
> >>> at the same time, reducing the likelihood of lost data. We would rather
> >>> lose data than have duplicate counts. What is the correct approach? Is
> >>> there a better way of tackling the problem?
> >>>
> >>> I have put together some code, but it doesn't do exactly what I expect. I'm
> >>> happy to share if it helps.
> >>>
> >>> Thanks,
> >>> Ben
> >>>
> >>
> >>
> >
>
>
