Thanks for your response, Damian.

> However the in-memory store will write each update to the changelog 
> (regardless of context.commit), which seems to be the issue you have?
Yes, that is the issue I have.
Although I can't give a precise number, it is a problem for me that, for example,
Kafka Streams reads a source stream of hundreds of thousands of events per second
(EPS) and writes changelog records to the broker at the same rate.

> Have you tested it and observed that it puts load on the broker?
Yes. 
I checked the behavior of each state store type in a small test.
I ran the following steps in the test code against 3 patterns (configured
roughly as in the sketch below):
  P1: persistent state store without cache / P2: persistent state store with
cache / P3: in-memory state store
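
To be concrete, I set up the three patterns roughly as follows (a sketch assuming
the Kafka Streams 0.10.1 Stores builder API; "Counts" is just an illustrative
store name):

import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.Stores;

// P1: persistent (RocksDB) state store, caching not enabled
StateStoreSupplier p1 = Stores.create("Counts")
    .withStringKeys()
    .withLongValues()
    .persistent()
    .build();

// P2: persistent state store with caching enabled
StateStoreSupplier p2 = Stores.create("Counts")
    .withStringKeys()
    .withLongValues()
    .persistent()
    .enableCaching()
    .build();

// P3: in-memory state store (the builder exposes no caching option here)
StateStoreSupplier p3 = Stores.create("Counts")
    .withStringKeys()
    .withLongValues()
    .inMemory()
    .build();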

1. Set up a local broker
2. Create a source topic with 1 partition
3. Produce 10 records with the same key but changing values to the source topic
4. Run a Kafka Streams application that includes the WordCountProcessor
(the changelog topic is left to be created automatically by Kafka Streams)
5. Consume the changelog topic from the first offset (roughly as in the
consumer sketch after this list)
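
For step 5, I read the changelog with a plain consumer along these lines (a
sketch; the changelog topic name follows <application.id>-<store name>-changelog,
so "wordcount-Counts-changelog" below is only an example):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ChangelogCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        // Changelog topic name: <application.id>-<store name>-changelog
        TopicPartition tp = new TopicPartition("wordcount-Counts-changelog", 0);

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(tp));
            consumer.seekToBeginning(Collections.singletonList(tp));

            // A single poll is enough for this 10-record test; a real check
            // would poll in a loop until no more records are returned.
            ConsumerRecords<byte[], byte[]> records = consumer.poll(5000);
            System.out.println("changelog records: " + records.count());
        }
    }
}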

Since I cleaned the environment before each run, the three tests were
independent of one another.
I also confirmed that the broker had not compacted the changelog topic.
As a result, I observed 10 records in the changelog topic for P1 (persistent
without cache) and P3 (in-memory).
Only for P2 (persistent with cache) did I observe a single record, the latest
one.
I expected the records written to the changelog to be reduced for the in-memory
state store as well, but in my local test they were not.

My question:
Is this the normal behavior of Kafka Streams?

Thank you.
--
Daisuke




Hi,

There is no way to enable caching on the in-memory store - by definition it is
already cached. However, the in-memory store will write each update to the
changelog (regardless of context.commit()), which seems to be the issue you
have?

When you say large, how large? Have you tested it and observed that it puts
load on the broker?

Thanks,
Damian

On Wed, 11 Jan 2017 at 06:10 Daisuke Moriya <dmor...@yahoo-corp.jp> wrote:

Hi.

I am developing a simple log counting application using Kafka Streams
0.10.1.1.
Its implementation is almost the same as the WordCountProcessor in the
Confluent documentation [
http://docs.confluent.io/3.1.1/streams/developer-guide.html#processor-api].
I am using an in-memory state store;
its key is the log category ID and its value is the count.
All the changelog records are written to the broker at context.commit() for
fault tolerance, but since the data I handle is large and the key size is
large, it takes a long time to process.
Even if the broker compacts the changelog, this puts a load on the broker.
I would like to write only the latest record for each key to the broker at
context.commit(), instead of every changelog record.
This would reduce the load on the broker, and I do not think
there would be any negative impact on fault tolerance.
If I use the persistent state store, I can do this by enabling caching,
but I couldn't find a way to accomplish this with the in-memory state store.
Is this possible?
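
For reference, my processor is roughly the following sketch, in the style of
the WordCountProcessor example from the document above ("Counts" is an
illustrative store name; details are simplified):

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class LogCountProcessor implements Processor<String, String> {

    private ProcessorContext context;
    private KeyValueStore<String, Long> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        // Punctuate once per second (stream-time based in 0.10.1)
        this.context.schedule(1000);
        this.store = (KeyValueStore<String, Long>) context.getStateStore("Counts");
    }

    @Override
    public void process(String key, String value) {
        // The record key is the log category ID; count occurrences per key
        Long count = store.get(key);
        store.put(key, count == null ? 1L : count + 1);
    }

    @Override
    public void punctuate(long timestamp) {
        // Commit processing progress; this is where I hoped only the latest
        // record per key would be written to the changelog.
        context.commit();
    }

    @Override
    public void close() {}
}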

Thank you,
--
Daisuke


