Re: Kafka streams (2.1.1) - org.rocksdb.RocksDBException:Too many open files
Sorry for reposting the previous message; the images didn't come through, so I am pasting as text. I have changed both the system and user limits.

To completely isolate the problem, I have tried the application in a CentOS 7 environment, with the ulimit set to 1 million and the system limit set to 10 million open files. The 3 Kafka nodes are now running on separate servers and the streaming application is running in a dedicated VM. The application is no longer explicitly throwing the "too many open files" error, but it aborts with the message:

terminate called after throwing an instance of 'std::system_error'
  what(): Resource temporarily unavailable
./bin/start.sh: line 42: 1100 Aborted

Here are the last few lines from the strace output, which show the abort:

25721 14:58:35 open("/home/devops/data/kafka-streams/RawLog_Processor/393_7/MAIL.INCOMING_FILTER_BY_USER_MAIL_INCOMING_DOMAIN_TRAFFIC_DETAIL.DETECTED_CONTENT_FILTER_RECIPIENT_DOMAIN/MAIL.INCOMING_FILTER_BY_USER_MAIL_INCOMING_DOMAIN_TRAFFIC_DETAIL.DETECTED_CONTENT_FILTER_RECIPIENT_DOMAIN.155520/MANIFEST-07", O_RDONLY|O_CLOEXEC) = 12505
25721 14:58:35 open("/sys/devices/virtual/block/dm-2/queue/logical_block_size", O_RDONLY) = 12506
25721 14:58:35 read(12506, "512\n", 4096) = 4
25721 14:58:35 close(12506) = 0
25721 14:58:35 write(12502, "s.advise_random_on_open: 0\n2019/"..., 4096) = 4096
25721 14:58:35 write(12502, "ions.comparator: leveldb.Bytewis"..., 4096) = 4096
25721 14:58:35 read(12505, "V\371\270\370\34\0\1\1\32leveldb.BytewiseCompara"..., 32768) = 192
25721 14:58:35 read(12505, "", 28672) = 0
25721 14:58:35 close(12505) = 0
17701 14:58:35 open("/home/devops/data/kafka-streams/RawLog_Processor/393_7/MAIL.INCOMING_FILTER_BY_USER_MAIL_INCOMING_DOMAIN_TRAFFIC_DETAIL.DETECTED_CONTENT_FILTER_RECIPIENT_DOMAIN/MAIL.INCOMING_FILTER_BY_USER_MAIL_INCOMING_DOMAIN_TRAFFIC_DETAIL.DETECTED_CONTENT_FILTER_RECIPIENT_DOMAIN.155520/06.sst", O_RDONLY|O_CLOEXEC) = 12505
17702 14:58:35 +++ exited with 0 +++
25721 14:58:35 write(2, "terminate called after throwing "..., 48) = 48
25721 14:58:35 write(2, "std::system_error", 17) = 17
25721 14:58:35 write(2, "'\n", 2) = 2
25721 14:58:35 write(2, " what():  ", 11) = 11
25721 14:58:35 write(2, "Resource temporarily unavailable", 32) = 32
25721 14:58:35 write(2, "\n", 1) = 1
17701 14:58:35 open("/sys/devices/virtual/block/dm-2/queue/logical_block_size", O_RDONLY) = 12506
25721 14:58:35 write(12502, "ction_dynamic_level_bytes: 0\n201"..., 3672) = 3672
25721 14:58:35 --- SIGABRT {si_signo=SIGABRT, si_code=SI_TKILL, si_pid=25697, si_uid=1000} ---
17701 14:58:35 read(12506,
17701 14:58:36 +++ killed by SIGABRT +++
17700 14:58:36 +++ killed by SIGABRT +++
17699 14:58:36 +++ killed by SIGABRT +++

As I can see, the number of open files (45201) is way lower than the max limit, so can we rule out "open files" as the root cause? I have also noticed there are a lot of EAGAIN results (count 3334) and associated "Resource temporarily unavailable" messages as well:

25732 14:49:23 open("/sys/fs/cgroup/memory/memory.limit_in_bytes", O_RDONLY) = 59
25732 14:49:23 read(59, "9223372036854775807\n", 4096) = 20
25732 14:49:23 close(59) = 0
25709 14:49:23 open("/sys/fs/cgroup/memory/memory.limit_in_bytes", O_RDONLY) = 59
25709 14:49:23 read(59, "9223372036854775807\n", 4096) = 20
25709 14:49:23 close(59) = 0
25721 14:49:23 write(50, "\0", 1) = 1
25709 14:49:23 open("/sys/fs/cgroup/memory/memory.limit_in_bytes", O_RDONLY) = 59
25709 14:49:23 read(59, "9223372036854775807\n", 4096) = 20
25709 14:49:23 close(59) = 0
25721 14:49:23 read(17, "PK\3\4\n\0\0\10\10\0\343SHNM\n4>Q\2\0\0\265\5\0\0A\0\0\0", 30) = 30
25721 14:49:23 read(17, "\245T]O\23A\24=\323\226\326-\253\255\250\365\v\4\265ji\221E\344\305\200M\f\201\304\244j"..., 593) = 593
25709 14:49:23 open("/sys/fs/cgroup/memory/memory.limit_in_bytes", O_RDONLY) = 59
25709 14:49:23 read(59, "9223372036854775807\n", 4096) = 20
25709 14:49:23 close(59) = 0
25732 14:49:23 open("/sys/fs/cgroup/memory/memory.limit_in_bytes", O_RDONLY) = 59
25732 14:49:23 read(59, "9223372036854775807\n", 4096) = 20
25732 14:49:23 close(59) = 0
25721 14:49:23 read(49, "\0", 16) = 1
25709 14:49:23 open("/sys/fs/cgroup/memory/memory.limit_in_bytes", O_RDONLY) = 59
25709 14:49:23 read(59, "9223372036854775807\n", 4096) = 20
25709 14:49:23 close(59) = 0
25732 14:49:23 open("/sys/fs/cgroup/memory/memory.limit_in_bytes", O_RDONLY) = 59
25732 14:49:23 read(59, "9223372036854775807\n", 4096) = 20
25732 14:49:23 close(59) = 0
25709 14:49:23 open("/sys/fs/cgroup/memory/memory.limit_in_bytes", O_RDONLY) = 59
25709 14:49:23 read(59, "9223372036854775807\n", 4096) = 20
25709 14:49:23 close(59) = 0
25721 14:49:23 read(54, "\0\1dx", 4) = 4
25721 14:49:23 read(54, "\0\0\0\v\0\0\0\0\0\0\0\1dj\0\0\0\0\1\212\0\201RawLog_Pro"...,
Re: Kafka streams (2.1.1) - org.rocksdb.RocksDBException:Too many open files
Also, when you say you changed the OS limit are you referring to the system limit or the user limit or both? If you increased one but not the other you may still be hitting the lower limit. On Wed, Jul 3, 2019 at 1:53 AM Patrik Kleindl wrote: > Hi > Try to set it really low like Sophie suggested. > You can verify if the settings take effect by checking the files in the > rocksdb directories, I think it should be somewhere in OPTIONS or LOG > br, Patrik > > > Am 03.07.2019 um 09:37 schrieb Thameem Ansari : > > > > Tried setting the open files to 100 and 50 but the results are same. I > checked the total open files while the streaming application was busy > running just before getting the “too many open files” message it was around > 41756 which is same as what we have got when we set to -1. > > > > VisualVM shows that there is no abnormality with the threads / memory or > heap. > > > > Thanks > > Thameem > > > >> On Jul 3, 2019, at 11:50 AM, Sophie Blee-Goldman > wrote: > >> > >> How sure are you that the open file count never goes beyond 50K? Are > those > >> numbers just from a snapshot after it crashed? It's possible rocks is > >> creating a large number of files just for a short period of time (maybe > >> while compacting) that causes the open file count to spike and go back > down. > >> > >> For things to try, you should set the rocks config max.open.files to > >> something less than infinity...if you're OS limit is 1 million and you > have > >> (rounding up) ~5k rocks instances, set this to 1 million / 5k = 200. 
If > you > >> set a lower limit and still hit this error, we can go from there > >> > >> On Tue, Jul 2, 2019 at 11:10 PM emailtokir...@gmail.com < > >> emailtokir...@gmail.com> wrote: > >> > >>> > >>> > On 2019/07/03 05:46:45, Sophie Blee-Goldman > wrote: > It sounds like rocksdb *is* honoring your configs -- the > max.open.files > config is an internal restriction that tells rocksdb how many open > files > >>> it > is allowed to have, so if that's set to -1 (infinite) it won't ever > try > >>> to > limit its open files and you may hit the OS limit. > > Think of it this way: if you have 100 rocksdb instances and a OS > limit of > 500, you should set max.open.files to 5 to avoid hitting this limit > (assuming there are no other open files on the system, in reality > you'd > want some extra room there) > > On Tue, Jul 2, 2019 at 7:53 PM emailtokir...@gmail.com < > emailtokir...@gmail.com> wrote: > > > > > > >> On 2019/06/28 23:29:16, John Roesler wrote: > >> Hey all, > >> > >> If you want to figure it out theoretically, if you print out the > >> topology description, you'll have some number of state stores listed > >> in there. The number of Rocks instances should just be > >> (#global_state_stores + > >> sum(#partitions_of_topic_per_local_state_store)) . The number of > >> stream threads isn't relevant here. > >> > >> You can also figure it out empirically: the first level of > >> subdirectories in the state dir are Tasks, and then within that, the > >> next level is Stores. You should see the store directory names match > >> up with the stores listed in the topology description. The number of > >> Store directories is exactly the number of RocksDB instances you > >>> have. > >> > >> There are also metrics corresponding to each of the state stores, so > >> you can compute it from what you find in the metrics. 
> >> > >> Hope that helps, > >> -john > >> > >> On Thu, Jun 27, 2019 at 6:46 AM Patrik Kleindl > > wrote: > >>> > >>> Hi Kiran > >>> Without much research my guess would be "num_stream_threads * > >>> (#global_state_stores + > > sum(#partitions_of_topic_per_local_state_store))" > >>> So 10 stores (regardless if explicitly defined or implicitely > >>> because > > of > >>> some stateful operation) with 10 partitions each should result in > >>> 100 > >>> Rocksdb instances if you are running at the default of > > num_stream_threads=1. > >>> > >>> As I wrote before, start with 100. > >>> If the error persists, half the number, if not, double it ;-) > >>> Repeat as > >>> needed. > >>> > >>> If you reach the single-digit-range and the error still shows up, > >>> start > >>> searching for any iterators over a store you might not have closed. > >>> > >>> br, Patrik > >>> > >>> On Thu, 27 Jun 2019 at 13:11, emailtokir...@gmail.com < > >>> emailtokir...@gmail.com> wrote: > >>> > > > On 2019/06/27 09:02:39, Patrik Kleindl > >>> wrote: > > Hello Kiran > > > > First, the value for maxOpenFiles is per RocksDB instance, and > >>> the > > number > > of those can get high if you have a lot of topic partitions > >>> etc. > > Check the directory (state dir) to see how m
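The instance count and per-instance file budget discussed in this thread can be sketched numerically. The store counts, partition counts, and OS limit below are example values only (the 1 million / 5k figures come from Sophie's message; the rest are made up), not measurements from this application:

```java
// Sketch of John's instance-count formula and Sophie's budget division.
public class RocksBudget {

    // #global_state_stores + sum(#partitions_of_topic_per_local_state_store)
    static int rocksInstances(int globalStores, int[] partitionsPerLocalStore) {
        int total = globalStores;
        for (int p : partitionsPerLocalStore) {
            total += p;
        }
        return total;
    }

    // Divide the OS open-file limit evenly across RocksDB instances.
    // headroom < 1.0 reserves room for non-RocksDB file descriptors.
    static int maxOpenFilesPerInstance(long osLimit, int instances, double headroom) {
        return (int) ((osLimit * headroom) / instances);
    }

    public static void main(String[] args) {
        // 3 local stores with 10 partitions each, no global stores.
        int instances = rocksInstances(0, new int[] {10, 10, 10});
        System.out.println("RocksDB instances = " + instances);

        // Sophie's example: 1 million OS limit / ~5k instances = 200.
        System.out.println("max.open.files per instance = "
                + maxOpenFilesPerInstance(1_000_000L, 5_000, 1.0));
    }
}
```

In practice you would want headroom below 1.0, since the process also holds sockets, log files, and Kafka segment files open.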
Re: PR review
https://github.com/apache/kafka/pull/6771

Bouncing both users and dev to get some activity going. We have been waiting for a while to get this KIP PR merged. Could someone please review?

Thanks,

On Sun, 30 Jun 2019 at 08:59, M. Manna wrote:
> https://github.com/apache/kafka/pull/6771
>
> Hello,
>
> Could the above PR be reviewed? It has been waiting for a long time.
>
> Just to mention, the package name should have "internal". Round-robin
> partitioning should have been supported with or without a key from the
> beginning. It gives the user guaranteed round-robin partitioning without
> having to regard key values (e.g. null/not null). From our business
> side, this is Kafka-internal logic; hence the placement inside the
> "internal" package.
>
> Thanks,
Re: Kafka streams (2.1.1) - org.rocksdb.RocksDBException:Too many open files
Hi Try to set it really low like Sophie suggested. You can verify if the settings take effect by checking the files in the rocksdb directories, I think it should be somewhere in OPTIONS or LOG br, Patrik > Am 03.07.2019 um 09:37 schrieb Thameem Ansari : > > Tried setting the open files to 100 and 50 but the results are same. I > checked the total open files while the streaming application was busy running > just before getting the “too many open files” message it was around 41756 > which is same as what we have got when we set to -1. > > VisualVM shows that there is no abnormality with the threads / memory or > heap. > > Thanks > Thameem > >> On Jul 3, 2019, at 11:50 AM, Sophie Blee-Goldman wrote: >> >> How sure are you that the open file count never goes beyond 50K? Are those >> numbers just from a snapshot after it crashed? It's possible rocks is >> creating a large number of files just for a short period of time (maybe >> while compacting) that causes the open file count to spike and go back down. >> >> For things to try, you should set the rocks config max.open.files to >> something less than infinity...if you're OS limit is 1 million and you have >> (rounding up) ~5k rocks instances, set this to 1 million / 5k = 200. If you >> set a lower limit and still hit this error, we can go from there >> >> On Tue, Jul 2, 2019 at 11:10 PM emailtokir...@gmail.com < >> emailtokir...@gmail.com> wrote: >> >>> >>> On 2019/07/03 05:46:45, Sophie Blee-Goldman wrote: It sounds like rocksdb *is* honoring your configs -- the max.open.files config is an internal restriction that tells rocksdb how many open files >>> it is allowed to have, so if that's set to -1 (infinite) it won't ever try >>> to limit its open files and you may hit the OS limit. 
Think of it this way: if you have 100 rocksdb instances and a OS limit of 500, you should set max.open.files to 5 to avoid hitting this limit (assuming there are no other open files on the system, in reality you'd want some extra room there) On Tue, Jul 2, 2019 at 7:53 PM emailtokir...@gmail.com < emailtokir...@gmail.com> wrote: > > >> On 2019/06/28 23:29:16, John Roesler wrote: >> Hey all, >> >> If you want to figure it out theoretically, if you print out the >> topology description, you'll have some number of state stores listed >> in there. The number of Rocks instances should just be >> (#global_state_stores + >> sum(#partitions_of_topic_per_local_state_store)) . The number of >> stream threads isn't relevant here. >> >> You can also figure it out empirically: the first level of >> subdirectories in the state dir are Tasks, and then within that, the >> next level is Stores. You should see the store directory names match >> up with the stores listed in the topology description. The number of >> Store directories is exactly the number of RocksDB instances you >>> have. >> >> There are also metrics corresponding to each of the state stores, so >> you can compute it from what you find in the metrics. >> >> Hope that helps, >> -john >> >> On Thu, Jun 27, 2019 at 6:46 AM Patrik Kleindl > wrote: >>> >>> Hi Kiran >>> Without much research my guess would be "num_stream_threads * >>> (#global_state_stores + > sum(#partitions_of_topic_per_local_state_store))" >>> So 10 stores (regardless if explicitly defined or implicitely >>> because > of >>> some stateful operation) with 10 partitions each should result in >>> 100 >>> Rocksdb instances if you are running at the default of > num_stream_threads=1. >>> >>> As I wrote before, start with 100. >>> If the error persists, half the number, if not, double it ;-) >>> Repeat as >>> needed. 
>>> >>> If you reach the single-digit-range and the error still shows up, >>> start >>> searching for any iterators over a store you might not have closed. >>> >>> br, Patrik >>> >>> On Thu, 27 Jun 2019 at 13:11, emailtokir...@gmail.com < >>> emailtokir...@gmail.com> wrote: >>> On 2019/06/27 09:02:39, Patrik Kleindl >>> wrote: > Hello Kiran > > First, the value for maxOpenFiles is per RocksDB instance, and >>> the > number > of those can get high if you have a lot of topic partitions >>> etc. > Check the directory (state dir) to see how many there are. > Start with a low value (100) and see if that has some effect. > > Second, because I just found out, you should use > BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) > options.tableFormatConfig(); > tableConfig.setBlockCacheSize(100*1024*1024L); > tableConfig.setBlockSize(8*1024L); > instead of creating a new object to prevent accidently messing >>> u
Re: Does anyone fixed Producer TimeoutException problem ?
Yes, I tried with the console producer and it works fine. But that is a plain message, i.e. a String; I have to send a 20-field JSON as the message.
Re: Does anyone fixed Producer TimeoutException problem ?
It depends on the use case. Look at linger.ms (10 ms), so the added latency is 10 ms.

`Received invalid metadata error in produce request on partition inbound_topic-1 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now`

For the error above, check your broker logs.

My question remains the same: did you try to produce records using the console producer in your setup?

On Wed, Jul 3, 2019 at 1:55 PM Shyam P wrote: > thank you. > Senthil , > Why batch.size=65536 & retries=100, its big number right ? do we need > this much for streaming applications ? > > Regards, > Shyam > > On Wed, Jul 3, 2019 at 1:30 PM SenthilKumar K > wrote: > >> `*Partition = -1` - *This explains why are you getting timeout error. >> >> Why dont you use Default Partitioner ?: >> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java >> >> Try only with below producer properties : >> bootstrap.servers=<<>>> >> acks =1 >> retries=100 >> batch.size=65536 >> linger.ms=10 >> key.serializer=<<>> >> value.serializer=<<>>> >> compression.type=<> >> buffer.memory=104857600 >> >> --Senthil >> >> >> On Wed, Jul 3, 2019 at 1:18 PM Shyam P wrote: >> >>> Hi SenthilKumar, >>> thanks a lot . 
>>> >>> Yeah I set up local set up and print the log with partition info which >>> shows as below >>> >>> 2019-07-03 02:48:28.822 INFO 7092 --- [lt-dispatcher-2] >>> c.s.c.p.p.CompanyInfoPartitioner : Topic : inbound_topic Key = 597736248- >>> Entropy Cayman Solar Ltd.-null-null-null *Partition = -1* 2019-07-03 >>> 02:48:28.931 ERROR 7092 --- [ad | producer-1] >>> o.s.k.support.LoggingProducerListener : Exception thrown when sending a >>> message with key='597736248- Entropy Cayman Solar Ltd.-null-null-null' and >>> payload='com.spgmi.ca.prescore.model.Company@8b12343' to topic >>> inbound_topic : >>> >>> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) >>> for inbound_topic --1: 104 ms has passed since batch creation plus linger >>> time >>> >>> My topics inbound_topic has two partitions as you see below >>> C:\Software\kafka\kafka_2.11-2.1.1\bin\windows>kafka-topics.bat --describe >>> --zookeeper localhost:2181 --topic inbound_topic Topic:inbound_topic >>> PartitionCount:2 ReplicationFactor:1 Configs: Topic: inbound_topic >>> Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: inbound_topic Partition: 1 >>> Leader: 0 Replicas: 0 Isr: 0 >>> >>> *But my producer seems to trying to send to Partition = -1.* >>> >>> My partition logic is as below >>> >>> int p = (((String)key).hashCode() * Integer.MAX_VALUE) % numPartitions; >>> logger.info("Topic : "+ topic + "\t Key = " + (String)key + " >>> Partition = " + p ); >>> >>> On key i am doing hashCode(). What need to be corrected here to avoid >>> this negative number partition number ? i.e. Partition = -1 >>> >>> What should be my partition key logic like ? >>> >>> any help highly appreciated. >>> Regards, >>> Shyam >>> >>> On Tue, Jul 2, 2019 at 8:48 PM SenthilKumar K >>> wrote: >>> Does it happen to all partitions or only few partitions ? Can you make sure your local setup working fine ? Were you able to produce using console-producer ? 
Example : EVERE: Expiring 7 record(s) for topic-9{partition:9}: 30022 ms has passed since last append Expiring 9 record(s) for topic-2{partition:2}: 30015 ms has passed since batch creation plus linger time --Senthil On Tue, Jul 2, 2019 at 5:34 PM Shyam P wrote: > Thanks a lot Senthil for quick reply. > I am using kafka_2.11-2.1.1 . > In your case Kafka Producer Client in One DataCenter and Kafka Broker > in other DataCenter but in my case I installed Kafka on the same machine > where Producer is running. > i.e. currently I am in development mode , so everything now on my > local for timebeing ...i.e. Kafka broker , zk and my producer code in > eclipse. > > If is is a set up issue at least it should run fine in my local right. > I tried several producer configurations combinations as explained in > the SOF link. > > So not sure now what is the issue and how to fix it ? > > Is in your case the issue fixed ? > > Regards, > Shyam > > On Tue, Jul 2, 2019 at 5:12 PM SenthilKumar K > wrote: > >> Hi Shyam, We also faced `TimeoutException: Expiring 1 record(s)` >> issue in >> our Kafka Producer Client. As described here >> < >> https://stackoverflow.com/questions/56807188/how-to-fix-kafka-common-errors-timeoutexception-expiring-1-records-xxx-ms-has >> > >> , >> first we tried increasing request timeout but that didn't help. We >> had >> setup like Kafka Producer Client in One DataCenter and Kafka Broker in >> other DataCenter & thats why the producer failed to push records to >> brokers >> on time due to ne
Re: Does anyone fixed Producer TimeoutException problem ?
Thank you, Senthil.
Why batch.size=65536 & retries=100? Those are big numbers, right? Do we need this much for streaming applications?

Regards,
Shyam

On Wed, Jul 3, 2019 at 1:30 PM SenthilKumar K wrote: > `*Partition = -1` - *This explains why are you getting timeout error. > > Why dont you use Default Partitioner ?: > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java > > Try only with below producer properties : > bootstrap.servers=<<>>> > acks =1 > retries=100 > batch.size=65536 > linger.ms=10 > key.serializer=<<>> > value.serializer=<<>>> > compression.type=<> > buffer.memory=104857600 > > --Senthil > > > On Wed, Jul 3, 2019 at 1:18 PM Shyam P wrote: > >> Hi SenthilKumar, >> thanks a lot . >> >> Yeah I set up local set up and print the log with partition info which >> shows as below >> >> 2019-07-03 02:48:28.822 INFO 7092 --- [lt-dispatcher-2] >> c.s.c.p.p.CompanyInfoPartitioner : Topic : inbound_topic Key = 597736248- >> Entropy Cayman Solar Ltd.-null-null-null *Partition = -1* 2019-07-03 >> 02:48:28.931 ERROR 7092 --- [ad | producer-1] >> o.s.k.support.LoggingProducerListener : Exception thrown when sending a >> message with key='597736248- Entropy Cayman Solar Ltd.-null-null-null' and >> payload='com.spgmi.ca.prescore.model.Company@8b12343' to topic >> inbound_topic : >> >> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for >> inbound_topic --1: 104 ms has passed since batch creation plus linger time >> >> My topics inbound_topic has two partitions as you see below >> C:\Software\kafka\kafka_2.11-2.1.1\bin\windows>kafka-topics.bat --describe >> --zookeeper localhost:2181 --topic inbound_topic Topic:inbound_topic >> PartitionCount:2 ReplicationFactor:1 Configs: Topic: inbound_topic >> Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: inbound_topic Partition: 1 >> Leader: 0 Replicas: 0 Isr: 0 >> >> *But my producer seems to trying to send to Partition = -1.* >> 
>> My partition logic is as below >> >> int p = (((String)key).hashCode() * Integer.MAX_VALUE) % numPartitions; >> logger.info("Topic : "+ topic + "\t Key = " + (String)key + " >> Partition = " + p ); >> >> On key i am doing hashCode(). What need to be corrected here to avoid >> this negative number partition number ? i.e. Partition = -1 >> >> What should be my partition key logic like ? >> >> any help highly appreciated. >> Regards, >> Shyam >> >> On Tue, Jul 2, 2019 at 8:48 PM SenthilKumar K >> wrote: >> >>> Does it happen to all partitions or only few partitions ? Can you make >>> sure your local setup working fine ? Were you able to produce using >>> console-producer ? >>> >>> Example : >>> EVERE: Expiring 7 record(s) for topic-9{partition:9}: 30022 ms has >>> passed since last append >>> Expiring 9 record(s) for topic-2{partition:2}: 30015 ms has passed >>> since batch creation plus linger time >>> >>> --Senthil >>> >>> On Tue, Jul 2, 2019 at 5:34 PM Shyam P wrote: >>> Thanks a lot Senthil for quick reply. I am using kafka_2.11-2.1.1 . In your case Kafka Producer Client in One DataCenter and Kafka Broker in other DataCenter but in my case I installed Kafka on the same machine where Producer is running. i.e. currently I am in development mode , so everything now on my local for timebeing ...i.e. Kafka broker , zk and my producer code in eclipse. If is is a set up issue at least it should run fine in my local right. I tried several producer configurations combinations as explained in the SOF link. So not sure now what is the issue and how to fix it ? Is in your case the issue fixed ? Regards, Shyam On Tue, Jul 2, 2019 at 5:12 PM SenthilKumar K wrote: > Hi Shyam, We also faced `TimeoutException: Expiring 1 record(s)` issue > in > our Kafka Producer Client. 
As described here > < > https://stackoverflow.com/questions/56807188/how-to-fix-kafka-common-errors-timeoutexception-expiring-1-records-xxx-ms-has > > > , > first we tried increasing request timeout but that didn't help. We had > setup like Kafka Producer Client in One DataCenter and Kafka Broker in > other DataCenter & thats why the producer failed to push records to > brokers > on time due to network issue. In your case , Could be setup issue ? > > --Senthil > > On Tue, Jul 2, 2019 at 3:57 PM Shyam P > wrote: > > > Hi, > > I am facing the below issue. > > > > org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) > > for 229 ms has passed since batch creation plus linger > > time > > > > > > I tried many producer configuration settings. more details below : > > > > > https://stackoverflow.com/questions/56807188/how-to-fix-kafka-common-errors-timeoutexception-expiring-1-records-xxx-ms-has > > > > But nothing working. >>
Re: Does anyone fixed Producer TimeoutException problem ?
Hi,
I fixed the earlier issue by returning a valid partition number. Now I am facing a different error:

2019-07-03 04:16:44.334 WARN 1524 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender : [Producer clientId=producer-1] Received invalid metadata error in produce request on partition inbound_topic-1 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
2019-07-03 04:16:44.556 ERROR 1524 --- [ad | producer-1] o.s.k.support.LoggingProducerListener: Exception thrown when sending a message with key='378513965-4Energia S.r.l.-Milan-MI-20124' and payload='com.spgmi.ca.prescore.model.Company@6bd123de' to topic inbound_topic:
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for inbound_topic-1: 122 ms has passed since last attempt plus backoff time

I am using the settings below:

acks: 1
retries: 1
batchSize: 10
lingerMs: 5
bufferMemory: 33554432
requestTimeoutMs: 90

What should be corrected here to fix the above error?

On Wed, Jul 3, 2019 at 1:18 PM Shyam P wrote:
> Hi SenthilKumar,
> thanks a lot . 
> > Yeah I set up local set up and print the log with partition info which > shows as below > > 2019-07-03 02:48:28.822 INFO 7092 --- [lt-dispatcher-2] > c.s.c.p.p.CompanyInfoPartitioner : Topic : inbound_topic Key = 597736248- > Entropy Cayman Solar Ltd.-null-null-null *Partition = -1* 2019-07-03 > 02:48:28.931 ERROR 7092 --- [ad | producer-1] > o.s.k.support.LoggingProducerListener : Exception thrown when sending a > message with key='597736248- Entropy Cayman Solar Ltd.-null-null-null' and > payload='com.spgmi.ca.prescore.model.Company@8b12343' to topic > inbound_topic : > > org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for > inbound_topic --1: 104 ms has passed since batch creation plus linger time > > My topics inbound_topic has two partitions as you see below > C:\Software\kafka\kafka_2.11-2.1.1\bin\windows>kafka-topics.bat --describe > --zookeeper localhost:2181 --topic inbound_topic Topic:inbound_topic > PartitionCount:2 ReplicationFactor:1 Configs: Topic: inbound_topic > Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: inbound_topic Partition: 1 > Leader: 0 Replicas: 0 Isr: 0 > > *But my producer seems to trying to send to Partition = -1.* > > My partition logic is as below > > int p = (((String)key).hashCode() * Integer.MAX_VALUE) % numPartitions; > logger.info("Topic : "+ topic + "\t Key = " + (String)key + " > Partition = " + p ); > > On key i am doing hashCode(). What need to be corrected here to avoid this > negative number partition number ? i.e. Partition = -1 > > What should be my partition key logic like ? > > any help highly appreciated. > Regards, > Shyam > > On Tue, Jul 2, 2019 at 8:48 PM SenthilKumar K > wrote: > >> Does it happen to all partitions or only few partitions ? Can you make >> sure your local setup working fine ? Were you able to produce using >> console-producer ? 
>> >> Example : >> EVERE: Expiring 7 record(s) for topic-9{partition:9}: 30022 ms has passed >> since last append >> Expiring 9 record(s) for topic-2{partition:2}: 30015 ms has passed since >> batch creation plus linger time >> >> --Senthil >> >> On Tue, Jul 2, 2019 at 5:34 PM Shyam P wrote: >> >>> Thanks a lot Senthil for quick reply. >>> I am using kafka_2.11-2.1.1 . >>> In your case Kafka Producer Client in One DataCenter and Kafka Broker >>> in other DataCenter but in my case I installed Kafka on the same machine >>> where Producer is running. >>> i.e. currently I am in development mode , so everything now on my local >>> for timebeing ...i.e. Kafka broker , zk and my producer code in eclipse. >>> >>> If is is a set up issue at least it should run fine in my local right. >>> I tried several producer configurations combinations as explained in the >>> SOF link. >>> >>> So not sure now what is the issue and how to fix it ? >>> >>> Is in your case the issue fixed ? >>> >>> Regards, >>> Shyam >>> >>> On Tue, Jul 2, 2019 at 5:12 PM SenthilKumar K >>> wrote: >>> Hi Shyam, We also faced `TimeoutException: Expiring 1 record(s)` issue in our Kafka Producer Client. As described here < https://stackoverflow.com/questions/56807188/how-to-fix-kafka-common-errors-timeoutexception-expiring-1-records-xxx-ms-has > , first we tried increasing request timeout but that didn't help. We had setup like Kafka Producer Client in One DataCenter and Kafka Broker in other DataCenter & thats why the producer failed to push records to brokers on time due to network issue. In your case , Could be setup issue ? --Senthil On Tue, Jul 2, 2019 at 3:57 PM Shyam P wrote: > Hi, > I am facing the below issue. > > org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) > for 229 ms has passed since batch creation plus linger > time >
Re: Does anyone fixed Producer TimeoutException problem ?
`Partition = -1` - this explains why you are getting the timeout error.

Why don't you use the DefaultPartitioner?
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java

Try with only the producer properties below:

bootstrap.servers=<<>>>
acks=1
retries=100
batch.size=65536
linger.ms=10
key.serializer=<<>>
value.serializer=<<>>>
compression.type=<>
buffer.memory=104857600

--Senthil

On Wed, Jul 3, 2019 at 1:18 PM Shyam P wrote: > Hi SenthilKumar, > thanks a lot . > > Yeah I set up local set up and print the log with partition info which > shows as below > > 2019-07-03 02:48:28.822 INFO 7092 --- [lt-dispatcher-2] > c.s.c.p.p.CompanyInfoPartitioner : Topic : inbound_topic Key = 597736248- > Entropy Cayman Solar Ltd.-null-null-null *Partition = -1* 2019-07-03 > 02:48:28.931 ERROR 7092 --- [ad | producer-1] > o.s.k.support.LoggingProducerListener : Exception thrown when sending a > message with key='597736248- Entropy Cayman Solar Ltd.-null-null-null' and > payload='com.spgmi.ca.prescore.model.Company@8b12343' to topic > inbound_topic : > > org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for > inbound_topic --1: 104 ms has passed since batch creation plus linger time > > My topics inbound_topic has two partitions as you see below > C:\Software\kafka\kafka_2.11-2.1.1\bin\windows>kafka-topics.bat --describe > --zookeeper localhost:2181 --topic inbound_topic Topic:inbound_topic > PartitionCount:2 ReplicationFactor:1 Configs: Topic: inbound_topic > Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: inbound_topic Partition: 1 > Leader: 0 Replicas: 0 Isr: 0 > > *But my producer seems to trying to send to Partition = -1.* > > My partition logic is as below > > int p = (((String)key).hashCode() * Integer.MAX_VALUE) % numPartitions; > logger.info("Topic : "+ topic + "\t Key = " + (String)key + " > Partition = " + p ); > > On key i am doing hashCode(). 
What need to be corrected here to avoid this > negative number partition number ? i.e. Partition = -1 > > What should be my partition key logic like ? > > any help highly appreciated. > Regards, > Shyam > > On Tue, Jul 2, 2019 at 8:48 PM SenthilKumar K > wrote: > >> Does it happen to all partitions or only few partitions ? Can you make >> sure your local setup working fine ? Were you able to produce using >> console-producer ? >> >> Example : >> EVERE: Expiring 7 record(s) for topic-9{partition:9}: 30022 ms has passed >> since last append >> Expiring 9 record(s) for topic-2{partition:2}: 30015 ms has passed since >> batch creation plus linger time >> >> --Senthil >> >> On Tue, Jul 2, 2019 at 5:34 PM Shyam P wrote: >> >>> Thanks a lot Senthil for quick reply. >>> I am using kafka_2.11-2.1.1 . >>> In your case Kafka Producer Client in One DataCenter and Kafka Broker >>> in other DataCenter but in my case I installed Kafka on the same machine >>> where Producer is running. >>> i.e. currently I am in development mode , so everything now on my local >>> for timebeing ...i.e. Kafka broker , zk and my producer code in eclipse. >>> >>> If is is a set up issue at least it should run fine in my local right. >>> I tried several producer configurations combinations as explained in the >>> SOF link. >>> >>> So not sure now what is the issue and how to fix it ? >>> >>> Is in your case the issue fixed ? >>> >>> Regards, >>> Shyam >>> >>> On Tue, Jul 2, 2019 at 5:12 PM SenthilKumar K >>> wrote: >>> Hi Shyam, We also faced `TimeoutException: Expiring 1 record(s)` issue in our Kafka Producer Client. As described here < https://stackoverflow.com/questions/56807188/how-to-fix-kafka-common-errors-timeoutexception-expiring-1-records-xxx-ms-has > , first we tried increasing request timeout but that didn't help. 
>>> Our setup had the Kafka producer client in one data center and the Kafka
>>> broker in another, and that's why the producer failed to push records to
>>> the brokers in time, due to a network issue. In your case, could it be a
>>> setup issue?
>>>
>>> --Senthil
>>>
>>> On Tue, Jul 2, 2019 at 3:57 PM Shyam P wrote:
>>>
>>> > Hi,
>>> > I am facing the below issue:
>>> >
>>> > org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s)
>>> > for 229 ms has passed since batch creation plus linger time
>>> >
>>> > I tried many producer configuration settings. More details below:
>>> >
>>> > https://stackoverflow.com/questions/56807188/how-to-fix-kafka-common-errors-timeoutexception-expiring-1-records-xxx-ms-has
>>> >
>>> > But nothing is working.
>>> >
>>> > Can anyone please help me: what is wrong here and how do I fix it?
>>> >
>>> > thanks,
>>> > Shyam
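The partition logic quoted in the thread above returns -1 because `hashCode() * Integer.MAX_VALUE` overflows `int`, and Java's `%` operator preserves the sign of the dividend, so the result can be negative. A minimal sketch of a sign-safe variant follows; the class and method names are illustrative, not from the thread, and masking the sign bit is similar in spirit to the `Utils.toPositive()` step inside Kafka's `DefaultPartitioner`:

```java
public class PartitionDemo {

    // Masking off the sign bit (& 0x7fffffff) guarantees a non-negative
    // hash, so the modulo result is always in [0, numPartitions).
    // (Math.abs would still fail for Integer.MIN_VALUE; the mask does not.)
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        String key = "597736248- Entropy Cayman Solar Ltd.-null-null-null";
        // Lands in {0, 1} for a two-partition topic, never -1
        System.out.println(partitionFor(key, 2));
    }
}
```

Note that for most use cases no custom partitioner is needed at all: with a non-null key, the default partitioner already produces a stable, non-negative partition.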
Re: Does anyone fixed Producer TimeoutException problem ?
Hi SenthilKumar,
thanks a lot.

Yeah, I set up a local setup and printed the log with partition info, which
shows as below:

2019-07-03 02:48:28.822 INFO 7092 --- [lt-dispatcher-2]
c.s.c.p.p.CompanyInfoPartitioner : Topic : inbound_topic Key = 597736248-
Entropy Cayman Solar Ltd.-null-null-null *Partition = -1*
2019-07-03 02:48:28.931 ERROR 7092 --- [ad | producer-1]
o.s.k.support.LoggingProducerListener : Exception thrown when sending a
message with key='597736248- Entropy Cayman Solar Ltd.-null-null-null' and
payload='com.spgmi.ca.prescore.model.Company@8b12343' to topic
inbound_topic :

org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for
inbound_topic--1: 104 ms has passed since batch creation plus linger time

My topic inbound_topic has two partitions, as you see below:

C:\Software\kafka\kafka_2.11-2.1.1\bin\windows>kafka-topics.bat --describe --zookeeper localhost:2181 --topic inbound_topic
Topic:inbound_topic PartitionCount:2 ReplicationFactor:1 Configs:
Topic: inbound_topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: inbound_topic Partition: 1 Leader: 0 Replicas: 0 Isr: 0

*But my producer seems to be trying to send to Partition = -1.*

My partition logic is as below:

int p = (((String)key).hashCode() * Integer.MAX_VALUE) % numPartitions;
logger.info("Topic : " + topic + "\t Key = " + (String)key + " Partition = " + p);

On the key I am doing hashCode(). What needs to be corrected here to avoid
this negative partition number, i.e. Partition = -1?

What should my partition key logic look like?

Any help highly appreciated.
Regards,
Shyam

On Tue, Jul 2, 2019 at 8:48 PM SenthilKumar K wrote:

> Does it happen to all partitions or only a few partitions? Can you make
> sure your local setup is working fine? Were you able to produce using the
> console-producer?
>
> Example:
> SEVERE: Expiring 7 record(s) for topic-9{partition:9}: 30022 ms has passed
> since last append
> Expiring 9 record(s) for topic-2{partition:2}: 30015 ms has passed since
> batch creation plus linger time
>
> --Senthil
>
> On Tue, Jul 2, 2019 at 5:34 PM Shyam P wrote:
>
>> Thanks a lot, Senthil, for the quick reply.
>> I am using kafka_2.11-2.1.1.
>> In your case the Kafka producer client is in one data center and the
>> Kafka broker in another, but in my case I installed Kafka on the same
>> machine where the producer is running.
>> I.e. currently I am in development mode, so everything is on my local
>> machine for the time being: Kafka broker, ZooKeeper, and my producer
>> code in Eclipse.
>>
>> If it is a setup issue, at least it should run fine on my local, right?
>> I tried several producer configuration combinations as explained in the
>> SOF link.
>>
>> So I am not sure now what the issue is and how to fix it.
>>
>> Is the issue fixed in your case?
>>
>> Regards,
>> Shyam
>>
>> On Tue, Jul 2, 2019 at 5:12 PM SenthilKumar K wrote:
>>
>>> Hi Shyam, we also faced the `TimeoutException: Expiring 1 record(s)`
>>> issue in our Kafka producer client. As described here
>>> <https://stackoverflow.com/questions/56807188/how-to-fix-kafka-common-errors-timeoutexception-expiring-1-records-xxx-ms-has>,
>>> we first tried increasing the request timeout, but that didn't help. Our
>>> setup had the Kafka producer client in one data center and the Kafka
>>> broker in another, and that's why the producer failed to push records to
>>> the brokers in time, due to a network issue. In your case, could it be a
>>> setup issue?
>>>
>>> --Senthil
>>>
>>> On Tue, Jul 2, 2019 at 3:57 PM Shyam P wrote:
>>>
>>> > Hi,
>>> > I am facing the below issue:
>>> >
>>> > org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s)
>>> > for 229 ms has passed since batch creation plus linger time
>>> >
>>> > I tried many producer configuration settings.
>>> > More details below:
>>> >
>>> > https://stackoverflow.com/questions/56807188/how-to-fix-kafka-common-errors-timeoutexception-expiring-1-records-xxx-ms-has
>>> >
>>> > But nothing is working.
>>> >
>>> > Can anyone please help me: what is wrong here and how do I fix it?
>>> >
>>> > thanks,
>>> > Shyam
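The minimal producer configuration Senthil suggests earlier in the thread can be sketched as a Java `Properties` object. The broker address and the `StringSerializer` classes below are placeholders (the thread elides them as `<<>>>`), so substitute your own values; `compression.type` is likewise omitted here because the thread leaves it unspecified:

```java
import java.util.Properties;

public class ProducerProps {

    // Builds the baseline settings suggested in the thread; these would be
    // passed to new KafkaProducer<>(props) in a real client.
    static Properties baseProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("acks", "1");
        props.put("retries", "100");
        props.put("batch.size", "65536");
        props.put("linger.ms", "10");
        props.put("buffer.memory", "104857600");
        // Placeholder serializers -- match these to your key/value types
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(baseProps().getProperty("batch.size"));
    }
}
```

With `linger.ms=10`, the "batch creation plus linger time" clock in the timeout message includes that 10 ms wait, so a record expiring after ~104 ms still points to the send path stalling, not the linger itself.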
Re: Kafka streams (2.1.1) - org.rocksdb.RocksDBException:Too many open files
Tried setting the open files to 100 and 50, but the results are the same. I
checked the total open files while the streaming application was busy
running: just before getting the "too many open files" message it was around
41,756, which is the same as what we got when we set it to -1. VisualVM
shows no abnormality with the threads, memory, or heap.

Thanks
Thameem

> On Jul 3, 2019, at 11:50 AM, Sophie Blee-Goldman wrote:
>
> How sure are you that the open file count never goes beyond 50K? Are those
> numbers just from a snapshot after it crashed? It's possible rocks is
> creating a large number of files just for a short period of time (maybe
> while compacting) that causes the open file count to spike and go back
> down.
>
> For things to try, you should set the rocks config max.open.files to
> something less than infinity... if your OS limit is 1 million and you have
> (rounding up) ~5k rocks instances, set this to 1 million / 5k = 200. If
> you set a lower limit and still hit this error, we can go from there.
>
> On Tue, Jul 2, 2019 at 11:10 PM emailtokir...@gmail.com <
> emailtokir...@gmail.com> wrote:
>
>> On 2019/07/03 05:46:45, Sophie Blee-Goldman wrote:
>>> It sounds like rocksdb *is* honoring your configs -- the max.open.files
>>> config is an internal restriction that tells rocksdb how many open files
>>> it is allowed to have, so if that's set to -1 (infinite) it won't ever
>>> try to limit its open files and you may hit the OS limit.
>>>
>>> Think of it this way: if you have 100 rocksdb instances and an OS limit
>>> of 500, you should set max.open.files to 5 to avoid hitting this limit
>>> (assuming there are no other open files on the system; in reality you'd
>>> want some extra room there).
>>>
>>> On Tue, Jul 2, 2019 at 7:53 PM emailtokir...@gmail.com <
>>> emailtokir...@gmail.com> wrote:
>>>
>>> On 2019/06/28 23:29:16, John Roesler wrote:
>>> > Hey all,
>>> >
>>> > If you want to figure it out theoretically, if you print out the
>>> > topology description, you'll have some number of state stores listed
>>> > in there. The number of Rocks instances should just be
>>> > (#global_state_stores +
>>> > sum(#partitions_of_topic_per_local_state_store)). The number of
>>> > stream threads isn't relevant here.
>>> >
>>> > You can also figure it out empirically: the first level of
>>> > subdirectories in the state dir are Tasks, and then within that, the
>>> > next level is Stores. You should see the store directory names match
>>> > up with the stores listed in the topology description. The number of
>>> > Store directories is exactly the number of RocksDB instances you have.
>>> >
>>> > There are also metrics corresponding to each of the state stores, so
>>> > you can compute it from what you find in the metrics.
>>> >
>>> > Hope that helps,
>>> > -john
>>> >
>>> > On Thu, Jun 27, 2019 at 6:46 AM Patrik Kleindl wrote:
>>> >>
>>> >> Hi Kiran
>>> >> Without much research my guess would be "num_stream_threads *
>>> >> (#global_state_stores +
>>> >> sum(#partitions_of_topic_per_local_state_store))"
>>> >> So 10 stores (regardless if explicitly defined or implicitly because
>>> >> of some stateful operation) with 10 partitions each should result in
>>> >> 100 RocksDB instances if you are running at the default of
>>> >> num_stream_threads=1.
>>> >>
>>> >> As I wrote before, start with 100.
>>> >> If the error persists, halve the number; if not, double it ;-)
>>> >> Repeat as needed.
>>> >>
>>> >> If you reach the single-digit range and the error still shows up,
>>> >> start searching for any iterators over a store you might not have
>>> >> closed.
>>> >>
>>> >> br, Patrik
>>> >>
>>> >> On Thu, 27 Jun 2019 at 13:11, emailtokir...@gmail.com <
>>> >> emailtokir...@gmail.com> wrote:
>>> >>
>>> >>> On 2019/06/27 09:02:39, Patrik Kleindl wrote:
>>> >>> Hello Kiran
>>> >>> First, the value for maxOpenFiles is per RocksDB instance, and the
>>> >>> number of those can get high if you have a lot of topic partitions
>>> >>> etc. Check the directory (state dir) to see how many there are.
>>> >>> Start with a low value (100) and see if that has some effect.
>>> >>> Second, because I just found out, you should use
>>> >>>
>>> >>> BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
>>> >>> tableConfig.setBlockCacheSize(100*1024*1024L);
>>> >>> tableConfig.setBlockSize(8*1024L);
>>> >>>
>>> >>> instead of creating a new object, to prevent accidentally messing up
>>> >>> references.
>>> >>> Hope that helps
>>> >>> best regards
>>> >>> Patrik
>>> >>>
>>> >>> On Thu, 27 Jun 2019 at 10:46, emailtokir...@gmail.com <
>>> >>> emailtokir...@gmail.com> wrote:
>>> >>> >
>>> >>> > On 2019/06/26 21:58:02, Patrik Kleindl wrote:
>>> >>> >> Hi Kiran
>>> >>> >> You can use the RocksDBConfigSetter and pass >
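Sophie's sizing rule above (divide the OS open-file limit across all RocksDB instances, where the instance count is #global_state_stores + sum(#partitions_of_topic_per_local_state_store)) can be sketched as plain arithmetic. The resulting value is what you would hand to `options.setMaxOpenFiles(...)` inside the `RocksDBConfigSetter` mentioned in the thread; the helper name below is illustrative, not an actual Kafka Streams API:

```java
public class RocksOpenFilesBudget {

    // Split the OS open-file limit evenly across all RocksDB instances.
    // In practice you would reserve extra headroom for the process's other
    // file descriptors (sockets, logs, segment files), as noted in the thread.
    static int maxOpenFilesPerInstance(long osLimit, int rocksInstances) {
        return (int) (osLimit / rocksInstances);
    }

    public static void main(String[] args) {
        // ~5k instances against a 1 million ulimit => 200 per instance,
        // the figure Sophie quotes above
        System.out.println(maxOpenFilesPerInstance(1_000_000L, 5_000));
    }
}
```

The same arithmetic reproduces the smaller example from the thread: 100 instances against an OS limit of 500 gives a max.open.files of 5 per instance.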