-------- Original message --------
From: Guozhang Wang <wangg...@gmail.com>
Date: 12/2/16 5:13 PM (GMT-06:00)
To: users@kafka.apache.org
Subject: Re: Initializing StateStores takes *really* long for large datasets

Before we have a single-knob memory management feature, I'd like to propose reducing the default Streams config values for RocksDB caching and memory block size. For example, I remember Henry has done some fine tuning of the RocksDB config for his use case:

https://github.com/HenryCaiHaiying/kafka/commit/b297f7c585f5a883ee068277e5f0f1224c347bd4
https://github.com/HenryCaiHaiying/kafka/commit/eed1726d16e528d813755a6e66b49d0bf14e8803
https://github.com/HenryCaiHaiying/kafka/commit/ccc4e25b110cd33eea47b40a2f6bf17ba0924576

We could check whether some of those changes are appropriate in general and, if so, change the default settings accordingly.

Henry

On Wed, Nov 30, 2016 at 11:04 AM, Ara Ebrahimi <ara.ebrah...@argyledata.com> wrote:

+1 on this.

Ara.

On Nov 30, 2016, at 5:18 AM, Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:

I'd like to quickly reinforce Frank's opinion regarding the RocksDB memory usage. I was also surprised by the amount of non-JVM-heap memory being used and had to tune the 100 MB default down considerably. It's also unfortunate that this makes it hard to estimate the memory requirements of a Kafka Streams app. If you have ten stores and the default config, you'd need a GB of memory for the RocksDB cache if you run one app instance, but only half a GB if you run two instances, because the stores will be distributed.

It would be much nicer to be able to give Kafka Streams a fixed amount of memory in a config and have it divide that among the active stores on a node. Configure it with N GB; if a rebalance adds more tasks and stores, they each get less RAM; if a rebalance removes tasks and stores, the remaining stores get more RAM. It seems like it would be hard to do this with the RocksDBConfigSetter interface, because it doesn't get any information about the topology to base such decisions on; and those are arguably not config, but tuning / performance decisions.

Mathieu

On Mon, Nov 28, 2016 at 3:45 PM, Frank Lyaruu <flya...@gmail.com> wrote:

I'll write an update on where I am now.

I've got about 40 'primary' topics, some small, some up to about 10M messages, and about 30 internal topics, divided over 6 stream instances, all running in a single app, talking to a 3-node Kafka cluster.

I use a single thread per stream instance, as my prime concern for now is to get it running stably rather than to optimize performance.

My biggest issue was that after a few hours my application started to slow down, to ultimately freeze up or crash. It turned out that RocksDB consumed all my memory, which I had overlooked because it is off-heap.

I was fooling around with the RocksDB settings a bit, but I had missed the most important one:

BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
tableConfig.setBlockSize(BLOCK_SIZE);
options.setTableFormatConfig(tableConfig);

The block cache size defaults to a whopping 100 MB per store, and that gets expensive fast. I reduced it to a few megabytes.
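Concretely, that kind of cache reduction is usually applied through a RocksDBConfigSetter so that it covers every store in the application. A minimal sketch, assuming the 0.10.1 Streams API; the class name and the 2 MB / 4 KB values are purely illustrative, not recommendations:

import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

// Illustrative setter that shrinks the per-store block cache well below the 100 MB default.
public class SmallBlockCacheConfigSetter implements RocksDBConfigSetter {

    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(2 * 1024 * 1024L); // 2 MB block cache per store
        tableConfig.setBlockSize(4 * 1024L);             // 4 KB blocks, RocksDB's own default
        options.setTableFormatConfig(tableConfig);       // applied to every store Streams opens
    }
}

The class is registered once in the application's properties, e.g. props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, SmallBlockCacheConfigSetter.class), and Streams then calls it for each RocksDB store it creates.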
My data size is so big that I doubt it is very effective anyway. Now it seems more stable.

I'd say that a smaller default makes sense, especially because the failure case is so opaque: all tests run just fine, but with a serious dataset it dies slowly.

Another thing I see is that while my instances are starting, some are quick and some take time (which makes sense, as the data size varies greatly), but as more instances start up they use more and more CPU, I/O and network, so the initialization of the bigger ones takes even longer. That increases the chance that one of them takes longer than MAX_POLL_INTERVAL_MS_CONFIG, and then all hell breaks loose. Maybe we can separate the 'initialize' and 'start' steps somehow.

In this case we could also log better: if initialization takes longer than the timeout, the task ends up being reassigned (in my case to the same instance) and then it errors out on being unable to lock the state dir. That message isn't very informative, as the timeout is the actual problem.

regards, Frank

On Mon, Nov 28, 2016 at 8:01 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Hello Frank,

How many instances do you have in your app, and how many threads do you use per instance? Note that besides the topology complexity (i.e. number of state stores, number of internal topics, etc.), the (re-)initialization process depends on the underlying consumer's membership protocol, so its rebalance latency can be longer with larger groups.

We have been optimizing the rebalance latency due to state store migration and restoration in the latest release, but for now the re-initialization latency still largely depends on 1) the topology complexity with regard to state stores and 2) the number of input topic partitions and instances / threads in the application.

Guozhang

On Sat, Nov 26, 2016 at 12:57 AM, Damian Guy <damian....@gmail.com> wrote:

Hi Frank,

If you are running on a single node then the RocksDB state should be re-used by your app. However, this relies on the app being cleanly shut down and on the existence of ".checkpoint" files in the state directory for the store, i.e. /tmp/kafka-streams/application-id/0_0/.checkpoint. If the file doesn't exist then the entire state will be restored from the changelog, which could take some time. I suspect this is what is happening?

As for the RocksDB memory settings: yes, the off-heap memory usage does sneak under the radar. There is a memory management story for Kafka Streams that is yet to be started. It would involve limiting the off-heap memory that RocksDB uses.

Thanks,
Damian

On Fri, 25 Nov 2016 at 21:14 Frank Lyaruu <flya...@gmail.com> wrote:

I'm running everything on a single node, so there is no 'data mobility' involved. So if Streams does not use any existing data, I might as well wipe the whole RocksDB store before starting, right?
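A minimal sketch of that "wipe before starting" approach, assuming 0.10.1's KafkaStreams#cleanUp(); the application id, bootstrap servers and the omitted topology are placeholders:

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class WipeAndStart {

    public static void main(final String[] args) {
        final KStreamBuilder builder = new KStreamBuilder(); // topology definition omitted

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");   // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092"); // placeholder

        final KafkaStreams streams = new KafkaStreams(builder, props);
        streams.cleanUp(); // removes this instance's local state directory; only valid before start()
        streams.start();   // stores are then rebuilt from their changelog topics
    }
}

Note that after cleanUp() every store is restored from its changelog on the next start(), so startup pays the full restoration cost this thread is about.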
As for the RocksDB tuning, I am using a RocksDBConfigSetter to reduce the memory usage a bit:

options.setWriteBufferSize(3000000);
options.setMaxBytesForLevelBase(30000000);
options.setMaxBytesForLevelMultiplier(3);

I needed to do this as my 16 GB machine would die otherwise, but I honestly was just reducing values more or less randomly until it wouldn't fall over. I have to say this is a big drawback of Rocks: I monitor Java memory usage, but this sneaks under the radar because it is off-heap, and it isn't very clear what the implications of the different settings are. I can't say something like the Xmx heap setting, meaning: take whatever you need up to this maximum. Also, if I get this right, then in the long run, as the data set changes and grows, I can never be sure it won't take too much memory.

I get the impression I'll be better off with an external store, something I can monitor, tune and restart separately.

But I'm getting ahead of myself. I'll wipe the data before I start and see if that gets me any stability.

On Fri, Nov 25, 2016 at 4:54 PM, Damian Guy <damian....@gmail.com> wrote:

Hi Frank,

If you have run the app before with the same applicationId, completely shut it down, and then restarted it, it will need to restore all of the state, which will take some time depending on the amount of data you have. In this case the placement of the partitions doesn't take any existing state stores into account, so it might need to load quite a lot of data if the nodes assigned certain partitions don't have that state store (this is something we should look at improving).

As for RocksDB tuning: you can provide an implementation of RocksDBConfigSetter via the config StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG. It has a single method:

public void setConfig(final String storeName, final Options options, final Map<String, Object> configs)

In this method you can set various options on the provided Options object. The options that might help in this case are:

options.setWriteBufferSize(..) - the default in Streams is 32 MB
options.setMaxWriteBufferNumber(..) - the default in Streams is 3

However, I'm no expert on RocksDB and I suggest you have a look at https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide for more info.

Thanks,
Damian
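To put those defaults in perspective, here is a rough lower-bound estimate of the off-heap footprint they imply; it ignores index and filter blocks and compaction overhead, and the store count of 10 is hypothetical, borrowed from Mathieu's example above:

// Back-of-the-envelope RocksDB off-heap estimate using the Streams defaults quoted above.
public class RocksDbFootprintEstimate {

    public static void main(final String[] args) {
        final long blockCacheBytes = 100L * 1024 * 1024;  // default block cache per store
        final long writeBufferBytes = 32L * 1024 * 1024;  // default write buffer size
        final int maxWriteBufferNumber = 3;               // default number of write buffers

        final long perStore = blockCacheBytes + maxWriteBufferNumber * writeBufferBytes; // ~196 MB
        final int storesOnThisInstance = 10;              // hypothetical store count
        final long perInstance = perStore * storesOnThisInstance;                        // ~1.9 GB

        System.out.printf("~%d MB per store, ~%d MB on this instance%n",
                perStore >> 20, perInstance >> 20);
    }
}

So with the defaults, each store can pin roughly 200 MB off-heap before RocksDB's other allocations are counted, which is why the 100 MB cache adds up so quickly for topologies with many stores.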
On Fri, 25 Nov 2016 at 13:02 Frank Lyaruu <flya...@gmail.com> wrote:

@Damian:

Yes, it ran before, and it has that 200 GB blob worth of RocksDB stuff.

@Svante: It's on a pretty high-end SAN in a managed private cloud. I'm unsure what the ultimate storage is, but I doubt there is a performance problem there.

On Fri, 25 Nov 2016 at 13:37, Svante Karlsson <svante.karls...@csi.se> wrote:

What kind of disk are you using for the RocksDB store? I.e. spinning or SSD?

2016-11-25 12:51 GMT+01:00 Damian Guy <damian....@gmail.com>:

Hi Frank,

Is this on a restart of the application?

Thanks,
Damian

On Fri, 25 Nov 2016 at 11:09 Frank Lyaruu <flya...@gmail.com> wrote:

Hi y'all,

I have a reasonably simple Kafka Streams application, which merges about 20 topics a few times. The thing is, some of those topic datasets are pretty big, about 10M messages. In total I've got about 200 GB worth of state in RocksDB; the largest topic is 38 GB.

I had set MAX_POLL_INTERVAL_MS_CONFIG to one hour to cover the initialization time, but that does not seem nearly enough; I'm looking at more than two-hour startup times, and that starts to be a bit ridiculous.

Any tips / experiences on how to deal with this case? Move away from Rocks and use an external data store? Any tuning tips on how to make Rocks a bit more useful here?

regards, Frank

--
-- Guozhang