Re: Initializing StateStores takes *really* long for large datasets

2016-12-02 Thread Guozhang Wang
Before we have a single-knob memory management feature, I'd like to
propose reducing the Streams' default config values for RocksDB caching and
memory block size. For example, I remember Henry has done some fine tuning
on the RocksDB config for his use case:

https://github.com/HenryCaiHaiying/kafka/commit/b297f7c585f5a883ee068277e5f0f1224c347bd4
https://github.com/HenryCaiHaiying/kafka/commit/eed1726d16e528d813755a6e66b49d0bf14e8803
https://github.com/HenryCaiHaiying/kafka/commit/ccc4e25b110cd33eea47b40a2f6bf17ba0924576



We could check whether some of those changes are appropriate in general and,
if so, change the default settings accordingly.
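
For illustration, a minimal sketch of what reduced caching defaults could look
like when applied through a RocksDBConfigSetter; the class name and byte values
below are assumptions for the example, not proposed new defaults:

import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

// Sketch only: shrinks the per-store block cache and block size well below
// the current 100 MB block cache default discussed in this thread.
public class SmallCacheConfigSetter implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(4 * 1024 * 1024L); // assumed 4 MB cache per store
        tableConfig.setBlockSize(4 * 1024L);             // assumed 4 KB data blocks
        options.setTableFormatConfig(tableConfig);
    }
}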

Henry

On Wed, Nov 30, 2016 at 11:04 AM, Ara Ebrahimi 
wrote:

> +1 on this.
>
> Ara.
>
> > On Nov 30, 2016, at 5:18 AM, Mathieu Fenniak <
> mathieu.fenn...@replicon.com> wrote:
> >
> > I'd like to quickly reinforce Frank's opinion regarding the rocksdb
> memory
> > usage.  I was also surprised by the amount of non-JVM-heap memory being
> > used and had to tune the 100 MB default down considerably.  It's also
> > unfortunate that it's hard to estimate the memory requirements for a KS
> app
> > because of this.  If you have ten stores, and assuming the default
> config,
> > you'd need a GB of memory for the rocksdb cache if you run 1 app, but
> only
> > half a GB if you run two app instances because the stores will be
> > distributed.
> >
> > It would be much nicer to be able to give KS a fixed amount of memory in
> a
> > config that it divided among the active stores on a node.  Configure it
> > with N GB; if a rebalance adds more tasks and stores, they each get less
> > RAM; if a rebalance removes tasks and stores, the remaining stores get
> more
> > RAM.  It seems like it'd be hard to do this with the RocksDBConfigSetter
> > interface because it doesn't get any state about the KS topology to make
> > decisions; which are arguably not config, but tuning / performance
> > decisions.
> >
> > Mathieu
> >
> >
> >
> > On Mon, Nov 28, 2016 at 3:45 PM, Frank Lyaruu  wrote:
> >
> >> I'll write an update on where I am now.
> >>
> >> I've got about 40 'primary' topics, some small, some up to about 10M
> >> messages,
> >> and about 30 internal topics, divided over 6 stream instances, all
> running
> >> in a single
> >> app, talking to a 3 node Kafka cluster.
> >>
> >> I use a single thread per stream instance, as my prime concern is now to
> >> get it
> >> to run stable, rather than optimizing performance.
> >>
> >> My biggest issue was that after a few hours my application started to
> slow
> >> down
> >> to ultimately freeze up or crash. It turned out that RocksDb consumed
> all
> >> my
> >> memory, which I overlooked as it was off-heap.
> >>
> >> I was fooling around with RocksDb settings a bit but I had missed the
> most
> >> important
> >> one:
> >>
> >>BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
> >>tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
> >>tableConfig.setBlockSize(BLOCK_SIZE);
> >>options.setTableFormatConfig(tableConfig);
> >>
> >> The block cache size defaults to a whopping 100Mb per store, and that
> gets
> >> expensive
> >> fast. I reduced it to a few megabytes. My data size is so big that I
> doubt
> >> it is very effective
> >> anyway. Now it seems more stable.
> >>
> >> I'd say that a smaller default makes sense, especially because the
> failure
> >> case is
> >> so opaque (running all tests just fine but with a serious dataset it
> dies
> >> slowly)
> >>
> >> Another thing I see is that while starting all my instances, some are
> quick
> >> and some take
> >> time (makes sense as the data size varies greatly), but as more
> instances
> >> start up, they
> >> start to use more and more CPU I/O and network, that the initialization
> of
> >> the bigger ones
> >> takes even longer, increasing the chance that one of them takes longer
> than
> >> the
> >> MAX_POLL_INTERVAL_MS_CONFIG, and then all hell breaks loose. Maybe we
> can
> >> separate the 'initialize' and 'start' step somehow.
> >>
> >> In this case we could log better: If initialization is taking longer
> than
> >> the timeout, it ends up
> >> being reassigned (in my case to the same instance) and then it errors
> out
> >> on being unable
> >> to lock the state dir. That message isn't too informative as the
> timeout is
> >> the actual problem.
> >>
> >> regards, Frank
> >>
> >>
> >> On Mon, Nov 28, 2016 at 8:01 PM, Guozhang Wang 
> wrote:
> >>
> >>> Hello Frank,
> >>>
> >>> How many instances do you have in your apps and how many threads did
> you
> >>> use per thread? Note that besides the topology complexity (i.e. number
> of
> >>> state stores, number of internal topics etc) the (re-)initialization
> >>> process is depending on the underlying consumer's membership protocol,
> >> and
> >>> hence its rebalance latency could be longer with larger groups.
> >>>
> >>> We have 

Re: Initializing StateStores takes *really* long for large datasets

2016-11-30 Thread Ara Ebrahimi
+1 on this.

Ara.

> On Nov 30, 2016, at 5:18 AM, Mathieu Fenniak  
> wrote:
>
> I'd like to quickly reinforce Frank's opinion regarding the rocksdb memory
> usage.  I was also surprised by the amount of non-JVM-heap memory being
> used and had to tune the 100 MB default down considerably.  It's also
> unfortunate that it's hard to estimate the memory requirements for a KS app
> because of this.  If you have ten stores, and assuming the default config,
> you'd need a GB of memory for the rocksdb cache if you run 1 app, but only
> half a GB if you run two app instances because the stores will be
> distributed.
>
> It would be much nicer to be able to give KS a fixed amount of memory in a
> config that it divided among the active stores on a node.  Configure it
> with N GB; if a rebalance adds more tasks and stores, they each get less
> RAM; if a rebalance removes tasks and stores, the remaining stores get more
> RAM.  It seems like it'd be hard to do this with the RocksDBConfigSetter
> interface because it doesn't get any state about the KS topology to make
> decisions; which are arguably not config, but tuning / performance
> decisions.
>
> Mathieu
>
>
>
> On Mon, Nov 28, 2016 at 3:45 PM, Frank Lyaruu  wrote:
>
>> I'll write an update on where I am now.
>>
>> I've got about 40 'primary' topics, some small, some up to about 10M
>> messages,
>> and about 30 internal topics, divided over 6 stream instances, all running
>> in a single
>> app, talking to a 3 node Kafka cluster.
>>
>> I use a single thread per stream instance, as my prime concern is now to
>> get it
>> to run stable, rather than optimizing performance.
>>
>> My biggest issue was that after a few hours my application started to slow
>> down
>> to ultimately freeze up or crash. It turned out that RocksDb consumed all
>> my
>> memory, which I overlooked as it was off-heap.
>>
>> I was fooling around with RocksDb settings a bit but I had missed the most
>> important
>> one:
>>
>>BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
>>tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
>>tableConfig.setBlockSize(BLOCK_SIZE);
>>options.setTableFormatConfig(tableConfig);
>>
>> The block cache size defaults to a whopping 100Mb per store, and that gets
>> expensive
>> fast. I reduced it to a few megabytes. My data size is so big that I doubt
>> it is very effective
>> anyway. Now it seems more stable.
>>
>> I'd say that a smaller default makes sense, especially because the failure
>> case is
>> so opaque (running all tests just fine but with a serious dataset it dies
>> slowly)
>>
>> Another thing I see is that while starting all my instances, some are quick
>> and some take
>> time (makes sense as the data size varies greatly), but as more instances
>> start up, they
>> start to use more and more CPU I/O and network, that the initialization of
>> the bigger ones
>> takes even longer, increasing the chance that one of them takes longer than
>> the
>> MAX_POLL_INTERVAL_MS_CONFIG, and then all hell breaks loose. Maybe we can
>> separate the 'initialize' and 'start' step somehow.
>>
>> In this case we could log better: If initialization is taking longer than
>> the timeout, it ends up
>> being reassigned (in my case to the same instance) and then it errors out
>> on being unable
>> to lock the state dir. That message isn't too informative as the timeout is
>> the actual problem.
>>
>> regards, Frank
>>
>>
>> On Mon, Nov 28, 2016 at 8:01 PM, Guozhang Wang  wrote:
>>
>>> Hello Frank,
>>>
>>> How many instances do you have in your apps and how many threads did you
>>> use per thread? Note that besides the topology complexity (i.e. number of
>>> state stores, number of internal topics etc) the (re-)initialization
>>> process is depending on the underlying consumer's membership protocol,
>> and
>>> hence its rebalance latency could be longer with larger groups.
>>>
>>> We have been optimizing our rebalance latency due to state store
>> migration
>>> and restoration in the latest release, but for now the re-initialization
>>> latency is still largely depends on 1) topology complexity regarding to
>>> state stores and 2) number of input topic partitions and instance /
>> threads
>>> in the application.
>>>
>>>
>>> Guozhang
>>>
>>>
>>> On Sat, Nov 26, 2016 at 12:57 AM, Damian Guy 
>> wrote:
>>>
 Hi Frank,

 If you are running on a single node then the RocksDB state should be
 re-used by your app. However, it relies on the app being cleanly
>> shutdown
 and the existence of ".checkpoint" files in the state directory for the
 store, i.e., /tmp/kafka-streams/application-id/0_0/.checkpoint. If the
 file
 doesn't exist then the entire state will be restored from the
>> changelog -
 which could take some time. I suspect this is what is happening?

 As for the RocksDB memory settings, yes the off 

Re: Initializing StateStores takes *really* long for large datasets

2016-11-30 Thread Eno Thereska
Mathieu,

You are absolutely right. We've written about the memory management strategy 
below: 
https://cwiki.apache.org/confluence/display/KAFKA/Discussion%3A+Memory+Management+in+Kafka+Streams

Currently we have done what you suggest, but only for the internal Streams
caches (see KIP-63), not for the RocksDB caches. If people in the community
are interested in contributing to the latter, that'd be great, but it's also
on our radar (and given your experience it sounds like we might have to
accelerate its implementation).
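
For reference, the KIP-63 cache bound mentioned above is set per application
instance; a minimal sketch of wiring it up (the application id, bootstrap
servers, and the 64 MB figure are assumed example values):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsCacheConfigExample {
    public static Properties streamsProperties() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");     // assumed
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // assumed
        // KIP-63: total bytes for the internal record caches of this instance,
        // shared across its threads (this does not bound the RocksDB block caches).
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 64 * 1024 * 1024L);
        return props;
    }
}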

Thanks,
Eno

> On 30 Nov 2016, at 13:18, Mathieu Fenniak  
> wrote:
> 
> I'd like to quickly reinforce Frank's opinion regarding the rocksdb memory
> usage.  I was also surprised by the amount of non-JVM-heap memory being
> used and had to tune the 100 MB default down considerably.  It's also
> unfortunate that it's hard to estimate the memory requirements for a KS app
> because of this.  If you have ten stores, and assuming the default config,
> you'd need a GB of memory for the rocksdb cache if you run 1 app, but only
> half a GB if you run two app instances because the stores will be
> distributed.
> 
> It would be much nicer to be able to give KS a fixed amount of memory in a
> config that it divided among the active stores on a node.  Configure it
> with N GB; if a rebalance adds more tasks and stores, they each get less
> RAM; if a rebalance removes tasks and stores, the remaining stores get more
> RAM.  It seems like it'd be hard to do this with the RocksDBConfigSetter
> interface because it doesn't get any state about the KS topology to make
> decisions; which are arguably not config, but tuning / performance
> decisions.
> 
> Mathieu
> 
> 
> 
> On Mon, Nov 28, 2016 at 3:45 PM, Frank Lyaruu  wrote:
> 
>> I'll write an update on where I am now.
>> 
>> I've got about 40 'primary' topics, some small, some up to about 10M
>> messages,
>> and about 30 internal topics, divided over 6 stream instances, all running
>> in a single
>> app, talking to a 3 node Kafka cluster.
>> 
>> I use a single thread per stream instance, as my prime concern is now to
>> get it
>> to run stable, rather than optimizing performance.
>> 
>> My biggest issue was that after a few hours my application started to slow
>> down
>> to ultimately freeze up or crash. It turned out that RocksDb consumed all
>> my
>> memory, which I overlooked as it was off-heap.
>> 
>> I was fooling around with RocksDb settings a bit but I had missed the most
>> important
>> one:
>> 
>>BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
>>tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
>>tableConfig.setBlockSize(BLOCK_SIZE);
>>options.setTableFormatConfig(tableConfig);
>> 
>> The block cache size defaults to a whopping 100Mb per store, and that gets
>> expensive
>> fast. I reduced it to a few megabytes. My data size is so big that I doubt
>> it is very effective
>> anyway. Now it seems more stable.
>> 
>> I'd say that a smaller default makes sense, especially because the failure
>> case is
>> so opaque (running all tests just fine but with a serious dataset it dies
>> slowly)
>> 
>> Another thing I see is that while starting all my instances, some are quick
>> and some take
>> time (makes sense as the data size varies greatly), but as more instances
>> start up, they
>> start to use more and more CPU I/O and network, that the initialization of
>> the bigger ones
>> takes even longer, increasing the chance that one of them takes longer than
>> the
>> MAX_POLL_INTERVAL_MS_CONFIG, and then all hell breaks loose. Maybe we can
>> separate the 'initialize' and 'start' step somehow.
>> 
>> In this case we could log better: If initialization is taking longer than
>> the timeout, it ends up
>> being reassigned (in my case to the same instance) and then it errors out
>> on being unable
>> to lock the state dir. That message isn't too informative as the timeout is
>> the actual problem.
>> 
>> regards, Frank
>> 
>> 
>> On Mon, Nov 28, 2016 at 8:01 PM, Guozhang Wang  wrote:
>> 
>>> Hello Frank,
>>> 
>>> How many instances do you have in your apps and how many threads did you
>>> use per thread? Note that besides the topology complexity (i.e. number of
>>> state stores, number of internal topics etc) the (re-)initialization
>>> process is depending on the underlying consumer's membership protocol,
>> and
>>> hence its rebalance latency could be longer with larger groups.
>>> 
>>> We have been optimizing our rebalance latency due to state store
>> migration
>>> and restoration in the latest release, but for now the re-initialization
>>> latency is still largely depends on 1) topology complexity regarding to
>>> state stores and 2) number of input topic partitions and instance /
>> threads
>>> in the application.
>>> 
>>> 
>>> 

Re: Initializing StateStores takes *really* long for large datasets

2016-11-30 Thread Mathieu Fenniak
I'd like to quickly reinforce Frank's opinion regarding the rocksdb memory
usage.  I was also surprised by the amount of non-JVM-heap memory being
used and had to tune the 100 MB default down considerably.  It's also
unfortunate that it's hard to estimate the memory requirements for a KS app
because of this.  If you have ten stores, and assuming the default config,
you'd need a GB of memory for the rocksdb cache if you run 1 app, but only
half a GB if you run two app instances because the stores will be
distributed.

It would be much nicer to be able to give KS a fixed amount of memory in a
config that it divides among the active stores on a node.  Configure it
with N GB; if a rebalance adds more tasks and stores, they each get less
RAM; if a rebalance removes tasks and stores, the remaining stores get more
RAM.  It seems like it'd be hard to do this with the RocksDBConfigSetter
interface because it doesn't get any state about the KS topology to make
such decisions, which are arguably not configuration but tuning/performance
decisions.
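
To make the limitation concrete, here is a purely hypothetical sketch of the
workaround available today: because RocksDBConfigSetter sees no topology state,
a fixed per-instance budget can only be divided by a store count you estimate
yourself (the class name, budget, and store count are all assumptions):

import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

public class BudgetedRocksDBConfigSetter implements RocksDBConfigSetter {
    private static final long TOTAL_BUDGET_BYTES = 512L * 1024 * 1024; // assumed 512 MB per instance
    private static final int ESTIMATED_STORE_COUNT = 10;               // guessed, not derived from the topology

    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        // Split the assumed budget evenly; a rebalance that changes the number
        // of stores on this node is invisible here, which is exactly the problem.
        tableConfig.setBlockCacheSize(TOTAL_BUDGET_BYTES / ESTIMATED_STORE_COUNT);
        options.setTableFormatConfig(tableConfig);
    }
}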

Mathieu



On Mon, Nov 28, 2016 at 3:45 PM, Frank Lyaruu  wrote:

> I'll write an update on where I am now.
>
> I've got about 40 'primary' topics, some small, some up to about 10M
> messages,
> and about 30 internal topics, divided over 6 stream instances, all running
> in a single
> app, talking to a 3 node Kafka cluster.
>
> I use a single thread per stream instance, as my prime concern is now to
> get it
> to run stable, rather than optimizing performance.
>
> My biggest issue was that after a few hours my application started to slow
> down
> to ultimately freeze up or crash. It turned out that RocksDb consumed all
> my
> memory, which I overlooked as it was off-heap.
>
> I was fooling around with RocksDb settings a bit but I had missed the most
> important
> one:
>
> BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
> tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
> tableConfig.setBlockSize(BLOCK_SIZE);
> options.setTableFormatConfig(tableConfig);
>
> The block cache size defaults to a whopping 100Mb per store, and that gets
> expensive
> fast. I reduced it to a few megabytes. My data size is so big that I doubt
> it is very effective
> anyway. Now it seems more stable.
>
> I'd say that a smaller default makes sense, especially because the failure
> case is
> so opaque (running all tests just fine but with a serious dataset it dies
> slowly)
>
> Another thing I see is that while starting all my instances, some are quick
> and some take
> time (makes sense as the data size varies greatly), but as more instances
> start up, they
> start to use more and more CPU I/O and network, that the initialization of
> the bigger ones
> takes even longer, increasing the chance that one of them takes longer than
> the
> MAX_POLL_INTERVAL_MS_CONFIG, and then all hell breaks loose. Maybe we can
> separate the 'initialize' and 'start' step somehow.
>
> In this case we could log better: If initialization is taking longer than
> the timeout, it ends up
> being reassigned (in my case to the same instance) and then it errors out
> on being unable
> to lock the state dir. That message isn't too informative as the timeout is
> the actual problem.
>
> regards, Frank
>
>
> On Mon, Nov 28, 2016 at 8:01 PM, Guozhang Wang  wrote:
>
> > Hello Frank,
> >
> > How many instances do you have in your apps and how many threads did you
> > use per thread? Note that besides the topology complexity (i.e. number of
> > state stores, number of internal topics etc) the (re-)initialization
> > process is depending on the underlying consumer's membership protocol,
> and
> > hence its rebalance latency could be longer with larger groups.
> >
> > We have been optimizing our rebalance latency due to state store
> migration
> > and restoration in the latest release, but for now the re-initialization
> > latency is still largely depends on 1) topology complexity regarding to
> > state stores and 2) number of input topic partitions and instance /
> threads
> > in the application.
> >
> >
> > Guozhang
> >
> >
> > On Sat, Nov 26, 2016 at 12:57 AM, Damian Guy 
> wrote:
> >
> > > Hi Frank,
> > >
> > > If you are running on a single node then the RocksDB state should be
> > > re-used by your app. However, it relies on the app being cleanly
> shutdown
> > > and the existence of ".checkpoint" files in the state directory for the
 store, i.e., /tmp/kafka-streams/application-id/0_0/.checkpoint. If the
> > > file
> > > doesn't exist then the entire state will be restored from the
> changelog -
> > > which could take some time. I suspect this is what is happening?
> > >
> > > As for the RocksDB memory settings, yes the off heap memory usage does
> > > sneak under the radar. There is a memory management story for Kafka
> > Streams
> > > that is yet to be started. This would involve limiting the off-heap
> > memory
> > > that 

Re: Initializing StateStores takes *really* long for large datasets

2016-11-28 Thread Frank Lyaruu
I'll write an update on where I am now.

I've got about 40 'primary' topics, some small, some up to about 10M
messages, and about 30 internal topics, divided over 6 stream instances, all
running in a single app, talking to a 3-node Kafka cluster.

I use a single thread per stream instance, as my prime concern right now is
to get it to run stably, rather than to optimize performance.

My biggest issue was that after a few hours my application started to slow
down and ultimately freeze up or crash. It turned out that RocksDb consumed
all my memory, which I overlooked as it was off-heap.

I was fooling around with RocksDb settings a bit, but I had missed the most
important one:

BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE); // block cache per store, in bytes; Streams defaults to 100 MB
tableConfig.setBlockSize(BLOCK_SIZE);            // SST block size, in bytes
options.setTableFormatConfig(tableConfig);       // 'options' is the Options passed to RocksDBConfigSetter.setConfig

The block cache size defaults to a whopping 100 MB per store, and that gets
expensive fast. I reduced it to a few megabytes. My data size is so big that
I doubt it is very effective anyway. Now it seems more stable.

I'd say that a smaller default makes sense, especially because the failure
case is so opaque (it runs all tests just fine, but with a serious dataset it
dies slowly).

Another thing I see is that while starting all my instances, some are quick
and some take time (makes sense, as the data size varies greatly), but as
more instances start up they use more and more CPU, I/O, and network, so the
initialization of the bigger ones takes even longer, increasing the chance
that one of them exceeds MAX_POLL_INTERVAL_MS_CONFIG, and then all hell
breaks loose. Maybe we can separate the 'initialize' and 'start' steps
somehow.

In this case we could log better: if initialization takes longer than the
timeout, the task ends up being reassigned (in my case to the same instance)
and then errors out on being unable to lock the state dir. That message isn't
too informative, as the timeout is the actual problem.

regards, Frank


On Mon, Nov 28, 2016 at 8:01 PM, Guozhang Wang  wrote:

> Hello Frank,
>
> How many instances do you have in your apps and how many threads did you
> use per thread? Note that besides the topology complexity (i.e. number of
> state stores, number of internal topics etc) the (re-)initialization
> process is depending on the underlying consumer's membership protocol, and
> hence its rebalance latency could be longer with larger groups.
>
> We have been optimizing our rebalance latency due to state store migration
> and restoration in the latest release, but for now the re-initialization
> latency is still largely depends on 1) topology complexity regarding to
> state stores and 2) number of input topic partitions and instance / threads
> in the application.
>
>
> Guozhang
>
>
> On Sat, Nov 26, 2016 at 12:57 AM, Damian Guy  wrote:
>
> > Hi Frank,
> >
> > If you are running on a single node then the RocksDB state should be
> > re-used by your app. However, it relies on the app being cleanly shutdown
> > and the existence of ".checkpoint" files in the state directory for the
> > store, i.e., /tmp/kafka-streams/application-id/0_0/.checkpoint. If the
> > file
> > doesn't exist then the entire state will be restored from the changelog -
> > which could take some time. I suspect this is what is happening?
> >
> > As for the RocksDB memory settings, yes the off heap memory usage does
> > sneak under the radar. There is a memory management story for Kafka
> Streams
> > that is yet to be started. This would involve limiting the off-heap
> memory
> > that RocksDB uses.
> >
> > Thanks,
> > Damian
> >
> > On Fri, 25 Nov 2016 at 21:14 Frank Lyaruu  wrote:
> >
> > > I'm running all on a single node, so there is no 'data mobility'
> > involved.
> > > So if Streams does not use any existing data, I might as well wipe the
> > > whole RocksDb before starting, right?
> > >
> > > As for the RocksDb tuning, I am using a RocksDBConfigSetter, to reduce
> > the
> > > memory usage a bit:
> > >
> > > options.setWriteBufferSize(300);
> > > options.setMaxBytesForLevelBase(3000);
> > > options.setMaxBytesForLevelMultiplier(3);
> > >
> > > I needed to do this as my 16Gb machine would die otherwise but I
> honestly
> > > was just reducing values more or less randomly until it wouldn't fall
> > over.
> > > I have to say this is a big drawback of Rocks, I monitor Java memory
> > usage
> > > but this just sneaks under the radar as it is off heap, and it isn't
> very
> > > clear what the implications are of different settings, as I can't says
> > > something like the Xmx heap setting, meaning: Take whatever you need up
> > to
> > > this maximum. Also, if I get this right, in the long run, as the data
> set
> > > changes and grows, I can never be sure it won't take too much memory.
> > >
> > > I get the impression I'll be better off with an external 

Re: Initializing StateStores takes *really* long for large datasets

2016-11-28 Thread Guozhang Wang
Hello Frank,

How many instances do you have in your apps, and how many threads did you
use per instance? Note that besides the topology complexity (i.e. number of
state stores, number of internal topics, etc.), the (re-)initialization
process depends on the underlying consumer's membership protocol, and
hence its rebalance latency could be longer with larger groups.

We have been optimizing the rebalance latency due to state store migration
and restoration in the latest release, but for now the re-initialization
latency still largely depends on 1) the topology complexity with regard to
state stores and 2) the number of input topic partitions and instances /
threads in the application.


Guozhang


On Sat, Nov 26, 2016 at 12:57 AM, Damian Guy  wrote:

> Hi Frank,
>
> If you are running on a single node then the RocksDB state should be
> re-used by your app. However, it relies on the app being cleanly shutdown
> and the existence of ".checkpoint" files in the state directory for the
> store, i.e., /tmp/kafka-streams/application-id/0_0/.checkpoint. If the
> file
> doesn't exist then the entire state will be restored from the changelog -
> which could take some time. I suspect this is what is happening?
>
> As for the RocksDB memory settings, yes the off heap memory usage does
> sneak under the radar. There is a memory management story for Kafka Streams
> that is yet to be started. This would involve limiting the off-heap memory
> that RocksDB uses.
>
> Thanks,
> Damian
>
> On Fri, 25 Nov 2016 at 21:14 Frank Lyaruu  wrote:
>
> > I'm running all on a single node, so there is no 'data mobility'
> involved.
> > So if Streams does not use any existing data, I might as well wipe the
> > whole RocksDb before starting, right?
> >
> > As for the RocksDb tuning, I am using a RocksDBConfigSetter, to reduce
> the
> > memory usage a bit:
> >
> > options.setWriteBufferSize(300);
> > options.setMaxBytesForLevelBase(3000);
> > options.setMaxBytesForLevelMultiplier(3);
> >
> > I needed to do this as my 16Gb machine would die otherwise but I honestly
> > was just reducing values more or less randomly until it wouldn't fall
> over.
> > I have to say this is a big drawback of Rocks, I monitor Java memory
> usage
> > but this just sneaks under the radar as it is off heap, and it isn't very
> > clear what the implications are of different settings, as I can't says
> > something like the Xmx heap setting, meaning: Take whatever you need up
> to
> > this maximum. Also, if I get this right, in the long run, as the data set
> > changes and grows, I can never be sure it won't take too much memory.
> >
> > I get the impression I'll be better off with an external store,
> something I
> > can monitor, tune and restart separately.
> >
> > But I'm getting ahead of myself. I'll wipe the data before I start, see
> if
> > that gets me any stability
> >
> >
> >
> >
> > On Fri, Nov 25, 2016 at 4:54 PM, Damian Guy 
> wrote:
> >
> > > Hi Frank,
> > >
> > > If you have run the app before with the same applicationId, completely
> > shut
> > > it down, and then restarted it again, it will need to restore all of
> the
> > > state which will take some time depending on the amount of data you
> have.
> > > In this case the placement of the partitions doesn't take into account
> > any
> > > existing state stores, so it might need to load quite a lot of data if
> > > nodes assigned certain partitions don't have that state-store (this is
> > > something we should look at improving).
> > >
> > > As for RocksDB tuning - you can provide an implementation of
> > > RocksDBConfigSetter via config: StreamsConfig.ROCKSDB_CONFIG_
> SETTER_CLASS
> > > it has a single method:
> > >
> > > public void setConfig(final String storeName, final Options options,
> > > final Map configs)
> > >
> > > in this method you can set various options on the provided Options
> > object.
> > > The options that might help in this case are:
> > > options.setWriteBufferSize(..)  - default in streams is 32MB
> > > options.setMaxWriteBufferNumer(..) - default in streams is 3
> > >
> > > However, i'm no expert on RocksDB and i suggest you have look at
> > > https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide for more
> > > info.
> > >
> > > Thanks,
> > > Damian
> > >
> > > On Fri, 25 Nov 2016 at 13:02 Frank Lyaruu  wrote:
> > >
> > > > @Damian:
> > > >
> > > > Yes, it ran before, and it has that 200gb blob worth of Rocksdb stuff
> > > >
> > > > @Svente: It's on a pretty high end san in a managed private cloud,
> I'm
> > > > unsure what the ultimate storage is, but I doubt there is a
> performance
> > > > problem there.
> > > >
> > > > On Fri, 25 Nov 2016 at 13:37, Svante Karlsson <
> svante.karls...@csi.se>
> > > > wrote:
> > > >
> > > > > What kind of disk are you using for the rocksdb store? ie spinning
> or
> > > > ssd?
> > > > >
> > > > > 2016-11-25 12:51 GMT+01:00 Damian Guy 

Re: Initializing StateStores takes *really* long for large datasets

2016-11-25 Thread Frank Lyaruu
I'm running all on a single node, so there is no 'data mobility' involved.
So if Streams does not use any existing data, I might as well wipe the
whole RocksDb before starting, right?

As for the RocksDb tuning, I am using a RocksDBConfigSetter to reduce the
memory usage a bit:

options.setWriteBufferSize(300);          // memtable size, in bytes
options.setMaxBytesForLevelBase(3000);    // target size of level 1, in bytes
options.setMaxBytesForLevelMultiplier(3); // growth factor between levels

I needed to do this as my 16 GB machine would die otherwise, but I honestly
was just reducing values more or less randomly until it wouldn't fall over.
I have to say this is a big drawback of Rocks: I monitor Java memory usage,
but this just sneaks under the radar as it is off-heap, and it isn't very
clear what the implications of different settings are, as I can't set
something like the Xmx heap setting, meaning: take whatever you need up to
this maximum. Also, if I get this right, in the long run, as the data set
changes and grows, I can never be sure it won't take too much memory.

I get the impression I'll be better off with an external store, something I
can monitor, tune, and restart separately.

But I'm getting ahead of myself. I'll wipe the data before I start and see if
that gets me any stability.




On Fri, Nov 25, 2016 at 4:54 PM, Damian Guy  wrote:

> Hi Frank,
>
> If you have run the app before with the same applicationId, completely shut
> it down, and then restarted it again, it will need to restore all of the
> state which will take some time depending on the amount of data you have.
> In this case the placement of the partitions doesn't take into account any
> existing state stores, so it might need to load quite a lot of data if
> nodes assigned certain partitions don't have that state-store (this is
> something we should look at improving).
>
> As for RocksDB tuning - you can provide an implementation of
> RocksDBConfigSetter via config: StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS
> it has a single method:
>
> public void setConfig(final String storeName, final Options options,
> final Map configs)
>
> in this method you can set various options on the provided Options object.
> The options that might help in this case are:
> options.setWriteBufferSize(..)  - default in streams is 32MB
> options.setMaxWriteBufferNumer(..) - default in streams is 3
>
> However, i'm no expert on RocksDB and i suggest you have look at
> https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide for more
> info.
>
> Thanks,
> Damian
>
> On Fri, 25 Nov 2016 at 13:02 Frank Lyaruu  wrote:
>
> > @Damian:
> >
> > Yes, it ran before, and it has that 200gb blob worth of Rocksdb stuff
> >
> > @Svente: It's on a pretty high end san in a managed private cloud, I'm
> > unsure what the ultimate storage is, but I doubt there is a performance
> > problem there.
> >
> > On Fri, 25 Nov 2016 at 13:37, Svante Karlsson 
> > wrote:
> >
> > > What kind of disk are you using for the rocksdb store? ie spinning or
> > ssd?
> > >
> > > 2016-11-25 12:51 GMT+01:00 Damian Guy :
> > >
> > > > Hi Frank,
> > > >
> > > > Is this on a restart of the application?
> > > >
> > > > Thanks,
> > > > Damian
> > > >
> > > > On Fri, 25 Nov 2016 at 11:09 Frank Lyaruu  wrote:
> > > >
> > > > > Hi y'all,
> > > > >
> > > > > I have a reasonably simple KafkaStream application, which merges
> > about
> > > 20
> > > > > topics a few times.
> > > > > The thing is, some of those topic datasets are pretty big, about
> 10M
> > > > > messages. In total I've got
> > > > > about 200Gb worth of state in RocksDB, the largest topic is 38 Gb.
> > > > >
> > > > > I had set the MAX_POLL_INTERVAL_MS_CONFIG to one hour to cover the
> > > > > initialization time,
> > > > > but that does not seem nearly enough, I'm looking at more than two
> > hour
> > > > > startup times, and
> > > > > that starts to be a bit ridiculous.
> > > > >
> > > > > Any tips / experiences on how to deal with this case? Move away
> from
> > > > Rocks
> > > > > and use an external
> > > > > data store? Any tuning tips on how to tune Rocks to be a bit more
> > > useful
> > > > > here?
> > > > >
> > > > > regards, Frank
> > > > >
> > > >
> > >
> >
>


Re: Initializing StateStores takes *really* long for large datasets

2016-11-25 Thread Damian Guy
Hi Frank,

If you have run the app before with the same applicationId, completely shut
it down, and then restarted it again, it will need to restore all of the
state, which will take some time depending on the amount of data you have.
In this case the placement of the partitions doesn't take into account any
existing state stores, so it might need to load quite a lot of data if the
nodes assigned certain partitions don't have that state store (this is
something we should look at improving).

As for RocksDB tuning: you can provide an implementation of
RocksDBConfigSetter via the config StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS.
It has a single method:

public void setConfig(final String storeName, final Options options,
final Map<String, Object> configs)

In this method you can set various options on the provided Options object.
The options that might help in this case are:
options.setWriteBufferSize(..)  - default in Streams is 32 MB
options.setMaxWriteBufferNumber(..) - default in Streams is 3

However, I'm no expert on RocksDB and I suggest you have a look at
https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide for more info.
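
A minimal sketch of such a setter with the write-buffer options mentioned
above, plus how it might be registered (class name and values are assumptions;
the exact config constant may differ slightly between Streams versions):

import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

public class WriteBufferConfigSetter implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        options.setWriteBufferSize(8 * 1024 * 1024L); // assumed 8 MB, down from the 32 MB Streams default
        options.setMaxWriteBufferNumber(2);           // down from the Streams default of 3
    }
}

// Registration, wherever the application builds its streams configuration:
//   props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, WriteBufferConfigSetter.class);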

Thanks,
Damian

On Fri, 25 Nov 2016 at 13:02 Frank Lyaruu  wrote:

> @Damian:
>
> Yes, it ran before, and it has that 200gb blob worth of Rocksdb stuff
>
> @Svente: It's on a pretty high end san in a managed private cloud, I'm
> unsure what the ultimate storage is, but I doubt there is a performance
> problem there.
>
> On Fri, 25 Nov 2016 at 13:37, Svante Karlsson 
> wrote:
>
> > What kind of disk are you using for the rocksdb store? ie spinning or
> ssd?
> >
> > 2016-11-25 12:51 GMT+01:00 Damian Guy :
> >
> > > Hi Frank,
> > >
> > > Is this on a restart of the application?
> > >
> > > Thanks,
> > > Damian
> > >
> > > On Fri, 25 Nov 2016 at 11:09 Frank Lyaruu  wrote:
> > >
> > > > Hi y'all,
> > > >
> > > > I have a reasonably simple KafkaStream application, which merges
> about
> > 20
> > > > topics a few times.
> > > > The thing is, some of those topic datasets are pretty big, about 10M
> > > > messages. In total I've got
> > > > about 200Gb worth of state in RocksDB, the largest topic is 38 Gb.
> > > >
> > > > I had set the MAX_POLL_INTERVAL_MS_CONFIG to one hour to cover the
> > > > initialization time,
> > > > but that does not seem nearly enough, I'm looking at more than two
> hour
> > > > startup times, and
> > > > that starts to be a bit ridiculous.
> > > >
> > > > Any tips / experiences on how to deal with this case? Move away from
> > > Rocks
> > > > and use an external
> > > > data store? Any tuning tips on how to tune Rocks to be a bit more
> > useful
> > > > here?
> > > >
> > > > regards, Frank
> > > >
> > >
> >
>


Re: Initializing StateStores takes *really* long for large datasets

2016-11-25 Thread Frank Lyaruu
@Damian:

Yes, it ran before, and it has that 200 GB blob worth of Rocksdb stuff.

@Svente: It's on a pretty high-end SAN in a managed private cloud; I'm
unsure what the ultimate storage is, but I doubt there is a performance
problem there.

On Fri, 25 Nov 2016 at 13:37, Svante Karlsson 
wrote:

> What kind of disk are you using for the rocksdb store? ie spinning or ssd?
>
> 2016-11-25 12:51 GMT+01:00 Damian Guy :
>
> > Hi Frank,
> >
> > Is this on a restart of the application?
> >
> > Thanks,
> > Damian
> >
> > On Fri, 25 Nov 2016 at 11:09 Frank Lyaruu  wrote:
> >
> > > Hi y'all,
> > >
> > > I have a reasonably simple KafkaStream application, which merges about
> 20
> > > topics a few times.
> > > The thing is, some of those topic datasets are pretty big, about 10M
> > > messages. In total I've got
> > > about 200Gb worth of state in RocksDB, the largest topic is 38 Gb.
> > >
> > > I had set the MAX_POLL_INTERVAL_MS_CONFIG to one hour to cover the
> > > initialization time,
> > > but that does not seem nearly enough, I'm looking at more than two hour
> > > startup times, and
> > > that starts to be a bit ridiculous.
> > >
> > > Any tips / experiences on how to deal with this case? Move away from
> > Rocks
> > > and use an external
> > > data store? Any tuning tips on how to tune Rocks to be a bit more
> useful
> > > here?
> > >
> > > regards, Frank
> > >
> >
>


Re: Initializing StateStores takes *really* long for large datasets

2016-11-25 Thread Svante Karlsson
What kind of disk are you using for the rocksdb store? Spinning or SSD?

2016-11-25 12:51 GMT+01:00 Damian Guy :

> Hi Frank,
>
> Is this on a restart of the application?
>
> Thanks,
> Damian
>
> On Fri, 25 Nov 2016 at 11:09 Frank Lyaruu  wrote:
>
> > Hi y'all,
> >
> > I have a reasonably simple KafkaStream application, which merges about 20
> > topics a few times.
> > The thing is, some of those topic datasets are pretty big, about 10M
> > messages. In total I've got
> > about 200Gb worth of state in RocksDB, the largest topic is 38 Gb.
> >
> > I had set the MAX_POLL_INTERVAL_MS_CONFIG to one hour to cover the
> > initialization time,
> > but that does not seem nearly enough, I'm looking at more than two hour
> > startup times, and
> > that starts to be a bit ridiculous.
> >
> > Any tips / experiences on how to deal with this case? Move away from
> Rocks
> > and use an external
> > data store? Any tuning tips on how to tune Rocks to be a bit more useful
> > here?
> >
> > regards, Frank
> >
>


Re: Initializing StateStores takes *really* long for large datasets

2016-11-25 Thread Damian Guy
Hi Frank,

Is this on a restart of the application?

Thanks,
Damian

On Fri, 25 Nov 2016 at 11:09 Frank Lyaruu  wrote:

> Hi y'all,
>
> I have a reasonably simple KafkaStream application, which merges about 20
> topics a few times.
> The thing is, some of those topic datasets are pretty big, about 10M
> messages. In total I've got
> about 200Gb worth of state in RocksDB, the largest topic is 38 Gb.
>
> I had set the MAX_POLL_INTERVAL_MS_CONFIG to one hour to cover the
> initialization time,
> but that does not seem nearly enough, I'm looking at more than two hour
> startup times, and
> that starts to be a bit ridiculous.
>
> Any tips / experiences on how to deal with this case? Move away from Rocks
> and use an external
> data store? Any tuning tips on how to tune Rocks to be a bit more useful
> here?
>
> regards, Frank
>


Initializing StateStores takes *really* long for large datasets

2016-11-25 Thread Frank Lyaruu
Hi y'all,

I have a reasonably simple Kafka Streams application, which merges about 20
topics a few times. The thing is, some of those topic datasets are pretty
big, about 10M messages. In total I've got about 200 GB worth of state in
RocksDB; the largest topic is 38 GB.

I had set MAX_POLL_INTERVAL_MS_CONFIG to one hour to cover the
initialization time, but that does not seem nearly enough: I'm looking at
more than two-hour startup times, and that starts to be a bit ridiculous.
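
For context, a sketch of how that one-hour override might be passed through
the streams configuration (the application id, bootstrap servers, and the
exact way of passing the consumer setting are assumptions based on the
description above):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class PollIntervalConfigExample {
    public static Properties streamsProperties() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");     // assumed
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // assumed
        // Allow up to one hour between poll() calls before the consumer is
        // considered failed and its partitions get reassigned.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 60 * 60 * 1000);
        return props;
    }
}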

Any tips / experiences on how to deal with this case? Move away from Rocks
and use an external data store? Any tips on how to tune Rocks to be a bit
more useful here?

regards, Frank