The OOME may be caused by org.apache.kafka.clients.producer.internals.ErrorLoggingCallback holding an unnecessary byte[] value. Can you apply the patch in the JIRA below and try again?
https://issues.apache.org/jira/browse/KAFKA-2281
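
For reference, the gist of that patch (a rough sketch of the idea, not the exact committed code) is to stop the callback from holding on to the record's value byte[] for the lifetime of the in-flight request, keeping only what it needs for the error log, e.g. the value length:

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.RecordMetadata;

    // Sketch of the KAFKA-2281 idea: log send errors without pinning the
    // value byte[] in memory until the request completes.
    public class ErrorLoggingCallbackSketch implements Callback {
        private final String topic;
        private final int valueLength; // keep only the size, not the bytes

        public ErrorLoggingCallbackSketch(String topic, byte[] value) {
            this.topic = topic;
            this.valueLength = value == null ? -1 : value.length;
            // the value reference is deliberately not stored
        }

        @Override
        public void onCompletion(RecordMetadata metadata, Exception e) {
            if (e != null) {
                System.err.println("Error sending message to topic " + topic
                        + " with value size " + valueLength + " bytes: "
                        + e.getMessage());
            }
        }
    }

If every queued record's value were pinned by its callback (as before the patch), the retained memory could exceed buffer.memory by a large factor, which may explain usage well above the configured 40 MB.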
On Wed, 15 Jul 2015 at 06:42 francesco vigotti <vigotti.france...@gmail.com>
wrote:

> I've not explained why only 10 partitions: this is because more partitions
> don't speed up the producer, because of this memory-monitoring problem,
> because I have no problems on the consumer side at the moment (10 should
> be enough, even if I've not fully tested it yet), and because the solution
> would be these partition-separated queues, so 10 seemed a fair number to
> me...
> I'm also considering setting up a kind of producer proxy: something a
> producer could push a batch of messages to when it sees communication
> problems towards a leader, and which would then forward that batch on its
> behalf. This could help mitigate producer latency during such networking
> problems... there isn't something like this yet, right? An option in the
> producer to use a non-leader broker as a proxy for the partition-leader
> broker would be optimal :) maybe with producer-to-single-broker latency
> stats :)
>
> Thanks again, and sorry for the verbosity :)
>
> On Tue, Jul 14, 2015 at 10:14 PM, francesco vigotti <
> vigotti.france...@gmail.com> wrote:
>
> > Hi,
> > I'm playing with the new Kafka producer to see if it could fit my use
> > case, Kafka version 8.2.1.
> > I'll probably end up with a Kafka cluster of 5 nodes across multiple
> > datacenters, with one topic, a replication factor of 2, and at least 10
> > partitions required for consumer performance (I'll explain why only 10
> > later...).
> > My producers are 10 or more, also distributed across multiple
> > datacenters around the globe, each producing about 2k messages/sec;
> > message size is about 500 bytes per message.
> >
> > During my tests I have seen that when there is a network issue between
> > one of the producers and one of the partition leaders, the producer
> > starts to accumulate data far beyond the configured buffer.memory
> > (about 40 MB), which crashes the JVM because the garbage collector
> > cannot free enough space for the application to work (JVM memory is
> > about 2 GB, normally at about 1 GB usage except when this problem
> > occurs). What I see is about 500 MB used by char[].
> >
> > During these events the buffer usage does increase from about 5 MB to
> > about 30 MB, and batches reach their maximum size to help speed up
> > transmission, but that seems not to be enough for my memory usage /
> > performance / resilience requirements.
> >
> > kafka.producer.acks=1
> > kafka.producer.buffer.memory=40000000
> > kafka.producer.compression.type=gzip
> > kafka.producer.retries=1
> > kafka.producer.batch.size=10000
> > kafka.producer.max.request.size=10000000
> > kafka.producer.send.buffer.bytes=1000000
> > kafka.producer.timeout.ms=10000
> > kafka.producer.reconnect.backoff.ms=500
> > kafka.producer.retry.backoff.ms=500
> > kafka.producer.block.on.buffer.full=false
> > kafka.producer.linger.ms=1000
> > kafka.producer.maxinflight=2
> > kafka.producer.serializer.class=kafka.serializer.DefaultEncoder
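
For reference, those kafka.producer.*-prefixed settings (the prefix looks application-specific) map onto the new Java producer's configuration roughly as in the snippet below; the bootstrap servers and serializers are placeholders, since serializer.class belongs to the old producer and has no direct equivalent here:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ProducerConfigSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholder
            props.put("acks", "1");
            props.put("buffer.memory", "40000000");
            props.put("compression.type", "gzip");
            props.put("retries", "1");
            props.put("batch.size", "10000");
            props.put("max.request.size", "10000000");
            props.put("send.buffer.bytes", "1000000");
            props.put("timeout.ms", "10000");
            props.put("reconnect.backoff.ms", "500");
            props.put("retry.backoff.ms", "500");
            props.put("block.on.buffer.full", "false");
            props.put("linger.ms", "1000");
            props.put("max.in.flight.requests.per.connection", "2");
            // the new producer takes serializers instead of serializer.class
            props.put("key.serializer",
                    "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer",
                    "org.apache.kafka.common.serialization.ByteArraySerializer");

            KafkaProducer<byte[], byte[]> producer =
                    new KafkaProducer<byte[], byte[]>(props);
            producer.send(new ProducerRecord<byte[], byte[]>("my-topic",
                    "hello".getBytes()));
            producer.close();
        }
    }

Note that with block.on.buffer.full=false the producer throws BufferExhaustedException once buffer.memory is exhausted rather than blocking, so the application has to decide what to do with messages it cannot hand over.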
> > The main issue anyway is that when one producer gets connection-path
> > issues towards one broker (packet loss / connection slowdown), the whole
> > application crashes due to JVM RAM usage. That doesn't mean the broker
> > is down: other producers can still reach it, it's just a networking
> > problem between the two, which could last just an hour during peak time
> > on some routing paths. I have found no solution playing with the
> > available params.
> >
> > Now, reading around, I have come to this understanding of the producer's
> > internal workings: the producer takes messages and returns an async
> > future (though during the first connection it blocks before returning
> > the future due to metadata fetching, but that's another strange story
> > :) ), then each message is queued in one queue per partition; then, in a
> > random order but sequentially, brokers are contacted and all queues for
> > the partitions assigned to that broker are sent, waiting for the end of
> > the transmission before proceeding to the next broker. But if the
> > transmission to one broker hangs or slows down, everything slows down
> > (the other queues grow), and I don't understand where the buffer memory
> > is used in this whole process, because it seems to me that something
> > else is using memory when this happens.
> >
> > *To get finer-grained control over the Kafka producer's memory usage:*
> > a simple file-backed solution that watches the callbacks to detect when
> > the producer hangs, and stops handing the producer more data when I know
> > there are more than N messages in flight, could be a partial solution,
> > i.e. it could solve the memory usage. But it's strange, because for that
> > the buffer should be enough... is the buffer internally multiplied by
> > partitions or brokers? Or is there something else I'm not considering
> > that could use so much RAM (10x the buffer size)?
> >
> > This solution would, however, also slow down the whole producer in its
> > transmissions to all partition leaders.
> >
> > So, with N being the number of partitions, I'm going to build something
> > like an N-threaded solution where each thread handles one queue for one
> > partition (handling one leader per thread would be optimal, but to avoid
> > having to deal with leader assignment/re-election, one queue per
> > partition is easier). Each thread has its own producer, which obviously
> > gains less from batching because batches for multiple partitions are not
> > aggregated per leader, but it will still batch more than the sync
> > producer. If the callback-tracked count of in-transmission messages
> > reaches some threshold, I start using local disk storage as persistence
> > for messages, so as not to give the producer too many messages, and use
> > the disk FIFO as the source for the producer until its size reaches 0
> > again... In the end each thread will have its own file-backed queue when
> > it slows down, and will process everything from in-memory queues when
> > things run OK, but no producer will ever be overloaded with messages.
> > Finer-grained control over producer memory usage could make the
> > callback-based in-flight message counter unnecessary, but the disk queue
> > is required anyway to avoid message discards on buffer full, and
> > multiple producers are required so that one producer doesn't slow down
> > when the problem is only between that producer and one partition
> > leader...
> >
> > Does all this make sense?
> >
> > Thank you :),
> > Francesco Vigotti
> >
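
Regarding the callback-tracked count of in-transmission messages described above: a minimal sketch of that idea (hypothetical class and threshold names, using a Semaphore instead of an explicit counter, and leaving the disk-backed FIFO out for brevity) could look like this:

    import java.util.concurrent.Semaphore;
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    // Sketch: cap the number of messages handed to the producer but not yet
    // acknowledged, so application-side memory stays bounded even when one
    // partition leader becomes slow. Names are illustrative only.
    public class ThrottledSender {
        private static final int MAX_IN_FLIGHT = 5000; // the threshold N
        private final Semaphore inFlight = new Semaphore(MAX_IN_FLIGHT);
        private final KafkaProducer<byte[], byte[]> producer;

        public ThrottledSender(KafkaProducer<byte[], byte[]> producer) {
            this.producer = producer;
        }

        public boolean trySend(String topic, byte[] value) {
            // If no permit is available the caller should spill the message
            // to the local disk FIFO instead of queueing more data in memory.
            if (!inFlight.tryAcquire()) {
                return false;
            }
            producer.send(new ProducerRecord<byte[], byte[]>(topic, value),
                    new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata metadata,
                                                 Exception e) {
                            inFlight.release(); // ack or error frees the slot
                        }
                    });
            return true;
        }
    }

One such instance per partition-handling thread, each wrapping its own KafkaProducer, would match the N-threaded design described in the mail; the trade-off, as noted there, is losing cross-partition batching per leader.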