Re: Kafka Streams Session store fetch latency very high with caching turned on

2018-06-25 Thread Guozhang Wang
Hello Sam,

That is an interesting find. My reasoning is similar to yours: since you
have 1K / sec input traffic, that is about 3600K messages per hour. Since you
mentioned there are about 500K unique keys per hour, each key will be updated
roughly 7 times per hour. Assuming the traffic is even and not skewed,
and your cache size is not large (by default it is only 50 MB), the cache may
not help too much.
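
For reference, the record cache is sized via `cache.max.bytes.buffering`
(shared across all stream threads) and is flushed at least every commit
interval. If you want to experiment with a larger cache, a minimal sketch
(values are illustrative only; assumes the usual `java.util.Properties` and
`org.apache.kafka.streams.StreamsConfig` imports):

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "session-app");        // illustrative
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // illustrative
    // total record-cache size in bytes, shared across all stream threads
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 100L * 1024 * 1024);
    // the cache is flushed (and records forwarded downstream) on every commit
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10_000L);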

About the caching space: we made some optimizations as part of the KIP-155
implementation (https://github.com/apache/kafka/pull/3027) to reduce the
search space inside the cache to the corresponding segments of the underlying
store, for exactly the reason that `TreeMap#get()` becomes less efficient with
a large key space. For session windows there is no fixed window length and the
segment interval depends purely on the retention period; my suspicion is that
if your retention period is set to be small, then the range of
`sessionStore#findSessions()` will still span almost all segments, which will
not help reduce the search key space.
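
For context, the lookup in question looks roughly like the following inside
the session-window aggregation (a sketch only; the key/value types and the
inactivity gap `gapMs` are illustrative):

    // Find all existing sessions of `key` that could merge with an event at `timestamp`.
    // The search range [timestamp - gapMs, timestamp + gapMs] can touch many (or all) segments.
    try (final KeyValueIterator<Windowed<String>, Long> iter =
             sessionStore.findSessions(key, timestamp - gapMs, timestamp + gapMs)) {
        while (iter.hasNext()) {
            final KeyValue<Windowed<String>, Long> session = iter.next();
            // ... merge the overlapping sessions ...
        }
    }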

So for your case, I think disabling caching for that session store is a
good idea. At the same time, we should consider further improving our caching
implementation to use something better than a TreeMap.
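
If you build the store yourself for the Processor API, caching (and logging)
can be disabled directly on the store builder; a sketch, assuming String keys,
Long values, and an illustrative store name and retention period:

    final StoreBuilder<SessionStore<String, Long>> storeBuilder =
        Stores.sessionStoreBuilder(
                Stores.persistentSessionStore("my-session-store", TimeUnit.HOURS.toMillis(1)),
                Serdes.String(),
                Serdes.Long())
            .withCachingDisabled()   // skip the NamedCache / TreeMap layer entirely
            .withLoggingDisabled();  // no changelog topic, matching your setup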



Guozhang




Re: Kafka Streams Session store fetch latency very high with caching turned on

2018-06-24 Thread Matthias J. Sax
Sam,

Thanks for your email. This is a very interesting find. I did not double
check the code, but your reasoning makes sense to me. Note that caching
was _not_ introduced to reduce the writes to RocksDB, but to reduce the
writes to the changelog topic and to reduce the number of records sent
downstream.

Because you don't want a fault-tolerant store and have already disabled
caching, I see no reason why disabling caching would be a bad idea for
your use case.

From a performance point of view, there should be no difference between
the DSL and the Processor API. Note that the DSL sits on top of the Processor
API, and at runtime we don't even know whether the DSL was used or not.
Caching is enabled by default to reduce the downstream load -- we have had
many discussions about whether this is the best default behavior. The latest
conclusion was that it is... :)
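
If you ever move this to the DSL, caching can also be disabled per store via
`Materialized`; a sketch with illustrative topic/store names, String keys, and
a 5-minute inactivity gap:

    final KTable<Windowed<String>, Long> counts = builder
        .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
        .groupByKey()
        .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5)))
        .count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("session-counts")
            .withCachingDisabled());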



-Matthias








Kafka Streams Session store fetch latency very high with caching turned on

2018-06-22 Thread Sam Lendle
I am using a session store in a Kafka Streams application. With caching turned
on, average fetch latency was very high, about 200 ms after running for about 1
hour. With caching turned off, it was about 100 μs. We seem to be running fine
without caching, but I am very curious as to why caching performance is so bad
in our case. Any insight into what might be going on would be helpful.


Setup/config

  *   I'm using a custom transformer, but the implementation is almost
identical to this section of KStreamSessionWindowAggregate:
https://github.com/apache/kafka/blob/fdcf75ea326b8e07d8d7c4f5cc14f1e70451bcd4/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java#L94-L113
The main difference is that I'm forwarding something other than the updated
session downstream (a rough sketch of this setup follows below).
  *   Logging is turned off, so updates are not pushed to a changelog topic.
The store starts empty whenever Streams is initialized.
  *   I don't think I'm setting any potentially related configs. The RocksDB
config is the default.
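
A rough sketch of the setup (untested and simplified; the store name, serdes,
the 5-minute inactivity gap, and the count aggregation are all illustrative,
and the real transformer forwards a different value downstream):

    final StoreBuilder<SessionStore<String, Long>> storeBuilder =
        Stores.sessionStoreBuilder(
                Stores.persistentSessionStore("sessions", TimeUnit.HOURS.toMillis(1)),
                Serdes.String(), Serdes.Long())
            .withLoggingDisabled();              // no changelog topic
    builder.addStateStore(storeBuilder);         // builder is the StreamsBuilder

    class MyTransformer implements Transformer<String, String, KeyValue<String, Long>> {
        private ProcessorContext context;
        private SessionStore<String, Long> store;

        @SuppressWarnings("unchecked")
        @Override
        public void init(final ProcessorContext context) {
            this.context = context;
            this.store = (SessionStore<String, Long>) context.getStateStore("sessions");
        }

        @Override
        public KeyValue<String, Long> transform(final String key, final String value) {
            final long timestamp = context.timestamp();
            final long gap = TimeUnit.MINUTES.toMillis(5);  // illustrative inactivity gap
            long count = 1L;
            long start = timestamp;
            long end = timestamp;
            // Find and merge overlapping sessions, as in KStreamSessionWindowAggregate.
            try (final KeyValueIterator<Windowed<String>, Long> iter =
                     store.findSessions(key, timestamp - gap, timestamp + gap)) {
                while (iter.hasNext()) {
                    final KeyValue<Windowed<String>, Long> session = iter.next();
                    count += session.value;
                    start = Math.min(start, session.key.window().start());
                    end = Math.max(end, session.key.window().end());
                    store.remove(session.key);
                }
            }
            // SessionWindow comes from the kstream.internals package, as used by
            // KStreamSessionWindowAggregate itself.
            store.put(new Windowed<>(key, new SessionWindow(start, end)), count);
            return KeyValue.pair(key, count);    // forward something other than the session
        }

        @Override
        public void close() {}
    }

It is wired into the topology roughly as stream.transform(MyTransformer::new, "sessions").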

We're receiving about 1000 messages/second on a topic with five partitions.
With caching turned on, this custom transformer is the bottleneck and the
processing rate is much lower, 100-200 operations per second. With caching
turned off, the volume is no problem. There are about 500k unique keys per hour.

Using a sampling profiler I saw that most time was spent in TreeMap operations. 
Unfortunately I don't have a copy of the profile data anymore, but I think the 
map in question is the `cache` field in the NamedCache class.

If I look at a plot of fetch latency vs time since starting, it looks to me
like latency is about O(log(time)). I think what's going on is that the size of
the map is increasing roughly linearly in time, particularly for the first few
minutes that Streams is running, because almost all keys will be unique, so the
latency is almost entirely spent in TreeMap#get.
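
To spell that out: if new unique keys arrive at a roughly constant rate r, the
map holds about N(t) = r * t entries after time t, and each lookup then costs
O(log N(t)) = O(log r + log t) comparisons, so per-fetch latency growing like
log(time) is exactly what a linearly growing TreeMap would produce. (This is
back-of-the-envelope reasoning only, not something I measured.)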

Questions:
1) Does my theory make sense?
2) Could the issue be related to the fact that I'm using a state store with
the Transformer/Processor API vs the DSL? I know that caching is turned on by
default for state stores in the DSL but not in the Processor API, but I don't
understand why.
3) My understanding is that Streams-side state store caching is an optimization
to reduce the number of writes to the underlying RocksDB store. In that case,
because I have so many unique keys, and the same keys usually show up a few
minutes apart, it makes sense that caching wouldn't do much for me. Is that
correct?
4) Given that things seem to work fine with caching turned off, could there be
any advantage to having it on and configured differently? If so, what
configuration changes should I try?

If there's any additional context I can provide please let me know.

Thanks,
Sam