Ah thanks so much for the insights -- we should be in a position to profile the new library against real data in the next week or so, so I'll let you know how it goes.
On Oct 11, 2016 6:26 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:
> Hello Greg,
>
> I can share some context on KIP-63 here:
>
> 1. Like Eno mentioned, we believe RocksDB's own mem-table already optimizes a large portion of IO access for write performance, and the extra caching layer on top of it was mainly for saving ser-de costs (note that you still need to ser / deser key-value objects into bytes when interacting with RocksDB). Although it may further help IO, that is not the main motivation.
>
> 2. As part of KIP-63, Bill helped investigate the pros / cons of such object caching (https://issues.apache.org/jira/browse/KAFKA-3973), and our conclusion based on that was that although it saves serde costs, it also makes memory management very hard in the long run, since the caching is based on num.records, not num.bytes. And when you have an OOM in one of the instances, it may well result in cascading failures from rebalances and task migration. Ideally, we want a strict memory bound for better capacity planning and integration with cluster resource managers (see https://cwiki.apache.org/confluence/display/KAFKA/Discussion%3A+Memory+Management+in+Kafka+Streams for more details).
>
> 3. So as part of KIP-63, we removed the object-oriented caching, replaced it with bytes-based caches, and in addition added the RocksDBConfigSetter to allow users to configure their RocksDB stores to tune for write / space amplification.
>
> With that, I think shutting off caching in your case should not degrade performance too much, assuming RocksDB itself can already do a good job with write access; it may add extra serde costs, though, depending on your use case (originally it was about 1000 records per cache, so roughly speaking you are saving that many serde calls per store). But if you do observe significant performance degradation I'd personally love to learn more and help on that end.
>
> Guozhang
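For reference, bounding the new bytes-based cache Guozhang describes looks roughly like this (a sketch against the 0.10.1 StreamsConfig API; the 100MB figure is illustrative, not a recommendation):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    // The KIP-63 cache is sized in bytes rather than records, so the total
    // memory used for record caching across all threads can be capped:
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 100 * 1024 * 1024L);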
> On Tue, Oct 11, 2016 at 10:10 AM, Greg Fodor <gfo...@gmail.com> wrote:
> > Thanks Eno -- my understanding is that the cache is already enabled at 100MB per RocksDB store, so it should be on already, but I'll check. I was wondering if you could shed some light on the changes between 0.10.0 and 0.10.1 -- in 0.10.0 there was an intermediate cache within RocksDBStore -- presumably this was there to improve performance, despite there still being a lower-level cache managed by RocksDB itself. Can you shed some light on why this cache was needed in 0.10.0? If it sounds like our use case won't warrant the same need, then we might be OK.
> >
> > Overall, however, this is really problematic for us, since we will have to turn off caching for effectively all of our jobs. The way our system works is that we have a number of jobs running Kafka Streams that are configured via database tables we change via our web stack. For example, when we want to tell our jobs to begin processing data for a user, we insert a record for that user into the database, which gets passed via Kafka Connect to a Kafka topic. The Kafka Streams job consumes this topic, does some basic group-by operations and repartitions on it, and joins it against other data streams so that it knows which users should be getting processed.
> >
> > So fundamentally we have two types of aggregations: the typical case, which I think was the target for the optimizations in KIP-63, where latency is less critical since we are counting and emitting counts for analysis, etc. And the other type of aggregation, where we are doing simple transformations on data coming from the database in order to configure the live behavior of the job. Latency here is very sensitive: users expect the job to react and start sending data for a user immediately after the database records are changed.
> >
> > So as you can see, since this is the paradigm we use to operate jobs, we're in a bad position if we ever want to take advantage of the work in KIP-63. All of our jobs are set up to work this way, so we will either have to maintain our fork or shut off caching for all of our jobs, neither of which sounds like a very good path.
> >
> > On Tue, Oct 11, 2016 at 4:16 AM, Eno Thereska <eno.there...@gmail.com> wrote:
> > > Hi Greg,
> > >
> > > An alternative would be to set up RocksDB's cache, while keeping the Streams cache at 0. That might give you what you need, especially if you can work with RocksDB directly and don't need to change the store.
> > >
> > > For example, here is how to set the Block Cache size to 100MB and the Write Buffer size to 32MB:
> > >
> > > https://github.com/facebook/rocksdb/wiki/Block-Cache
> > > https://github.com/facebook/rocksdb/wiki/Basic-Operations#write-buffer
> > >
> > > You can override these settings by creating an impl of RocksDBConfigSetter and setting StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG in Kafka Streams.
> > >
> > > Hope this helps,
> > > Eno
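A minimal sketch of what such a RocksDBConfigSetter implementation might look like, using the 100MB block cache / 32MB write buffer figures from Eno's links (the class name ConfiguredRocksDB is made up for illustration; the setters are from the RocksDB Java API):

    import java.util.Map;
    import org.apache.kafka.streams.state.RocksDBConfigSetter;
    import org.rocksdb.BlockBasedTableConfig;
    import org.rocksdb.Options;

    public class ConfiguredRocksDB implements RocksDBConfigSetter {
        @Override
        public void setConfig(final String storeName, final Options options,
                              final Map<String, Object> configs) {
            // 100MB block cache serving reads, set via the table format config.
            final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
            tableConfig.setBlockCacheSize(100 * 1024 * 1024L);
            options.setTableFormatConfig(tableConfig);
            // 32MB write buffer (mem-table) absorbing writes before flushing.
            options.setWriteBufferSize(32 * 1024 * 1024L);
        }
    }

It would then be registered with something like props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, ConfiguredRocksDB.class);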
> > >> On 10 Oct 2016, at 18:19, Greg Fodor <gfo...@gmail.com> wrote:
> > >>
> > >> Hey Eno, thanks for the suggestion -- understood that my patch is not something that could be accepted given the API change; I posted it to help make the discussion concrete and because I needed a workaround. (Likely we'll maintain this patch internally so we can move forward with the new version, since the consumer heartbeat issue is something we really need addressed.)
> > >>
> > >> Looking at the code, it seems that setting the cache size to zero will disable all caching. However, the previous version of Kafka Streams had a local cache within the RocksDBStore to reduce I/O. If we were to set the cache size to zero, my guess is we'd see a large increase in I/O relative to the previous version, since we would no longer have caching of any kind, even intra-store. By the looks of it there isn't an easy way to replicate the same caching behavior as the old version of Kafka Streams in the new system without increasing latency, but maybe I'm missing something.
> > >>
> > >> On Oct 10, 2016 3:10 AM, "Eno Thereska" <eno.there...@gmail.com> wrote:
> > >>> Hi Greg,
> > >>>
> > >>> Thanks for trying 0.10.1. The best option you have for your specific app is to simply turn off caching by setting the cache size to 0. That should give you the old behaviour:
> > >>>
> > >>> streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0L);
> > >>>
> > >>> Your PR is an alternative, but it requires changing the APIs and would require a KIP.
> > >>>
> > >>> Thanks
> > >>> Eno
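Putting the two knobs from this thread together, the low-latency setup under discussion would look something like the following sketch (the 100ms commit interval is illustrative, not a recommendation):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties streamsConfiguration = new Properties();
    // Disable the KIP-63 record cache entirely, restoring 0.10.0-style
    // behaviour where aggregation results are forwarded as they are produced.
    streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0L);
    // With caching enabled, downstream forwarding happens on cache flush,
    // which is tied to the commit interval; lowering it trades commit
    // overhead for latency (the workaround Greg weighs below).
    streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);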
> > >>>> On 9 Oct 2016, at 23:49, Greg Fodor <gfo...@gmail.com> wrote:
> > >>>>
> > >>>> JIRA opened here: https://issues.apache.org/jira/browse/KAFKA-4281
> > >>>>
> > >>>> On Sun, Oct 9, 2016 at 2:02 AM, Greg Fodor <gfo...@gmail.com> wrote:
> > >>>>> I went ahead and did some more testing, and it feels to me one option for resolving this issue is having a method on KGroupedStream which can be used to configure whether the operations on it (reduce/aggregate) will forward records immediately or not. I did a quick patch and was able to determine that forwarding the records immediately resolves the issue I am seeing. Having it be done on a per-KGroupedStream basis would provide maximum flexibility.
> > >>>>>
> > >>>>> On Sun, Oct 9, 2016 at 1:06 AM, Greg Fodor <gfo...@gmail.com> wrote:
> > >>>>>> I'm taking 0.10.1 for a spin on our existing Kafka Streams jobs and I'm hitting what seems to be a serious issue (at least, for us) with the changes brought about in KIP-63. In our job, we have a number of steps in the topology where we perform a repartition and aggregation on topics that require low latency. These topics have a very low message volume but require subsecond latency for the aggregations to complete, since they are configuration data that drive the rest of the job and need to be applied immediately.
> > >>>>>>
> > >>>>>> In 0.10.0, we performed a through (for repartitioning) and an aggregateBy, and this resulted in minimal latency: the aggregateBy would just attach a consumer to the output of the through, and the processor would consume and aggregate messages immediately, passing them to the next step in the topology.
> > >>>>>>
> > >>>>>> However, in 0.10.1 the aggregateBy API is no longer available, and it is necessary to pivot the data through a groupByKey and then aggregate(). The problem is that this mechanism results in the intermediate KTable state store storing the data as usual, but the data is not forwarded downstream until the next store flush (due to the use of ForwardingCacheFlushListener instead of calling forward() while processing the record).
> > >>>>>>
> > >>>>>> As noted in KIP-63 and as I saw in the code, the flush interval of state stores is commit.interval.ms. For us, this has been tuned to a few seconds, and since we have a number of these aggregations in our job sequentially, this now results in many seconds of latency in the worst case for a tuple to travel through our topology.
> > >>>>>>
> > >>>>>> It seems too inflexible to have the flush interval always be the same as the commit interval across all aggregates. For certain aggregations which are idempotent regardless of messages being reprocessed, being able to flush more often than the commit interval seems like a very important option when lower latency is required. It would still make sense to flush on every commit as well, but having an additional configuration to set the maximum time between state store flushes seems like it would solve our problem.
> > >>>>>>
> > >>>>>> In our case, we'd set our flush interval to a few hundred ms. Ideally, we would really prefer to be able to disable interval-based flushing altogether (and just put + forward all processed records) for certain KTables that are low volume, latency sensitive, and idempotent under message reprocessing.
> > >>>>>>
> > >>>>>> Thanks for any help! Right now the only option, it seems, is for us to radically lower the commit interval and accept any leftover latency, but unless we can find a sweet spot this may be a blocker for us moving to 0.10.1.

--
-- Guozhang
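For reference, the 0.10.1 shape of the repartition + aggregation Greg describes looks roughly like the following sketch (topic, store, and serde choices are made up; the 0.10.0 equivalent was a through() followed by aggregateByKey()/reduceByKey()):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.KTable;

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> configStream =
        builder.stream(Serdes.String(), Serdes.String(), "job-config");

    // 0.10.1: repartition via through(), then pivot through groupByKey()
    // before aggregating. With caching enabled, results reach downstream
    // processors only when the cache flushes (on commit), not per record.
    KTable<String, String> latestConfig = configStream
        .through("job-config-repartitioned")
        .groupByKey(Serdes.String(), Serdes.String())
        .reduce((oldValue, newValue) -> newValue, "job-config-store");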