Re: Kafka streams (2.1.1) - org.rocksdb.RocksDBException:Too many open files

2019-07-02 Thread Sophie Blee-Goldman
How sure are you that the open file count never goes beyond 50K? Are those
numbers just from a snapshot after it crashed? It's possible rocks is
creating a large number of files for just a short period of time (maybe
while compacting), causing the open file count to spike and then drop back down.

For things to try, you should set the rocks config max.open.files to
something less than infinity... if your OS limit is 1 million and you have
(rounding up) ~5k rocks instances, set this to 1 million / 5k = 200. If you
set a lower limit and still hit this error, we can go from there.
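
For reference, a minimal RocksDBConfigSetter along those lines could look like
the sketch below. The class name and the value 200 are only illustrative
(derive the value from your own OS limit divided by the number of rocks
instances), and the table-config part just follows Patrik's suggestion quoted
further down in this message:

import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

// Hypothetical config setter that caps open files per RocksDB instance.
// Register it via:
//   props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, BoundedOpenFilesConfigSetter.class);
public class BoundedOpenFilesConfigSetter implements RocksDBConfigSetter {

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        // Roughly: OS open-file limit divided by the number of rocks instances.
        options.setMaxOpenFiles(200);

        // Reuse the existing table config instead of creating a new BlockBasedTableConfig,
        // to avoid accidentally dropping other defaults.
        final BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
        tableConfig.setBlockCacheSize(100 * 1024 * 1024L);
        tableConfig.setBlockSize(8 * 1024L);
        options.setTableFormatConfig(tableConfig);
    }
}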

On Tue, Jul 2, 2019 at 11:10 PM emailtokir...@gmail.com <
emailtokir...@gmail.com> wrote:

>
>
> On 2019/07/03 05:46:45, Sophie Blee-Goldman  wrote:
> > It sounds like rocksdb *is* honoring your configs -- the max.open.files
> > config is an internal restriction that tells rocksdb how many open files
> it
> > is allowed to have, so if that's set to -1 (infinite) it won't ever try
> to
> > limit its open files and you may hit the OS limit.
> >
> > Think of it this way: if you have 100 rocksdb instances and a OS limit of
> > 500, you should set max.open.files to 5  to avoid hitting this limit
> > (assuming there are no other open files on the system, in reality you'd
> > want some extra room there)
> >
> > On Tue, Jul 2, 2019 at 7:53 PM emailtokir...@gmail.com <
> > emailtokir...@gmail.com> wrote:
> >
> > >
> > >
> > > On 2019/06/28 23:29:16, John Roesler  wrote:
> > > > Hey all,
> > > >
> > > > If you want to figure it out theoretically, if you print out the
> > > > topology description, you'll have some number of state stores listed
> > > > in there. The number of Rocks instances should just be
> > > > (#global_state_stores +
> > > > sum(#partitions_of_topic_per_local_state_store)) . The number of
> > > > stream threads isn't relevant here.
> > > >
> > > > You can also figure it out empirically: the first level of
> > > > subdirectories in the state dir are Tasks, and then within that, the
> > > > next level is Stores. You should see the store directory names match
> > > > up with the stores listed in the topology description. The number of
> > > > Store directories is exactly the number of RocksDB instances you
> have.
> > > >
> > > > There are also metrics corresponding to each of the state stores, so
> > > > you can compute it from what you find in the metrics.
> > > >
> > > > Hope that helps,
> > > > -john
> > > >
> > > > On Thu, Jun 27, 2019 at 6:46 AM Patrik Kleindl 
> > > wrote:
> > > > >
> > > > > Hi Kiran
> > > > > Without much research my guess would be "num_stream_threads *
> > > > > (#global_state_stores +
> > > sum(#partitions_of_topic_per_local_state_store))"
> > > > > So 10 stores (regardless if explicitly defined or implicitely
> because
> > > of
> > > > > some stateful operation) with 10 partitions each should result in
> 100
> > > > > Rocksdb instances if you are running at the default of
> > > num_stream_threads=1.
> > > > >
> > > > > As I wrote before, start with 100.
> > > > > If the error persists, half the number, if not, double it ;-)
> Repeat as
> > > > > needed.
> > > > >
> > > > > If you reach the single-digit-range and the error still shows up,
> start
> > > > > searching for any iterators over a store you might not have closed.
> > > > >
> > > > > br, Patrik
> > > > >
> > > > > On Thu, 27 Jun 2019 at 13:11, emailtokir...@gmail.com <
> > > > > emailtokir...@gmail.com> wrote:
> > > > >
> > > > > >
> > > > > >
> > > > > > On 2019/06/27 09:02:39, Patrik Kleindl 
> wrote:
> > > > > > > Hello Kiran
> > > > > > >
> > > > > > > First, the value for maxOpenFiles is per RocksDB instance, and
> the
> > > number
> > > > > > > of those can get high if you have a lot of topic partitions
> etc.
> > > > > > > Check the directory (state dir) to see how many there are.
> > > > > > > Start with a low value (100) and see if that has some effect.
> > > > > > >
> > > > > > > Second, because I just found out, you should use
> > > > > > > BlockBasedTableConfig tableConfig = (BlockBasedTableConfig)
> > > > > > > options.tableFormatConfig();
> > > > > > > tableConfig.setBlockCacheSize(100*1024*1024L);
> > > > > > > tableConfig.setBlockSize(8*1024L);
> > > > > > > instead of creating a new object to prevent accidently messing
> up
> > > > > > > references.
> > > > > > >
> > > > > > > Hope that helps
> > > > > > > best regards
> > > > > > > Patrik
> > > > > > >
> > > > > > > On Thu, 27 Jun 2019 at 10:46, emailtokir...@gmail.com <
> > > > > > > emailtokir...@gmail.com> wrote:
> > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On 2019/06/26 21:58:02, Patrik Kleindl 
> > > wrote:
> > > > > > > > > Hi Kiran
> > > > > > > > > You can use the RocksDBConfigSetter and pass
> > > > > > > > >
> > > > > > > > > options.setMaxOpenFiles(100);
> > > > > > > > >
> > > > > > > > > to all RocksDBs for the Streams application which limits
> how
> > > many are
> > > > > > > > > kept open at the same time.
> > > > > > > > >
> >

Re: Kafka streams (2.1.1) - org.rocksdb.RocksDBException:Too many open files

2019-07-02 Thread emailtokir...@gmail.com



On 2019/07/03 05:46:45, Sophie Blee-Goldman  wrote: 
> It sounds like rocksdb *is* honoring your configs -- the max.open.files
> config is an internal restriction that tells rocksdb how many open files it
> is allowed to have, so if that's set to -1 (infinite) it won't ever try to
> limit its open files and you may hit the OS limit.
> 
> Think of it this way: if you have 100 rocksdb instances and a OS limit of
> 500, you should set max.open.files to 5  to avoid hitting this limit
> (assuming there are no other open files on the system, in reality you'd
> want some extra room there)
> 
> On Tue, Jul 2, 2019 at 7:53 PM emailtokir...@gmail.com <
> emailtokir...@gmail.com> wrote:
> 
> >
> >
> > On 2019/06/28 23:29:16, John Roesler  wrote:
> > > Hey all,
> > >
> > > If you want to figure it out theoretically, if you print out the
> > > topology description, you'll have some number of state stores listed
> > > in there. The number of Rocks instances should just be
> > > (#global_state_stores +
> > > sum(#partitions_of_topic_per_local_state_store)) . The number of
> > > stream threads isn't relevant here.
> > >
> > > You can also figure it out empirically: the first level of
> > > subdirectories in the state dir are Tasks, and then within that, the
> > > next level is Stores. You should see the store directory names match
> > > up with the stores listed in the topology description. The number of
> > > Store directories is exactly the number of RocksDB instances you have.
> > >
> > > There are also metrics corresponding to each of the state stores, so
> > > you can compute it from what you find in the metrics.
> > >
> > > Hope that helps,
> > > -john
> > >
> > > On Thu, Jun 27, 2019 at 6:46 AM Patrik Kleindl 
> > wrote:
> > > >
> > > > Hi Kiran
> > > > Without much research my guess would be "num_stream_threads *
> > > > (#global_state_stores +
> > sum(#partitions_of_topic_per_local_state_store))"
> > > > So 10 stores (regardless if explicitly defined or implicitely because
> > of
> > > > some stateful operation) with 10 partitions each should result in 100
> > > > Rocksdb instances if you are running at the default of
> > num_stream_threads=1.
> > > >
> > > > As I wrote before, start with 100.
> > > > If the error persists, half the number, if not, double it ;-) Repeat as
> > > > needed.
> > > >
> > > > If you reach the single-digit-range and the error still shows up, start
> > > > searching for any iterators over a store you might not have closed.
> > > >
> > > > br, Patrik
> > > >
> > > > On Thu, 27 Jun 2019 at 13:11, emailtokir...@gmail.com <
> > > > emailtokir...@gmail.com> wrote:
> > > >
> > > > >
> > > > >
> > > > > On 2019/06/27 09:02:39, Patrik Kleindl  wrote:
> > > > > > Hello Kiran
> > > > > >
> > > > > > First, the value for maxOpenFiles is per RocksDB instance, and the
> > number
> > > > > > of those can get high if you have a lot of topic partitions etc.
> > > > > > Check the directory (state dir) to see how many there are.
> > > > > > Start with a low value (100) and see if that has some effect.
> > > > > >
> > > > > > Second, because I just found out, you should use
> > > > > > BlockBasedTableConfig tableConfig = (BlockBasedTableConfig)
> > > > > > options.tableFormatConfig();
> > > > > > tableConfig.setBlockCacheSize(100*1024*1024L);
> > > > > > tableConfig.setBlockSize(8*1024L);
> > > > > > instead of creating a new object to prevent accidently messing up
> > > > > > references.
> > > > > >
> > > > > > Hope that helps
> > > > > > best regards
> > > > > > Patrik
> > > > > >
> > > > > > On Thu, 27 Jun 2019 at 10:46, emailtokir...@gmail.com <
> > > > > > emailtokir...@gmail.com> wrote:
> > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On 2019/06/26 21:58:02, Patrik Kleindl 
> > wrote:
> > > > > > > > Hi Kiran
> > > > > > > > You can use the RocksDBConfigSetter and pass
> > > > > > > >
> > > > > > > > options.setMaxOpenFiles(100);
> > > > > > > >
> > > > > > > > to all RocksDBs for the Streams application which limits how
> > many are
> > > > > > > > kept open at the same time.
> > > > > > > >
> > > > > > > > best regards
> > > > > > > >
> > > > > > > > Patrik
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, 26 Jun 2019 at 16:14, emailtokir...@gmail.com <
> > > > > > > > emailtokir...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hi,
> > > > > > > > >
> > > > > > > > > We are using Kafka streams DSL APIs for doing some counter
> > > > > aggregations
> > > > > > > > > (running on OpenJDK 11.0.2). Our topology has some 400 sub
> > > > > topologies
> > > > > > > & we
> > > > > > > > > are using 8 partitions in source topic. When we start
> > pumping more
> > > > > > > load, we
> > > > > > > > > start getting RockDBException stating "too many open files".
> > > > > > > > >
> > > > > > > > > Here are the stack trace samples:
> > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > >
> > ---

Re: Kafka streams (2.1.1) - org.rocksdb.RocksDBException:Too many open files

2019-07-02 Thread Thameem Ansari
As I mentioned, I tried setting the OS limit to 600K & 1 million in the shell and
starting the application from the same shell, but the problem still exists.
I also tried rebooting the laptop and the results are the same. So, we need a way to
find out what exactly is causing this issue when we hit close to the 42K system limit.

Thanks
Thameem


> On Jul 3, 2019, at 11:16 AM, Sophie Blee-Goldman wrote:
> 
> It sounds like rocksdb *is* honoring your configs -- the max.open.files
> config is an internal restriction that tells rocksdb how many open files it
> is allowed to have, so if that's set to -1 (infinite) it won't ever try to
> limit its open files and you may hit the OS limit.
> 
> Think of it this way: if you have 100 rocksdb instances and a OS limit of
> 500, you should set max.open.files to 5  to avoid hitting this limit
> (assuming there are no other open files on the system, in reality you'd
> want some extra room there)
> 
> On Tue, Jul 2, 2019 at 7:53 PM emailtokir...@gmail.com 
>  <
> emailtokir...@gmail.com > wrote:
> 
>> 
>> 
>> On 2019/06/28 23:29:16, John Roesler > > wrote:
>>> Hey all,
>>> 
>>> If you want to figure it out theoretically, if you print out the
>>> topology description, you'll have some number of state stores listed
>>> in there. The number of Rocks instances should just be
>>> (#global_state_stores +
>>> sum(#partitions_of_topic_per_local_state_store)) . The number of
>>> stream threads isn't relevant here.
>>> 
>>> You can also figure it out empirically: the first level of
>>> subdirectories in the state dir are Tasks, and then within that, the
>>> next level is Stores. You should see the store directory names match
>>> up with the stores listed in the topology description. The number of
>>> Store directories is exactly the number of RocksDB instances you have.
>>> 
>>> There are also metrics corresponding to each of the state stores, so
>>> you can compute it from what you find in the metrics.
>>> 
>>> Hope that helps,
>>> -john
>>> 
>>> On Thu, Jun 27, 2019 at 6:46 AM Patrik Kleindl >> >
>> wrote:
 
 Hi Kiran
 Without much research my guess would be "num_stream_threads *
 (#global_state_stores +
>> sum(#partitions_of_topic_per_local_state_store))"
 So 10 stores (regardless if explicitly defined or implicitely because
>> of
 some stateful operation) with 10 partitions each should result in 100
 Rocksdb instances if you are running at the default of
>> num_stream_threads=1.
 
 As I wrote before, start with 100.
 If the error persists, half the number, if not, double it ;-) Repeat as
 needed.
 
 If you reach the single-digit-range and the error still shows up, start
 searching for any iterators over a store you might not have closed.
 
 br, Patrik
 
 On Thu, 27 Jun 2019 at 13:11, emailtokir...@gmail.com 
  <
 emailtokir...@gmail.com > wrote:
 
> 
> 
> On 2019/06/27 09:02:39, Patrik Kleindl  > wrote:
>> Hello Kiran
>> 
>> First, the value for maxOpenFiles is per RocksDB instance, and the
>> number
>> of those can get high if you have a lot of topic partitions etc.
>> Check the directory (state dir) to see how many there are.
>> Start with a low value (100) and see if that has some effect.
>> 
>> Second, because I just found out, you should use
>> BlockBasedTableConfig tableConfig = (BlockBasedTableConfig)
>> options.tableFormatConfig();
>>tableConfig.setBlockCacheSize(100*1024*1024L);
>>tableConfig.setBlockSize(8*1024L);
>> instead of creating a new object to prevent accidently messing up
>> references.
>> 
>> Hope that helps
>> best regards
>> Patrik
>> 
>> On Thu, 27 Jun 2019 at 10:46, emailtokir...@gmail.com 
>>  <
>> emailtokir...@gmail.com > wrote:
>> 
>>> 
>>> 
>>> On 2019/06/26 21:58:02, Patrik Kleindl >> >
>> wrote:
 Hi Kiran
 You can use the RocksDBConfigSetter and pass
 
 options.setMaxOpenFiles(100);
 
 to all RocksDBs for the Streams application which limits how
>> many are
 kept open at the same time.
 
 best regards
 
 Patrik
 
 
 On Wed, 26 Jun 2019 at 16:14, emailtokir...@gmail.com 
  <
 emailtokir...@gmail.com > wrote:
 
> Hi,
> 
> We are using Kafka streams DSL APIs for doing some counter
> aggregations
> (running on OpenJDK 11.0.2). Our topology has some 400 sub
> topologies
>>> & we
>>>

Re: Kafka streams (2.1.1) - org.rocksdb.RocksDBException:Too many open files

2019-07-02 Thread Sophie Blee-Goldman
It sounds like rocksdb *is* honoring your configs -- the max.open.files
config is an internal restriction that tells rocksdb how many open files it
is allowed to have, so if that's set to -1 (infinite) it won't ever try to
limit its open files and you may hit the OS limit.

Think of it this way: if you have 100 rocksdb instances and an OS limit of
500, you should set max.open.files to 5 to avoid hitting this limit
(assuming there are no other open files on the system; in reality you'd
want some extra room there)

On Tue, Jul 2, 2019 at 7:53 PM emailtokir...@gmail.com <
emailtokir...@gmail.com> wrote:

>
>
> On 2019/06/28 23:29:16, John Roesler  wrote:
> > Hey all,
> >
> > If you want to figure it out theoretically, if you print out the
> > topology description, you'll have some number of state stores listed
> > in there. The number of Rocks instances should just be
> > (#global_state_stores +
> > sum(#partitions_of_topic_per_local_state_store)) . The number of
> > stream threads isn't relevant here.
> >
> > You can also figure it out empirically: the first level of
> > subdirectories in the state dir are Tasks, and then within that, the
> > next level is Stores. You should see the store directory names match
> > up with the stores listed in the topology description. The number of
> > Store directories is exactly the number of RocksDB instances you have.
> >
> > There are also metrics corresponding to each of the state stores, so
> > you can compute it from what you find in the metrics.
> >
> > Hope that helps,
> > -john
> >
> > On Thu, Jun 27, 2019 at 6:46 AM Patrik Kleindl 
> wrote:
> > >
> > > Hi Kiran
> > > Without much research my guess would be "num_stream_threads *
> > > (#global_state_stores +
> sum(#partitions_of_topic_per_local_state_store))"
> > > So 10 stores (regardless if explicitly defined or implicitely because
> of
> > > some stateful operation) with 10 partitions each should result in 100
> > > Rocksdb instances if you are running at the default of
> num_stream_threads=1.
> > >
> > > As I wrote before, start with 100.
> > > If the error persists, half the number, if not, double it ;-) Repeat as
> > > needed.
> > >
> > > If you reach the single-digit-range and the error still shows up, start
> > > searching for any iterators over a store you might not have closed.
> > >
> > > br, Patrik
> > >
> > > On Thu, 27 Jun 2019 at 13:11, emailtokir...@gmail.com <
> > > emailtokir...@gmail.com> wrote:
> > >
> > > >
> > > >
> > > > On 2019/06/27 09:02:39, Patrik Kleindl  wrote:
> > > > > Hello Kiran
> > > > >
> > > > > First, the value for maxOpenFiles is per RocksDB instance, and the
> number
> > > > > of those can get high if you have a lot of topic partitions etc.
> > > > > Check the directory (state dir) to see how many there are.
> > > > > Start with a low value (100) and see if that has some effect.
> > > > >
> > > > > Second, because I just found out, you should use
> > > > > BlockBasedTableConfig tableConfig = (BlockBasedTableConfig)
> > > > > options.tableFormatConfig();
> > > > > tableConfig.setBlockCacheSize(100*1024*1024L);
> > > > > tableConfig.setBlockSize(8*1024L);
> > > > > instead of creating a new object to prevent accidently messing up
> > > > > references.
> > > > >
> > > > > Hope that helps
> > > > > best regards
> > > > > Patrik
> > > > >
> > > > > On Thu, 27 Jun 2019 at 10:46, emailtokir...@gmail.com <
> > > > > emailtokir...@gmail.com> wrote:
> > > > >
> > > > > >
> > > > > >
> > > > > > On 2019/06/26 21:58:02, Patrik Kleindl 
> wrote:
> > > > > > > Hi Kiran
> > > > > > > You can use the RocksDBConfigSetter and pass
> > > > > > >
> > > > > > > options.setMaxOpenFiles(100);
> > > > > > >
> > > > > > > to all RocksDBs for the Streams application which limits how
> many are
> > > > > > > kept open at the same time.
> > > > > > >
> > > > > > > best regards
> > > > > > >
> > > > > > > Patrik
> > > > > > >
> > > > > > >
> > > > > > > On Wed, 26 Jun 2019 at 16:14, emailtokir...@gmail.com <
> > > > > > > emailtokir...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > We are using Kafka streams DSL APIs for doing some counter
> > > > aggregations
> > > > > > > > (running on OpenJDK 11.0.2). Our topology has some 400 sub
> > > > topologies
> > > > > > & we
> > > > > > > > are using 8 partitions in source topic. When we start
> pumping more
> > > > > > load, we
> > > > > > > > start getting RockDBException stating "too many open files".
> > > > > > > >
> > > > > > > > Here are the stack trace samples:
> > > > > > > >
> > > > > > > >
> > > > > >
> > > >
> --
> > > > > > > > Caused by: org.rocksdb.RocksDBException: while open a file
> for
> > > > lock:
> > > > > > > > PPP.151200/LOCK: Too many open files
> > > > > > > > at org.rocksdb.RocksDB.open(Native Method)
> > > > > > > > at org.rocksdb.RocksDB.open(RocksDB.java:235)
> > >

Re: Kafka streams (2.1.1) - org.rocksdb.RocksDBException:Too many open files

2019-07-02 Thread Thameem Ansari
In many places it is mentioned that closing the iterator fixes the issue, but
that is true only if we use the Processor API. In the DSL there is no iterator
explicitly available; we are using wrapper methods like aggregate, map,
groupBy, etc.

Here is a snapshot of the issue, with the exact statistics observed from the
recent run on a Mac with Mojave.

- The number of state directories is 3154, hence there are 3154 rocksdb
instances.
- The OS open-files limit was set to 1 million; here is the breakdown of open
files:
Total open files from the system: 41645
Open files from the stream application: 16545
Open files related to state directories: 16025
So, if we do the math: 16025/3154 ≈ 5 files per instance.

- The following parameters were used but the problem still exists:
cache.index.and.filter.blocks=false
block.cache.size=100MB
block.size=8MB
max.write.buffer.number=2
table.cache.numshardbits=8
max.open.files=-1
compaction.readahead.size=256MB
skip.stats.update_on_db_open=true
write.buffer.size=32MB
- The topic has 8 partitions and the streaming application is running as a
SINGLE instance with a SINGLE thread.
- Noticed that these rocksdb properties have been consumed by the app but are
not working as expected (or as defined in the documentation).
- No issues related to memory were observed.

Thanks
Thameem


On 2019/07/03 02:53:20, e...@gmail.com wrote: 
> 
> 
> On 2019/06/28 23:29:16, John Roesler  wrote: > 
> > Hey all,> 
> > > 
> > If you want to figure it out theoretically, if you print out the> 
> > topology description, you'll have some number of state stores listed> 
> > in there. The number of Rocks instances should just be> 
> > (#global_state_stores +> 
> > sum(#partitions_of_topic_per_local_state_store)) . The number of> 
> > stream threads isn't relevant here.> 
> > > 
> > You can also figure it out empirically: the first level of> 
> > subdirectories in the state dir are Tasks, and then within that, the> 
> > next level is Stores. You should see the store directory names match> 
> > up with the stores listed in the topology description. The number of> 
> > Store directories is exactly the number of RocksDB instances you have.> 
> > > 
> > There are also metrics corresponding to each of the state stores, so> 
> > you can compute it from what you find in the metrics.> 
> > > 
> > Hope that helps,> 
> > -john> 
> > > 
> > On Thu, Jun 27, 2019 at 6:46 AM Patrik Kleindl  wrote:> 
> > >> 
> > > Hi Kiran> 
> > > Without much research my guess would be "num_stream_threads *> 
> > > (#global_state_stores + 
> > > sum(#partitions_of_topic_per_local_state_store))"> 
> > > So 10 stores (regardless if explicitly defined or implicitely because of> 
> > > some stateful operation) with 10 partitions each should result in 100> 
> > > Rocksdb instances if you are running at the default of 
> > > num_stream_threads=1.> 
> > >> 
> > > As I wrote before, start with 100.> 
> > > If the error persists, half the number, if not, double it ;-) Repeat as> 
> > > needed.> 
> > >> 
> > > If you reach the single-digit-range and the error still shows up, start> 
> > > searching for any iterators over a store you might not have closed.> 
> > >> 
> > > br, Patrik> 
> > >> 
> > > On Thu, 27 Jun 2019 at 13:11, emailtokir...@gmail.com <> 
> > > emailtokir...@gmail.com> wrote:> 
> > >> 
> > > >> 
> > > >> 
> > > > On 2019/06/27 09:02:39, Patrik Kleindl  wrote:> 
> > > > > Hello Kiran> 
> > > > >> 
> > > > > First, the value for maxOpenFiles is per RocksDB instance, and the 
> > > > > number> 
> > > > > of those can get high if you have a lot of topic partitions etc.> 
> > > > > Check the directory (state dir) to see how many there are.> 
> > > > > Start with a low value (100) and see if that has some effect.> 
> > > > >> 
> > > > > Second, because I just found out, you should use> 
> > > > > BlockBasedTableConfig tableConfig = (BlockBasedTableConfig)> 
> > > > > options.tableFormatConfig();> 
> > > > > tableConfig.setBlockCacheSize(100*1024*1024L);> 
> > > > > tableConfig.setBlockSize(8*1024L);> 
> > > > > instead of creating a new object to prevent accidently messing up> 
> > > > > references.> 
> > > > >> 
> > > > > Hope that helps> 
> > > > > best regards> 
> > > > > Patrik> 
> > > > >> 
> > > > > On Thu, 27 Jun 2019 at 10:46, emailtokir...@gmail.com <> 
> > > > > emailtokir...@gmail.com> wrote:> 
> > > > >> 
> > > > > >> 
> > > > > >> 
> > > > > > On 2019/06/26 21:58:02, Patrik Kleindl  wrote:> 
> > > > > > > Hi Kiran> 
> > > > > > > You can use the RocksDBConfigSetter and pass> 
> > > > > > >> 
> > > > > > > options.setMaxOpenFiles(100);> 
> > > > > > >> 
> > > > > > > to all RocksDBs for the Streams application which limits how many 
> > > > > > > are> 
> > > > > > > kept open at the same time.> 
> > > > > > >> 
> > > > > > > best regards> 
> > > > > > >> 
> > > > > > > Patrik> 
> > > > > > >

Re: Kafka streams (2.1.1) - org.rocksdb.RocksDBException:Too many open files

2019-07-02 Thread emailtokir...@gmail.com



On 2019/06/28 23:29:16, John Roesler  wrote: 
> Hey all,
> 
> If you want to figure it out theoretically, if you print out the
> topology description, you'll have some number of state stores listed
> in there. The number of Rocks instances should just be
> (#global_state_stores +
> sum(#partitions_of_topic_per_local_state_store)) . The number of
> stream threads isn't relevant here.
> 
> You can also figure it out empirically: the first level of
> subdirectories in the state dir are Tasks, and then within that, the
> next level is Stores. You should see the store directory names match
> up with the stores listed in the topology description. The number of
> Store directories is exactly the number of RocksDB instances you have.
> 
> There are also metrics corresponding to each of the state stores, so
> you can compute it from what you find in the metrics.
> 
> Hope that helps,
> -john
> 
> On Thu, Jun 27, 2019 at 6:46 AM Patrik Kleindl  wrote:
> >
> > Hi Kiran
> > Without much research my guess would be "num_stream_threads *
> > (#global_state_stores + sum(#partitions_of_topic_per_local_state_store))"
> > So 10 stores (regardless if explicitly defined or implicitely because of
> > some stateful operation) with 10 partitions each should result in 100
> > Rocksdb instances if you are running at the default of num_stream_threads=1.
> >
> > As I wrote before, start with 100.
> > If the error persists, half the number, if not, double it ;-) Repeat as
> > needed.
> >
> > If you reach the single-digit-range and the error still shows up, start
> > searching for any iterators over a store you might not have closed.
> >
> > br, Patrik
> >
> > On Thu, 27 Jun 2019 at 13:11, emailtokir...@gmail.com <
> > emailtokir...@gmail.com> wrote:
> >
> > >
> > >
> > > On 2019/06/27 09:02:39, Patrik Kleindl  wrote:
> > > > Hello Kiran
> > > >
> > > > First, the value for maxOpenFiles is per RocksDB instance, and the 
> > > > number
> > > > of those can get high if you have a lot of topic partitions etc.
> > > > Check the directory (state dir) to see how many there are.
> > > > Start with a low value (100) and see if that has some effect.
> > > >
> > > > Second, because I just found out, you should use
> > > > BlockBasedTableConfig tableConfig = (BlockBasedTableConfig)
> > > > options.tableFormatConfig();
> > > > tableConfig.setBlockCacheSize(100*1024*1024L);
> > > > tableConfig.setBlockSize(8*1024L);
> > > > instead of creating a new object to prevent accidently messing up
> > > > references.
> > > >
> > > > Hope that helps
> > > > best regards
> > > > Patrik
> > > >
> > > > On Thu, 27 Jun 2019 at 10:46, emailtokir...@gmail.com <
> > > > emailtokir...@gmail.com> wrote:
> > > >
> > > > >
> > > > >
> > > > > On 2019/06/26 21:58:02, Patrik Kleindl  wrote:
> > > > > > Hi Kiran
> > > > > > You can use the RocksDBConfigSetter and pass
> > > > > >
> > > > > > options.setMaxOpenFiles(100);
> > > > > >
> > > > > > to all RocksDBs for the Streams application which limits how many 
> > > > > > are
> > > > > > kept open at the same time.
> > > > > >
> > > > > > best regards
> > > > > >
> > > > > > Patrik
> > > > > >
> > > > > >
> > > > > > On Wed, 26 Jun 2019 at 16:14, emailtokir...@gmail.com <
> > > > > > emailtokir...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > We are using Kafka streams DSL APIs for doing some counter
> > > aggregations
> > > > > > > (running on OpenJDK 11.0.2). Our topology has some 400 sub
> > > topologies
> > > > > & we
> > > > > > > are using 8 partitions in source topic. When we start pumping more
> > > > > load, we
> > > > > > > start getting RockDBException stating "too many open files".
> > > > > > >
> > > > > > > Here are the stack trace samples:
> > > > > > >
> > > > > > >
> > > > >
> > > --
> > > > > > > Caused by: org.rocksdb.RocksDBException: while open a file for
> > > lock:
> > > > > > > PPP.151200/LOCK: Too many open files
> > > > > > > at org.rocksdb.RocksDB.open(Native Method)
> > > > > > > at org.rocksdb.RocksDB.open(RocksDB.java:235)
> > > > > > > at
> > > > > > >
> > > > >
> > > org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:156)
> > > > > > > ... 24 common frames omitted
> > > > > > >
> > > > > > >
> > > > > > > Caused by: 
> > > > > > > org.apache.kafka.streams.errors.ProcessorStateException:
> > > > > Error
> > > > > > > while executing flush from store XXX.151200
> > > > > > > at
> > > > > > >
> > > > >
> > > org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:397)
> > > > > > > at
> > > > > > >
> > > > >
> > > org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:388)
> > > > > > > at
> > > > > > >
> > > > >
> > > org.apache.kafka.streams.state.internals.Segments.flush(Segments.java:163)
> > > > 

Re: Kafka streams (2.1.1) - org.rocksdb.RocksDBException:Too many open files

2019-07-02 Thread Sophie Blee-Goldman
This can also happen if you have any open iterators that you forget to
close (for example using IQ), although that's probably not what's going on
here since 3152 is certainly a lot of rocks instances for a single fs.

There's no default number of open files per instance, since rocks creates
new files on new levels as you add more data. To understand the impact you
should check out the description of *max_open_files* here, but you will
probably want to increase your system limit in addition to constraining the
number of open files per instance.
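
For completeness, capping max_open_files per instance goes through a
RocksDBConfigSetter (as in the options.setMaxOpenFiles snippets quoted below);
wiring it into the Streams app is a single properties entry. A minimal sketch,
where the application id, bootstrap servers and the setter class name are all
placeholders:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "counter-aggregations");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Point Streams at a custom RocksDBConfigSetter that calls options.setMaxOpenFiles(...)
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MyRocksDBConfigSetter.class);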

On Mon, Jul 1, 2019 at 9:58 AM emailtokir...@gmail.com <
emailtokir...@gmail.com> wrote:

>
>
> On 2019/06/28 23:29:16, John Roesler  wrote:
> > Hey all,
> >
> > If you want to figure it out theoretically, if you print out the
> > topology description, you'll have some number of state stores listed
> > in there. The number of Rocks instances should just be
> > (#global_state_stores +
> > sum(#partitions_of_topic_per_local_state_store)) . The number of
> > stream threads isn't relevant here.
> >
> > You can also figure it out empirically: the first level of
> > subdirectories in the state dir are Tasks, and then within that, the
> > next level is Stores. You should see the store directory names match
> > up with the stores listed in the topology description. The number of
> > Store directories is exactly the number of RocksDB instances you have.
> >
> > There are also metrics corresponding to each of the state stores, so
> > you can compute it from what you find in the metrics.
> >
> > Hope that helps,
> > -john
> >
> > On Thu, Jun 27, 2019 at 6:46 AM Patrik Kleindl 
> wrote:
> > >
> > > Hi Kiran
> > > Without much research my guess would be "num_stream_threads *
> > > (#global_state_stores +
> sum(#partitions_of_topic_per_local_state_store))"
> > > So 10 stores (regardless if explicitly defined or implicitely because
> of
> > > some stateful operation) with 10 partitions each should result in 100
> > > Rocksdb instances if you are running at the default of
> num_stream_threads=1.
> > >
> > > As I wrote before, start with 100.
> > > If the error persists, half the number, if not, double it ;-) Repeat as
> > > needed.
> > >
> > > If you reach the single-digit-range and the error still shows up, start
> > > searching for any iterators over a store you might not have closed.
> > >
> > > br, Patrik
> > >
> > > On Thu, 27 Jun 2019 at 13:11, emailtokir...@gmail.com <
> > > emailtokir...@gmail.com> wrote:
> > >
> > > >
> > > >
> > > > On 2019/06/27 09:02:39, Patrik Kleindl  wrote:
> > > > > Hello Kiran
> > > > >
> > > > > First, the value for maxOpenFiles is per RocksDB instance, and the
> number
> > > > > of those can get high if you have a lot of topic partitions etc.
> > > > > Check the directory (state dir) to see how many there are.
> > > > > Start with a low value (100) and see if that has some effect.
> > > > >
> > > > > Second, because I just found out, you should use
> > > > > BlockBasedTableConfig tableConfig = (BlockBasedTableConfig)
> > > > > options.tableFormatConfig();
> > > > > tableConfig.setBlockCacheSize(100*1024*1024L);
> > > > > tableConfig.setBlockSize(8*1024L);
> > > > > instead of creating a new object to prevent accidently messing up
> > > > > references.
> > > > >
> > > > > Hope that helps
> > > > > best regards
> > > > > Patrik
> > > > >
> > > > > On Thu, 27 Jun 2019 at 10:46, emailtokir...@gmail.com <
> > > > > emailtokir...@gmail.com> wrote:
> > > > >
> > > > > >
> > > > > >
> > > > > > On 2019/06/26 21:58:02, Patrik Kleindl 
> wrote:
> > > > > > > Hi Kiran
> > > > > > > You can use the RocksDBConfigSetter and pass
> > > > > > >
> > > > > > > options.setMaxOpenFiles(100);
> > > > > > >
> > > > > > > to all RocksDBs for the Streams application which limits how
> many are
> > > > > > > kept open at the same time.
> > > > > > >
> > > > > > > best regards
> > > > > > >
> > > > > > > Patrik
> > > > > > >
> > > > > > >
> > > > > > > On Wed, 26 Jun 2019 at 16:14, emailtokir...@gmail.com <
> > > > > > > emailtokir...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > We are using Kafka streams DSL APIs for doing some counter
> > > > aggregations
> > > > > > > > (running on OpenJDK 11.0.2). Our topology has some 400 sub
> > > > topologies
> > > > > > & we
> > > > > > > > are using 8 partitions in source topic. When we start
> pumping more
> > > > > > load, we
> > > > > > > > start getting RockDBException stating "too many open files".
> > > > > > > >
> > > > > > > > Here are the stack trace samples:
> > > > > > > >
> > > > > > > >
> > > > > >
> > > >
> --
> > > > > > > > Caused by: org.rocksdb.RocksDBException: while open a file
> for
> > > > lock:
> > > > > > > > PPP.151200/LOCK: Too many open files
> > > > > > > > at or

Re: Does anyone fixed Producer TimeoutException problem ?

2019-07-02 Thread SenthilKumar K
Does it happen to all partitions or only a few partitions? Can you make sure
your local setup is working fine? Were you able to produce using the
console-producer?

Example:
SEVERE: Expiring 7 record(s) for topic-9{partition:9}: 30022 ms has passed
since last append
Expiring 9 record(s) for topic-2{partition:2}: 30015 ms has passed since
batch creation plus linger time

--Senthil

On Tue, Jul 2, 2019 at 5:34 PM Shyam P  wrote:

> Thanks a lot Senthil for quick reply.
> I am using  kafka_2.11-2.1.1 .
> In your case  Kafka Producer Client in One DataCenter and Kafka Broker in
> other DataCenter  but in my case I installed Kafka on the same machine
> where Producer is running.
> i.e. currently I am in development mode , so everything now on my local
> for timebeing ...i.e. Kafka broker , zk and my producer code in eclipse.
>
> If is is a set up issue at least it should run fine in my local right.
> I tried several producer configurations combinations as explained in the
> SOF link.
>
> So not sure now what is the issue and how to fix it ?
>
> Is in your case the issue fixed ?
>
> Regards,
> Shyam
>
> On Tue, Jul 2, 2019 at 5:12 PM SenthilKumar K 
> wrote:
>
>> Hi Shyam, We also faced `TimeoutException: Expiring 1 record(s)` issue in
>> our Kafka Producer Client. As described here
>> <
>> https://stackoverflow.com/questions/56807188/how-to-fix-kafka-common-errors-timeoutexception-expiring-1-records-xxx-ms-has
>> >
>> ,
>> first we tried increasing request timeout but that didn't help.  We had
>> setup like Kafka Producer Client in One DataCenter and Kafka Broker in
>> other DataCenter & thats why the producer failed to push records to
>> brokers
>> on time due to network issue. In your case , Could be setup issue ?
>>
>> --Senthil
>>
>> On Tue, Jul 2, 2019 at 3:57 PM Shyam P  wrote:
>>
>> > Hi,
>> >  I am facing the below issue.
>> >
>> > org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s)
>> > for  229 ms has passed since batch creation plus linger
>> > time
>> >
>> >
>> > I tried many producer configuration settings. more details below :
>> >
>> >
>> https://stackoverflow.com/questions/56807188/how-to-fix-kafka-common-errors-timeoutexception-expiring-1-records-xxx-ms-has
>> >
>> > But nothing working.
>> >
>> > Can anyone plz help me , what is wrong here and how to fix it ?
>> >
>> > thanks,
>> > Shyam
>> >
>>
>


Replica movement between log directories

2019-07-02 Thread Karolis Pocius
Not having much luck with replica movement between log directories, so I'd
appreciate it if someone validated the steps that I'm taking:

1. Create a topics-to-move JSON file (with a single topic)
2. Generate a candidate partition reassignment
3. Take the above and replace all instances of "any" with
"/path-to-log-dir" (I want certain partitions moved to a specific log dir
that is the same on each of the five brokers in the cluster)
4. Create the reassignment JSON with the data from step #3 (see the sketch
after this list)
5. Execute the reassignment with an increased timeout, just to be safe
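
A minimal reassignment file along those lines might look like the sketch
below. It is only an illustration: the topic name, replica ids and the log dir
path are placeholders, "log_dirs" needs one entry per replica (in the same
order), and "any" leaves a replica in whatever directory it already uses.

{
  "version": 1,
  "partitions": [
    {
      "topic": "topic.0",
      "partition": 7,
      "replicas": [2, 0, 1],
      "log_dirs": ["/kafka-data-2", "any", "any"]
    }
  ]
}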

What happens next is that some partitions reassign just fine, while others
throw a warning and get stuck forever. Here's the full log for one of the
attempted reassignments:

[2019-07-02 13:58:40,330] INFO [Log partition=topic.0-7, dir=/kafka-data-2]
Loading producer state till offset 0 with message format version 2
(kafka.log.Log)
[2019-07-02 13:58:40,330] INFO [Log partition=topic.0-7, dir=/kafka-data-2]
Completed load of log with 1 segments, log start offset 0 and log end
offset 0 in 1 ms (kafka.log.Log)
[2019-07-02 13:58:40,330] INFO Created log for partition topic.0-7 in
/kafka-data-2 with properties {compression.type -> producer,
message.downconversion.enable -> true, min.insync.replicas -> 2,
segment.jitter.ms -> 0, cleanup.policy -> delete, flush.ms ->
9223372036854775807, segment.bytes -> 1073741824, retention.ms ->
60480, flush.messages -> 9223372036854775807, message.format.version ->
2.2-IV1, file.delete.delay.ms -> 6, max.compaction.lag.ms ->
9223372036854775807, max.message.bytes -> 52428800, min.compaction.lag.ms
-> 0, message.timestamp.type -> LogAppendTime, preallocate -> false,
min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096,
unclean.leader.election.enable -> false, retention.bytes -> -1,
delete.retention.ms -> 8640, segment.ms -> 60480,
message.timestamp.difference.max.ms -> 9223372036854775807,
segment.index.bytes -> 10485760}. (kafka.log.LogManager)
[2019-07-02 13:58:40,331] INFO [Partition topic.0-7 broker=2] No
checkpointed highwatermark is found for partition topic.0-7
(kafka.cluster.Partition)
[2019-07-02 13:58:40,331] INFO Replica loaded for partition topic.0-7 with
initial high watermark 0 (kafka.cluster.Replica)
[2019-07-02 13:58:40,331] INFO [ReplicaAlterLogDirsManager on broker 2]
Added fetcher to broker BrokerEndPoint(id=2, host=localhost:-1) for
partitions Map(topic.0-7 -> (offset=0, leaderEpoch=84))
(kafka.server.ReplicaAlterLogDirsManager)
[2019-07-02 13:58:40,389] INFO [ReplicaAlterLogDirsThread-0]: Truncating
partition topic.0-7 to local high watermark 0
(kafka.server.ReplicaAlterLogDirsThread)
[2019-07-02 13:58:40,389] INFO [Log partition=topic.0-7, dir=/kafka-data-2]
Truncating to 0 has no effect as the largest offset in the log is -1
(kafka.log.Log)
[2019-07-02 13:58:41,043] INFO [ReplicaFetcherManager on broker 2] Removed
fetcher for partitions Set(topic.0-7) (kafka.server.ReplicaFetcherManager)
[2019-07-02 13:58:41,043] INFO [ReplicaFetcherManager on broker 2] Added
fetcher to broker BrokerEndPoint(id=0,
host=.ec2.internal:9092) for partitions Map(topic.0-7 ->
(offset=59338, leaderEpoch=85)) (kafka.server.ReplicaFetcherManager)
[2019-07-02 13:58:41,203] INFO [Log partition=topic.0-7, dir=/kafka-data-1]
Truncating to 59338 has no effect as the largest offset in the log is 59337
(kafka.log.Log)
[2019-07-02 13:58:41,227] INFO [ReplicaAlterLogDirsThread-0]: Truncating
partition topic.0-7 to local high watermark 0
(kafka.server.ReplicaAlterLogDirsThread)
[2019-07-02 13:58:41,227] INFO [Log partition=topic.0-7, dir=/kafka-data-2]
Truncating to 0 has no effect as the largest offset in the log is -1
(kafka.log.Log)
[2019-07-02 13:58:41,229] INFO [ReplicaAlterLogDirsThread-0]:
Beginning/resuming copy of partition topic.0-7 from offset 0. Including
this partition, there are 5 remaining partitions to copy by this thread.
(kafka.server.ReplicaAlterLogDirsThread)
[2019-07-02 13:58:41,229] INFO [ReplicaAlterLogDirsThread-0]: Partition
topic.0-7 has an older epoch (84) than the current leader. Will await the
new LeaderAndIsr state before resuming fetching.
(kafka.server.ReplicaAlterLogDirsThread)
[2019-07-02 13:58:41,229] WARN [ReplicaAlterLogDirsThread-0]: Partition
topic.0-7 marked as failed (kafka.server.ReplicaAlterLogDirsThread)
[2019-07-02 13:58:41,667] INFO [ReplicaAlterLogDirsThread-0]: Shutting down
(kafka.server.ReplicaAlterLogDirsThread)
[2019-07-02 13:58:41,667] INFO [ReplicaAlterLogDirsThread-0]: Shutdown
completed (kafka.server.ReplicaAlterLogDirsThread)
[2019-07-02 13:58:41,667] INFO [ReplicaAlterLogDirsThread-0]: Stopped
(kafka.server.ReplicaAlterLogDirsThread)

I have upgraded from 2.2.1 to 2.3.0 (haven't changed inter.broker.protocol
yet) hoping that KAFKA-8346 would somehow improve the situation, but it
seems that it just keeps the thread from dying.

Any pointers to what might be going wrong here would be appreciated.


Re: Kafka Streams - Getting exception org.apache.kafka.common.network.InvalidReceiveException exception in cloud

2019-07-02 Thread Vigneswaran Gunasekaran (vicky86)
Hi Jason,

Thanks for your reply.
I have no idea what the "client_id" field means, because I am not using this
field anywhere else.
As for the corrupted data, we are receiving the data properly and we are getting
this exception intermittently. And after two to three days the application stops
working because of org.apache.kafka.common.errors.TimeoutException.

We couldn't debug this issue locally, because it happens only live, in real
time.

Thanks,
Vigneswaran

On 02/07/19, 5:54 PM, "Jason Turim"  wrote:

>
> [2019-06-29 21:19:43,050] ERROR Exception while processing request from
> 172.21.46.208:9092-172.21.4.208:38368-2446 (kafka.network.Processor)
> org.apache.kafka.common.errors.InvalidRequestException: Error parsing
> request header. Our best guess of the apiKey is: -32767
> Caused by: org.apache.kafka.common.protocol.types.SchemaException: Error
> reading field 'client_id': Error reading string of length 27759, only 73
> bytes available
> at org.apache.kafka.common.protocol.types.Schema.read(Schema.
> java:77)
>

It looks like corrupt data is being introduced, maybe a null character in
the `client_id` field?



On Tue, Jul 2, 2019 at 8:13 AM Vigneswaran Gunasekaran (vicky86) <
vick...@cisco.com> wrote:

> Can anybody help me on this issue?
>
> Thanks,
> Vigneswaran
>
> From: "Vigneswaran Gunasekaran (vicky86)" 
> Date: Monday, 1 July 2019 at 12:45 PM
> To: "users@kafka.apache.org" 
> Subject: Re: Kafka Streams - Getting exception
> org.apache.kafka.common.network.InvalidReceiveException exception in cloud
>
> Hi Team,
>
> I am using kafka streams in my application and I am running in cloud with
> 5 kafka instances. I am getting below exception in kafka machines and 
after
> some point my application stop working because of
> org.apache.kafka.common.errors.TimeoutException. Please help me on this as
> we couldn’t  move further.
>
> server.properties(kafka performance configuration):
> num.network.threads=8
> socket.receive.buffer.bytes=-1
> socket.send.buffer.bytes=-1
> socket.request.max.bytes=2147483647
>
>
> Kafka Exception:
> [2019-06-29 21:17:57,649] INFO [Log
> partition=location-analytics-live-v2-visit-processing-5,
> dir=/mnt/data/kafka/kafka-logs] Incrementing log start offset to 263555323
> (kafka.log.Log)
> [2019-06-29 21:18:18,992] WARN [SocketServer brokerId=0] Unexpected error
> from /172.21.4.208; closing connection
> (org.apache.kafka.common.network.Selector)
> org.apache.kafka.common.network.InvalidReceiveException: Invalid receive
> (size = -1145372416)
> at
> 
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:102)
> at
> 
org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:381)
> at
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:342)
> at
> org.apache.kafka.common.network.Selector.attemptRead(Selector.java:609)
> at
> 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:541)
> at 
org.apache.kafka.common.network.Selector.poll(Selector.java:467)
> at kafka.network.Processor.poll(SocketServer.scala:689)
> at kafka.network.Processor.run(SocketServer.scala:594)
> at java.lang.Thread.run(Thread.java:748)
> [2019-06-29 21:18:18,993] WARN [SocketServer brokerId=0] Unexpected error
> from /172.21.4.208; closing connection
> (org.apache.kafka.common.network.Selector)
> org.apache.kafka.common.network.InvalidReceiveException: Invalid receive
> (size = -1145372671)
> at
> 
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:102)
> at
> 
org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:381)
> at
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:342)
> at
> org.apache.kafka.common.network.Selector.attemptRead(Selector.java:609)
> at
> 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:541)
> at 
org.apache.kafka.common.network.Selector.poll(Selector.java:467)
> at kafka.network.Processor.poll(SocketServer.scala:689)
> at kafka.network.Processor.run(SocketServer.scala:594)
> at java.lang.Thread.run(Thread.java:748)
> [2019-06-29 21:18:57,649] INFO [Log
> partition=location-analytics-live-v2-visit-processing-5,
> dir=/mnt/data/kafka/kafka-logs] Deleting segment 256880856 (kafka.log.Log)
> [2019-06-29 21:18:57,676] INFO Deleted log
> 
/mnt/data/kafka/kafka-logs/location-analytics-live-v2-visit-processing-5/000256880856.log.deleted.
> (kafka.log.LogSegment)
> [2019-06-29 21:18:5

Re: Kafka Streams - Getting exception org.apache.kafka.common.network.InvalidReceiveException exception in cloud

2019-07-02 Thread Jason Turim
>
> [2019-06-29 21:19:43,050] ERROR Exception while processing request from
> 172.21.46.208:9092-172.21.4.208:38368-2446 (kafka.network.Processor)
> org.apache.kafka.common.errors.InvalidRequestException: Error parsing
> request header. Our best guess of the apiKey is: -32767
> Caused by: org.apache.kafka.common.protocol.types.SchemaException: Error
> reading field 'client_id': Error reading string of length 27759, only 73
> bytes available
> at org.apache.kafka.common.protocol.types.Schema.read(Schema.
> java:77)
>

It looks like corrupt data is being introduced, maybe a null character in
the `client_id` field?



On Tue, Jul 2, 2019 at 8:13 AM Vigneswaran Gunasekaran (vicky86) <
vick...@cisco.com> wrote:

> Can anybody help me on this issue?
>
> Thanks,
> Vigneswaran
>
> From: "Vigneswaran Gunasekaran (vicky86)" 
> Date: Monday, 1 July 2019 at 12:45 PM
> To: "users@kafka.apache.org" 
> Subject: Re: Kafka Streams - Getting exception
> org.apache.kafka.common.network.InvalidReceiveException exception in cloud
>
> Hi Team,
>
> I am using kafka streams in my application and I am running in cloud with
> 5 kafka instances. I am getting below exception in kafka machines and after
> some point my application stop working because of
> org.apache.kafka.common.errors.TimeoutException. Please help me on this as
> we couldn’t  move further.
>
> server.properties(kafka performance configuration):
> num.network.threads=8
> socket.receive.buffer.bytes=-1
> socket.send.buffer.bytes=-1
> socket.request.max.bytes=2147483647
>
>
> Kafka Exception:
> [2019-06-29 21:17:57,649] INFO [Log
> partition=location-analytics-live-v2-visit-processing-5,
> dir=/mnt/data/kafka/kafka-logs] Incrementing log start offset to 263555323
> (kafka.log.Log)
> [2019-06-29 21:18:18,992] WARN [SocketServer brokerId=0] Unexpected error
> from /172.21.4.208; closing connection
> (org.apache.kafka.common.network.Selector)
> org.apache.kafka.common.network.InvalidReceiveException: Invalid receive
> (size = -1145372416)
> at
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:102)
> at
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:381)
> at
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:342)
> at
> org.apache.kafka.common.network.Selector.attemptRead(Selector.java:609)
> at
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:541)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
> at kafka.network.Processor.poll(SocketServer.scala:689)
> at kafka.network.Processor.run(SocketServer.scala:594)
> at java.lang.Thread.run(Thread.java:748)
> [2019-06-29 21:18:18,993] WARN [SocketServer brokerId=0] Unexpected error
> from /172.21.4.208; closing connection
> (org.apache.kafka.common.network.Selector)
> org.apache.kafka.common.network.InvalidReceiveException: Invalid receive
> (size = -1145372671)
> at
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:102)
> at
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:381)
> at
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:342)
> at
> org.apache.kafka.common.network.Selector.attemptRead(Selector.java:609)
> at
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:541)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
> at kafka.network.Processor.poll(SocketServer.scala:689)
> at kafka.network.Processor.run(SocketServer.scala:594)
> at java.lang.Thread.run(Thread.java:748)
> [2019-06-29 21:18:57,649] INFO [Log
> partition=location-analytics-live-v2-visit-processing-5,
> dir=/mnt/data/kafka/kafka-logs] Deleting segment 256880856 (kafka.log.Log)
> [2019-06-29 21:18:57,676] INFO Deleted log
> /mnt/data/kafka/kafka-logs/location-analytics-live-v2-visit-processing-5/000256880856.log.deleted.
> (kafka.log.LogSegment)
> [2019-06-29 21:18:57,676] INFO Deleted offset index
> /mnt/data/kafka/kafka-logs/location-analytics-live-v2-visit-processing-5/000256880856.index.deleted.
> (kafka.log.LogSegment)
> [2019-06-29 21:18:57,676] INFO Deleted time index
> /mnt/data/kafka/kafka-logs/location-analytics-live-v2-visit-processing-5/000256880856.timeindex.deleted.
> (kafka.log.LogSegment)
> [2019-06-29 21:19:19,034] ERROR Closing socket for 172.21.46.208:9092
> -172.21.4.208:36798-2446 because of error (kafka.network.Processor)
> org.apache.kafka.common.errors.InvalidRequestException: Error parsing
> request header. Our best guess of the apiKey is: 1032
> [2019-06-29 21:19:19,034] ERROR Closing socket for 172.21.46.208:9092
> -172.21.4.208:36798-2446 because of error (kafka.network.Processor)
> org.apache.kafka.common.errors.InvalidRequestException: Error parsing
> request header. Our best guess of the apiKey is: 1032
> Caused by: java.

Re: Kafka Streams - Getting exception org.apache.kafka.common.network.InvalidReceiveException exception in cloud

2019-07-02 Thread Vigneswaran Gunasekaran (vicky86)
Can anybody help me on this issue?

Thanks,
Vigneswaran

From: "Vigneswaran Gunasekaran (vicky86)" 
Date: Monday, 1 July 2019 at 12:45 PM
To: "users@kafka.apache.org" 
Subject: Re: Kafka Streams - Getting exception 
org.apache.kafka.common.network.InvalidReceiveException exception in cloud

Hi Team,

I am using kafka streams in my application and I am running in the cloud with 5
kafka instances. I am getting the below exception on the kafka machines, and
after some point my application stops working because of
org.apache.kafka.common.errors.TimeoutException. Please help me on this as we
cannot move further.

server.properties(kafka performance configuration):
num.network.threads=8
socket.receive.buffer.bytes=-1
socket.send.buffer.bytes=-1
socket.request.max.bytes=2147483647


Kafka Exception:
[2019-06-29 21:17:57,649] INFO [Log 
partition=location-analytics-live-v2-visit-processing-5, 
dir=/mnt/data/kafka/kafka-logs] Incrementing log start offset to 263555323 
(kafka.log.Log)
[2019-06-29 21:18:18,992] WARN [SocketServer brokerId=0] Unexpected error from 
/172.21.4.208; closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size 
= -1145372416)
at 
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:102)
at 
org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:381)
at 
org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:342)
at 
org.apache.kafka.common.network.Selector.attemptRead(Selector.java:609)
at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:541)
at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
at kafka.network.Processor.poll(SocketServer.scala:689)
at kafka.network.Processor.run(SocketServer.scala:594)
at java.lang.Thread.run(Thread.java:748)
[2019-06-29 21:18:18,993] WARN [SocketServer brokerId=0] Unexpected error from 
/172.21.4.208; closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size 
= -1145372671)
at 
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:102)
at 
org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:381)
at 
org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:342)
at 
org.apache.kafka.common.network.Selector.attemptRead(Selector.java:609)
at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:541)
at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
at kafka.network.Processor.poll(SocketServer.scala:689)
at kafka.network.Processor.run(SocketServer.scala:594)
at java.lang.Thread.run(Thread.java:748)
[2019-06-29 21:18:57,649] INFO [Log 
partition=location-analytics-live-v2-visit-processing-5, 
dir=/mnt/data/kafka/kafka-logs] Deleting segment 256880856 (kafka.log.Log)
[2019-06-29 21:18:57,676] INFO Deleted log 
/mnt/data/kafka/kafka-logs/location-analytics-live-v2-visit-processing-5/000256880856.log.deleted.
 (kafka.log.LogSegment)
[2019-06-29 21:18:57,676] INFO Deleted offset index 
/mnt/data/kafka/kafka-logs/location-analytics-live-v2-visit-processing-5/000256880856.index.deleted.
 (kafka.log.LogSegment)
[2019-06-29 21:18:57,676] INFO Deleted time index 
/mnt/data/kafka/kafka-logs/location-analytics-live-v2-visit-processing-5/000256880856.timeindex.deleted.
 (kafka.log.LogSegment)
[2019-06-29 21:19:19,034] ERROR Closing socket for 
172.21.46.208:9092-172.21.4.208:36798-2446 because of error 
(kafka.network.Processor)
org.apache.kafka.common.errors.InvalidRequestException: Error parsing request 
header. Our best guess of the apiKey is: 1032
[2019-06-29 21:19:19,034] ERROR Closing socket for 
172.21.46.208:9092-172.21.4.208:36798-2446 because of error 
(kafka.network.Processor)
org.apache.kafka.common.errors.InvalidRequestException: Error parsing request 
header. Our best guess of the apiKey is: 1032
Caused by: java.nio.BufferUnderflowException
at java.nio.Buffer.nextGetIndex(Buffer.java:506)
at java.nio.HeapByteBuffer.getShort(HeapByteBuffer.java:310)
at 
org.apache.kafka.common.requests.RequestHeader.parse(RequestHeader.java:118)
at 
kafka.network.Processor.$anonfun$processCompletedReceives$1(SocketServer.scala:703)
at 
kafka.network.Processor.$anonfun$processCompletedReceives$1$adapted(SocketServer.scala:699)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54

Re: Does anyone fixed Producer TimeoutException problem ?

2019-07-02 Thread Shyam P
Thanks a lot Senthil for the quick reply.
I am using kafka_2.11-2.1.1.
In your case the Kafka producer client was in one data center and the Kafka
broker in another data center, but in my case I installed Kafka on the same
machine where the producer is running.
I.e. currently I am in development mode, so everything is on my local machine
for the time being: Kafka broker, zk and my producer code in Eclipse.

If it is a setup issue, it should at least run fine on my local machine, right?
I tried several producer configuration combinations as explained in the
SOF link.

So I am not sure now what the issue is and how to fix it.

Was the issue fixed in your case?

Regards,
Shyam

On Tue, Jul 2, 2019 at 5:12 PM SenthilKumar K 
wrote:

> Hi Shyam, We also faced `TimeoutException: Expiring 1 record(s)` issue in
> our Kafka Producer Client. As described here
> <
> https://stackoverflow.com/questions/56807188/how-to-fix-kafka-common-errors-timeoutexception-expiring-1-records-xxx-ms-has
> >
> ,
> first we tried increasing request timeout but that didn't help.  We had
> setup like Kafka Producer Client in One DataCenter and Kafka Broker in
> other DataCenter & thats why the producer failed to push records to brokers
> on time due to network issue. In your case , Could be setup issue ?
>
> --Senthil
>
> On Tue, Jul 2, 2019 at 3:57 PM Shyam P  wrote:
>
> > Hi,
> >  I am facing the below issue.
> >
> > org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s)
> > for  229 ms has passed since batch creation plus linger
> > time
> >
> >
> > I tried many producer configuration settings. more details below :
> >
> >
> https://stackoverflow.com/questions/56807188/how-to-fix-kafka-common-errors-timeoutexception-expiring-1-records-xxx-ms-has
> >
> > But nothing working.
> >
> > Can anyone plz help me , what is wrong here and how to fix it ?
> >
> > thanks,
> > Shyam
> >
>


Re: Does anyone fixed Producer TimeoutException problem ?

2019-07-02 Thread SenthilKumar K
Hi Shyam, we also faced the `TimeoutException: Expiring 1 record(s)` issue in
our Kafka producer client. As described here
(https://stackoverflow.com/questions/56807188/how-to-fix-kafka-common-errors-timeoutexception-expiring-1-records-xxx-ms-has),
first we tried increasing the request timeout but that didn't help. We had a
setup with the Kafka producer client in one data center and the Kafka broker in
another data center, and that's why the producer failed to push records to the
brokers in time due to a network issue. In your case, could it be a setup issue?
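
Not the root cause, but for reference, these are the producer knobs that
usually come up when chasing this error. A minimal sketch with placeholder
broker address, topic and values; which timeout actually governs batch expiry
depends on the client version, so treat the numbers as a starting point only:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerTimeoutSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Knobs commonly tuned for "Expiring N record(s)" errors:
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000"); // wait longer for broker responses
        props.put(ProducerConfig.LINGER_MS_CONFIG, "5");              // how long to wait to fill a batch
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");         // smaller batches are sent sooner
        props.put(ProducerConfig.RETRIES_CONFIG, "5");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test-topic", "key", "value"), (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace(); // a TimeoutException would surface here
                }
            });
            producer.flush();
        }
    }
}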

--Senthil

On Tue, Jul 2, 2019 at 3:57 PM Shyam P  wrote:

> Hi,
>  I am facing the below issue.
>
> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s)
> for  229 ms has passed since batch creation plus linger
> time
>
>
> I tried many producer configuration settings. more details below :
>
> https://stackoverflow.com/questions/56807188/how-to-fix-kafka-common-errors-timeoutexception-expiring-1-records-xxx-ms-has
>
> But nothing working.
>
> Can anyone plz help me , what is wrong here and how to fix it ?
>
> thanks,
> Shyam
>


Incjecting custom classes to Kafka / custom LoginModule or custom CallbackHandler

2019-07-02 Thread Filip Stysiak
Hello everyone,

I am currently working on implementing simple authentication for a system
that manages topics and ACLs in our Kafka. The plan is to use a simple
login/password system, but instead of storing the user/password pairs
directly in the JAAS configuration file we intend to store them in a
PostgreSQL database and check them there.

I've found that the way to do it is to either implement a custom class that
implements LoginModule and point to it in the JAAS file passed in
KAFKA_OPTS, or to implement a custom class for a CallbackHandler and set the
sasl.client.callback.handler.class property.
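
If it helps, for validating client credentials on the broker side the second
approach usually goes through the server callback handler property
(listener.name.<listener>.<mechanism>.sasl.server.callback.handler.class)
rather than sasl.client.callback.handler.class. Below is a rough sketch of a
SASL/PLAIN handler backed by an external store, purely as an illustration: the
class name, the database lookup and the configuration comments are
assumptions, not your actual code.

import java.util.List;
import java.util.Map;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.plain.PlainAuthenticateCallback;

// Hypothetical broker-side handler for SASL/PLAIN that validates credentials
// against PostgreSQL instead of the JAAS file.
public class PostgresPlainCallbackHandler implements AuthenticateCallbackHandler {

    @Override
    public void configure(final Map<String, ?> configs, final String saslMechanism,
                          final List<AppConfigurationEntry> jaasConfigEntries) {
        // e.g. read the JDBC URL from the JAAS entry options and open a connection pool
    }

    @Override
    public void handle(final Callback[] callbacks) throws UnsupportedCallbackException {
        String username = null;
        for (final Callback callback : callbacks) {
            if (callback instanceof NameCallback) {
                username = ((NameCallback) callback).getDefaultName();
            } else if (callback instanceof PlainAuthenticateCallback) {
                final PlainAuthenticateCallback plain = (PlainAuthenticateCallback) callback;
                plain.authenticated(isValid(username, plain.password()));
            } else {
                throw new UnsupportedCallbackException(callback);
            }
        }
    }

    // Placeholder: look the user up in PostgreSQL and verify the password hash.
    private boolean isValid(final String username, final char[] password) {
        return false;
    }

    @Override
    public void close() {
        // release the connection pool
    }
}

The jar still has to end up on the broker's classpath (e.g. under libs/ as you
did), and the handler is then referenced from server.properties by its fully
qualified class name.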

However, I cannot achieve any kind of success with either of these
approaches. The container I run Kafka in won't start, telling me it
couldn't find the class.

My custom classes reside in a jar that I:
- put in the $(kafka_home)/libs directory
- pointed to, both directly and indirectly, via PATH and/or CLASSPATH


So my question is:
- which of the approaches is the correct one - custom LoginModule or custom
CallbackHandler?
- how do you inject your own classes into Kafka the right way?

Regards,
Filip



Fwd: Does anyone fixed Producer TimeoutException problem ?

2019-07-02 Thread Shyam P
Hi,
 I am facing the below issue.

org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s)
for  229 ms has passed since batch creation plus linger
time


I tried many producer configuration settings. more details below :
https://stackoverflow.com/questions/56807188/how-to-fix-kafka-common-errors-timeoutexception-expiring-1-records-xxx-ms-has

But nothing working.

Can anyone plz help me , what is wrong here and how to fix it ?

thanks,
Shyam


Does anyone fixed Producer TimeoutException problem ?

2019-07-02 Thread Shyam P
Hi,
 I am facing the below issue.

org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s)
for  229 ms has passed since batch creation plus linger
time


I tried many producer configuration settings. more details below :
https://stackoverflow.com/questions/56807188/how-to-fix-kafka-common-errors-timeoutexception-expiring-1-records-xxx-ms-has

But nothing working.

Can anyone plz help me , what is wrong here and how to fix it ?

thanks,
Shyam