On a clean restart on the same machine, the local RocksDB will just be
reused as it contains the complete state. Thus there is no need to read
the changelog topic at all.

The changelog topic is only read when state is moved from one node to
another, or when the state was corrupted by a failure (i.e., the recovery case).

In both cases, the whole changelog is consumed, which can take some time.
Keep in mind, though, that a changelog topic uses a "compaction policy", so
it will eventually contain only a single entry per window. Additionally, a
"retention policy" is applied, and entries are deleted after the window
retention time expires.
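
To illustrate why compaction bounds the size of the changelog, here is a
minimal sketch in plain Java with no Kafka dependency. The class name, the
`compact` helper, and the string keys of the form `word@windowStart` are
made up for illustration. Real compaction is performed by the broker in the
background at no guaranteed point in time; the sketch only shows the state
it converges to, namely the latest value per key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CompactionSketch {

    // Keep only the most recent value for each key, in first-seen key order.
    // This models the end state of compaction, not the broker's actual algorithm.
    public static Map<String, Long> compact(List<Map.Entry<String, Long>> changelog) {
        Map<String, Long> latest = new LinkedHashMap<>();
        for (Map.Entry<String, Long> entry : changelog) {
            latest.put(entry.getKey(), entry.getValue());
        }
        return latest;
    }

    public static void main(String[] args) {
        // Hypothetical changelog: key is "word@windowStart", value is the running count.
        List<Map.Entry<String, Long>> changelog = new ArrayList<>();
        changelog.add(Map.entry("kafka@1483631920000", 9L));
        changelog.add(Map.entry("kafka@1483631925000", 5L)); // intermediate update
        changelog.add(Map.entry("kafka@1483631925000", 9L)); // later update, same window
        changelog.add(Map.entry("kafka@1483631930000", 2L));

        // Four updates collapse to three entries, one per window.
        System.out.println(compact(changelog));
    }
}
```

However many intermediate updates a window receives, restoring from a fully
compacted changelog only needs to read one record for it.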

If the changelog topic is still too large and rebuilding the state takes
too long, you can configure StandByTasks that constantly rebuild the state
on a different machine in the background (they do not do any actual
processing). In case of a failure, the StandByTasks are used to quickly
recreate the failed tasks.

See:
http://docs.confluent.io/current/streams/architecture.html#fault-tolerance
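
As a rough sketch of how standby tasks are enabled: Kafka Streams reads the
setting `num.standby.replicas` (also exposed as
`StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG`), which defaults to 0. The
example below uses plain `java.util.Properties` so it runs without the Kafka
libraries. The class name and the broker address are assumptions; the
application id is inferred from the changelog topic name mentioned in this
thread.

```java
import java.util.Properties;

public class StandbyConfigSketch {

    // Build a Streams configuration that keeps one standby copy of each state store.
    public static Properties buildConfig() {
        Properties props = new Properties();
        // Inferred from the topic name "wordcount-lambda-example-Counts-changelog".
        props.put("application.id", "wordcount-lambda-example");
        // Assumed broker address, for illustration only.
        props.put("bootstrap.servers", "localhost:9092");
        // Number of standby replicas per task; the default is 0 (no standby tasks).
        props.put("num.standby.replicas", "1");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildConfig().getProperty("num.standby.replicas"));
    }
}
```

These properties would be passed to the `KafkaStreams` constructor together
with the topology. Standby tasks only help if at least one other instance of
the application is running to host them.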


-Matthias

On 1/5/17 8:13 AM, Benjamin Black wrote:
> I understand now. The commit triggers the output of the window data,
> whether or not the window is complete. For example, if I use .print() as
> you suggest:
> 
> [KSTREAM-AGGREGATE-0000000003]: [kafka@1483631920000] , (9<-null)
> [KSTREAM-AGGREGATE-0000000003]: [kafka@1483631925000] , (5<-null)
> [KSTREAM-AGGREGATE-0000000003]: [kafka@1483631925000] , (9<-null)
> [KSTREAM-AGGREGATE-0000000003]: [kafka@1483631930000] , (2<-null)
> 
> The second line is an intermediate result of the third line. I suppose this
> is fine if we are storing the count with the time window, but not if we are
> trying to do a total count of each word. I'm guessing the only solution
> is a handcrafted solution using the lower level API as suggested in the
> stackoverflow post.
> 
> I have another question concerning how the Count ktable data is stored. If
> I understand correctly, on restart the process will re-create the state of
> the ktable by reading from the beginning the Count topic
> (wordcount-lambda-example-Counts-changelog). Over time wouldn't this be a
> lot of data? Or is there some mechanism used to only read from a position
> near the end?
> 
> On Wed, Jan 4, 2017 at 7:35 PM Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> There is no such thing as a final window aggregate and you might see
>> intermediate results -- thus the counts do not add up.
>>
>> Please have a look here:
>>
>>
>> http://stackoverflow.com/questions/38935904/how-to-send-final-kafka-streams-aggregation-result-of-a-time-windowed-ktable/38945277#38945277
>>
>> and here:
>>
>>
>> http://docs.confluent.io/current/streams/developer-guide.html#memory-management
>>
>>
>> On each commit, the current intermediate result will be flushed from the
>> de-duplication cache -- thus, for smaller commit interval you see more
>> intermediate results and thus it seems to be more off.
>>
>> In .toStream((k, v) -> k.key()) you get rid of the window-id -- if you
>> keep it, you can see which result records belong to the same window. The
>> simplest way for testing would be to use .print() instead of .toStream()
>> to see the key as window-id plus record-key.
>>
>>
>> -Matthias
>>
>>
>> On 1/4/17 2:09 PM, Benjamin Black wrote:
>>> I'm hoping the DSL will do what I want :) Currently the example is
>>> continuously adding instead of bucketing, so if I modify it by adding a
>>> window to the count function:
>>>
>>> .groupBy((key, word) -> word)
>>> .count(TimeWindows.of(5000L), "Counts")
>>> .toStream((k, v) -> k.key());
>>>
>>> Then I do see bucketing happening. However, it isn't accurate. For example,
>>> I type into the console "kafka" as 20 sentences, but the output I get is:
>>>
>>> kafka 4
>>> kafka 9
>>> kafka 2
>>> kafka 7
>>>
>>> Which equals 22. What am I doing wrong? What is the relationship between
>>> commit interval and time window? The smaller I make commit interval, the
>>> less accurate it becomes.
>>>
>>>
>>> On Wed, Jan 4, 2017 at 3:53 PM Matthias J. Sax <matth...@confluent.io>
>>> wrote:
>>>
>>>> Do you know about Kafka Streams? Its DSL gives you exactly what you
>>>> want to do.
>>>>
>>>> Check out the documentation and WordCount example:
>>>>
>>>> http://docs.confluent.io/current/streams/index.html
>>>>
>>>>
>>>> https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java
>>>>
>>>>
>>>> Let us know if you have further questions.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 1/4/17 12:48 PM, Benjamin Black wrote:
>>>>> Hello,
>>>>>
>>>>> I'm looking for guidance on how to approach a counting problem. We want to
>>>>> consume a stream of data that consists of IDs and generate an output of the
>>>>> aggregated count with a window size of X seconds using processing time and
>>>>> a hopping time window. For example, using a window size of 1 second, if we
>>>>> get IDs 1, 2, 2, 2 in the 1st second, then the output would be 1=1, 2=3. If
>>>>> we get IDs 1, 3, 3 in the 2nd second then the output would be 1=1, 3=2. The
>>>>> aggregated count will then be turned into increment commands to a cache and
>>>>> a database.
>>>>>
>>>>> Obviously we will need some state to be stored during the count of a
>>>>> window, but we only need to keep it for the time period of the window (i.e.
>>>>> a second). I was thinking this could be achieved by using a persistent
>>>>> store, where the counts are reset during the punctuate and the store topic
>>>>> uses log compaction. Alternatively, we could simply have an in-memory
>>>>> store that is reset during the punctuate. My concern with the in-memory
>>>>> store is that I don't know when the input topic offset is committed or when
>>>>> the output data is written and therefore we could lose data. Ultimately, at
>>>>> the end of the second, the input offset and output data should be written
>>>>> at the same time, reducing the likelihood of lost data. We would rather
>>>>> lose data than have duplicate counts. What is the correct approach? Is
>>>>> there a better way of tackling the problem?
>>>>>
>>>>> I have put together some code, but it doesn't do exactly what I expect. I'm
>>>>> happy to share if it helps.
>>>>>
>>>>> Thanks,
>>>>> Ben
>>>>>
>>>>
>>>>
>>>
>>
>>
> 
