I am using a session store in a Kafka Streams application. With caching turned 
on, average fetch latency was very high, around 200 ms after the application had 
been running for about an hour. With caching turned off, it was about 100 μs. We 
seem to be running fine without caching, but I am very curious why caching 
performance is so bad in our case. Any insight into what might be going on would 
be helpful.


Setup/config

  *   I'm using a custom transformer, but the implementation is almost 
identical to this section of KStreamSessionWindowAggregate: 
https://github.com/apache/kafka/blob/fdcf75ea326b8e07d8d7c4f5cc14f1e70451bcd4/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java#L94-L113
 The main difference is that I'm forwarding something other than the updated 
session downstream (a rough sketch follows this list).
  *   Logging is turned off, so updates are not pushed to a changelog topic. 
The store starts empty whenever Streams is initialized.
  *   I don't think I'm setting any potentially related configs. The RocksDB 
config is the default.

We're receiving about 1000 messages/second on a topic with five partitions. With 
caching turned on, this custom transformer is the bottleneck and the processing 
rate is much lower, around 100-200 ops/second. With caching turned off, the 
volume is no problem. There are about 500k unique keys per hour.

Using a sampling profiler, I saw that most of the time was spent in TreeMap 
operations. Unfortunately I no longer have a copy of the profile data, but I 
believe the map in question is the `cache` field in the NamedCache class.

If I look at a plot of fetch latency vs. time since starting, latency looks 
roughly O(log(time)) to me. I think what's going on is that the size of the map 
grows roughly linearly with time, because almost all keys are unique 
(particularly for the first few minutes that Streams is running), and 
TreeMap#get is O(log n), so the latency is almost entirely spent in TreeMap#get 
and grows like log(time).
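
To spell the theory out a bit more (treating the unique-key arrival rate r as 
roughly constant and ignoring cache eviction, both of which are simplifications 
on my part):

    entries in the cache after time t:  n(t) ≈ r * t
    cost of one TreeMap#get:            ≈ c * log2(n(t)) = c * (log2(r) + log2(t))

which would give fetch latency roughly the logarithmic-in-time shape I see in 
the plot.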

Questions:
1) Does my theory make sense?
2) Could the issue be related to the fact that I'm using a state store with the 
transformer/processor API rather than the DSL? I know that caching is turned on 
by default for state stores in the DSL but not in the processor API, but I don't 
understand why.
3) My understanding is that Streams-side state store caching is an optimization 
to reduce the number of writes to the underlying RocksDB store. In that case, 
because I have so many unique keys, and the same key usually only shows up again 
a few minutes later, it makes sense that caching wouldn't do much for me. Is 
that correct?
4) Given that things seem to work fine with caching turned off, could there be 
any advantage to having it on and configured differently? If so, what 
configuration changes should I try? (The only cache-related settings I'm aware 
of are shown after this list.)
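
For reference, the only cache-related settings I know of are the total cache 
size and the commit interval (the cache is flushed on commit). The values below 
are just the defaults as I understand them, not something I've tuned:

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
// Total bytes of record cache shared across all threads of the instance (default 10 MB).
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
// The cache is flushed on commit, so this bounds how long entries sit in it (default 30 s).
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30_000L);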

If there's any additional context I can provide, please let me know.

Thanks,
Sam


