The OOME may be caused by
org.apache.kafka.clients.producer.internals.ErrorLoggingCallback holding an
unnecessary byte[] reference to the record value. Can you apply the patch in
the JIRA below and try again?

https://issues.apache.org/jira/browse/KAFKA-2281
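
For context, as I understand it the idea behind the patch is to stop the
callback from keeping a reference to the whole record value when all it needs
for error logging is the size. A rough sketch of that pattern (class and field
names here are illustrative, not the actual Kafka internals):

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

// Illustrative only: capture the payload size up front instead of keeping a
// reference to the whole byte[] until the send completes, so large values can
// be garbage-collected earlier.
public class SizeOnlyErrorCallback implements Callback {
    private final String topic;
    private final int valueSizeBytes; // the size is enough for the error log

    public SizeOnlyErrorCallback(String topic, byte[] value) {
        this.topic = topic;
        this.valueSizeBytes = (value == null) ? 0 : value.length;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            System.err.println("Error sending a " + valueSizeBytes
                    + " byte message to topic " + topic + ": " + exception);
        }
    }
}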


On Wed, 15 Jul 2015 at 06:42 francesco vigotti <vigotti.france...@gmail.com>
wrote:

> I haven't explained why only 10 partitions; it comes down to a few things:
> more partitions don't speed up the producer, I already have this
> memory-monitoring problem, and I have no problems on the consumer side at
> the moment (10 should be enough, even if I haven't fully tested it yet).
> Since my planned solution is the per-partition queues described below, 10
> seemed a fair number to me...
> I'm also considering setting up a kind of producer-proxy instance:
> something that a producer could push a batch of messages to when it sees it
> has communication problems with a leader, and that would forward the batch
> on its behalf. This could help mitigate producer latency during such
> networking problems.. there isn't something like this yet, right? An option
> in the producer to use a non-leader broker as a proxy for the
> partition-leader broker would be optimal :) maybe with
> producer-to-single-broker latency stats :)
>
> Thanks again and sorry for the verbosity :)
>
> On Tue, Jul 14, 2015 at 10:14 PM, francesco vigotti <
> vigotti.france...@gmail.com> wrote:
>
> > Hi,
> > I'm playing with the new Kafka producer (Kafka 0.8.2.1) to see if it
> > could fit my use case.
> > I'll probably end up with a Kafka cluster of 5 nodes spread across
> > multiple datacenters, with one topic, a replication factor of 2, and at
> > least 10 partitions, required for consumer performance (I'll explain why
> > only 10 later..).
> > My producers are 10 or more, also distributed across multiple datacenters
> > around the globe, each producing about 2k messages/sec with a message
> > size of about 500 bytes per message.
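> > (For scale, that is roughly 2,000 msg/s x 500 bytes, about 1 MB/s of raw
> > payload per producer, or around 10 MB/s aggregate across the 10
> > producers, before compression.)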
> >
> > During my tests I have seen that when there is a network issue between
> > one of the producers and one of the partition leaders, the producer
> > starts to accumulate far more data than the configured buffer.memory
> > (about 40 MB), which crashes the JVM because the garbage collector cannot
> > free enough space for the application to keep working (the JVM heap is
> > about 2 GB, always at around 1 GB usage except when this problem occurs).
> > What I see is about 500 MB used by char[].
> >
> > During these events the buffer usage does increase from about 5 MB to
> > about 30 MB, and batch sends reach their maximum size, which helps speed
> > up transmission, but that seems not to be enough for my memory usage /
> > performance / resilience requirements.
> >
> >
> >
> > kafka.producer.acks=1
> > kafka.producer.buffer.memory=40000000
> > kafka.producer.compression.type=gzip
> > kafka.producer.retries=1
> > kafka.producer.batch.size=10000
> > kafka.producer.max.request.size=10000000
> > kafka.producer.send.buffer.bytes=1000000
> > kafka.producer.timeout.ms=10000
> > kafka.producer.reconnect.backoff.ms=500
> > kafka.producer.retry.backoff.ms=500
> > kafka.producer.block.on.buffer.full=false
> > kafka.producer.linger.ms=1000
> > kafka.producer.maxinflight=2
> > kafka.producer.serializer.class=kafka.serializer.DefaultEncoder
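> >
> > For reference, a minimal sketch of how I hand these settings to the new
> > producer, assuming my kafka.producer. prefix is stripped first; the
> > broker list is a placeholder, I'm assuming my maxinflight setting maps to
> > max.in.flight.requests.per.connection, and the new producer expects
> > key.serializer/value.serializer rather than serializer.class:
> >
> > import java.util.Properties;
> > import org.apache.kafka.clients.producer.KafkaProducer;
> >
> > public class ProducerSetupSketch {
> >     public static void main(String[] args) {
> >         Properties props = new Properties();
> >         // placeholder broker list, not my real hosts
> >         props.put("bootstrap.servers", "broker1:9092,broker2:9092");
> >         props.put("acks", "1");
> >         props.put("buffer.memory", "40000000");
> >         props.put("compression.type", "gzip");
> >         props.put("retries", "1");
> >         props.put("batch.size", "10000");
> >         props.put("max.request.size", "10000000");
> >         props.put("send.buffer.bytes", "1000000");
> >         props.put("timeout.ms", "10000");
> >         props.put("reconnect.backoff.ms", "500");
> >         props.put("retry.backoff.ms", "500");
> >         props.put("block.on.buffer.full", "false");
> >         props.put("linger.ms", "1000");
> >         // assuming my "maxinflight" setting maps to this producer config
> >         props.put("max.in.flight.requests.per.connection", "2");
> >         // the new producer takes serializers instead of serializer.class
> >         props.put("key.serializer",
> >                 "org.apache.kafka.common.serialization.ByteArraySerializer");
> >         props.put("value.serializer",
> >                 "org.apache.kafka.common.serialization.ByteArraySerializer");
> >
> >         KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
> >         producer.close();
> >     }
> > }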
> >
> > The main issue, anyway, is that when one producer gets connection-path
> > issues to one broker (packet loss / connection slowdown, which doesn't
> > mean the broker is down: other producers can still reach it, it's just a
> > networking problem between the two that can last an hour during peak time
> > on some routing paths), it crashes the whole application due to JVM RAM
> > usage.
> >
> > I have found no solution by playing with the available params.
> >
> > Now, reading around, I have come to understand this description of how
> > the producer works internally: the producer takes messages and returns an
> > async future (though on the first connection it blocks before returning
> > the future, due to metadata fetching, but that's another strange story :)
> > ), then each message gets queued in one queue per partition; then, in a
> > random order but sequentially, the brokers are contacted and all the
> > queues for the partitions assigned to that broker are sent, waiting for
> > the end of the transmission before proceeding to the next broker. But if
> > the transmission to one broker hangs or slows down, it stalls everything
> > (the other queues grow), and I don't understand where the buffer memory
> > is used in this whole process, because it seems to me that something else
> > is using memory when this happens.
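> >
> > To make the async part concrete, a minimal sketch of how I send (topic
> > name and broker are placeholders); calling partitionsFor() first is my
> > assumption of a way to pull the metadata up front so the first send()
> > doesn't block on it:
> >
> > import java.util.Properties;
> > import org.apache.kafka.clients.producer.Callback;
> > import org.apache.kafka.clients.producer.KafkaProducer;
> > import org.apache.kafka.clients.producer.ProducerRecord;
> > import org.apache.kafka.clients.producer.RecordMetadata;
> >
> > public class AsyncSendSketch {
> >     public static void main(String[] args) {
> >         Properties props = new Properties();
> >         props.put("bootstrap.servers", "broker1:9092"); // placeholder
> >         props.put("key.serializer",
> >                 "org.apache.kafka.common.serialization.ByteArraySerializer");
> >         props.put("value.serializer",
> >                 "org.apache.kafka.common.serialization.ByteArraySerializer");
> >         KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
> >
> >         // warm up the metadata so the first send() doesn't block on it
> >         producer.partitionsFor("my-topic");
> >
> >         byte[] payload = new byte[500]; // ~500 byte message, as in my case
> >         producer.send(new ProducerRecord<byte[], byte[]>("my-topic", payload),
> >                 new Callback() {
> >                     public void onCompletion(RecordMetadata md, Exception e) {
> >                         if (e != null) {
> >                             System.err.println("send failed: " + e);
> >                         }
> >                     }
> >                 });
> >
> >         producer.close();
> >     }
> > }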
> >
> > *To get finer-grained control over the memory usage of the Kafka
> > producer:* a simple file-backed solution that watches the callbacks to
> > detect when the producer hangs, and stops handing more data to the Kafka
> > producer once I know there are more than N messages in flight, could be a
> > partial solution, i.e. it could solve the memory usage. But it's strange,
> > because for that the buffer should already be enough... Is the buffer
> > internally multiplied by the number of partitions or brokers? Or is there
> > something else I'm not considering that could use so much RAM (10x the
> > buffer size)?
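> >
> > A rough sketch of the gate I have in mind, with a placeholder threshold:
> > a permit is taken before each send() and released in the callback, and
> > when no permit is available the caller spills to the disk queue instead
> > of handing the producer more data:
> >
> > import java.util.concurrent.Semaphore;
> > import org.apache.kafka.clients.producer.Callback;
> > import org.apache.kafka.clients.producer.KafkaProducer;
> > import org.apache.kafka.clients.producer.ProducerRecord;
> > import org.apache.kafka.clients.producer.RecordMetadata;
> >
> > public class InFlightGate {
> >     private static final int MAX_IN_FLIGHT_MESSAGES = 50000; // placeholder N
> >     private final Semaphore permits = new Semaphore(MAX_IN_FLIGHT_MESSAGES);
> >     private final KafkaProducer<byte[], byte[]> producer;
> >
> >     public InFlightGate(KafkaProducer<byte[], byte[]> producer) {
> >         this.producer = producer;
> >     }
> >
> >     public boolean trySend(String topic, byte[] value) {
> >         if (!permits.tryAcquire()) {
> >             // too many messages in flight: caller spills to the disk queue
> >             return false;
> >         }
> >         producer.send(new ProducerRecord<byte[], byte[]>(topic, value),
> >                 new Callback() {
> >                     public void onCompletion(RecordMetadata md, Exception e) {
> >                         permits.release(); // one message settled, ok or not
> >                     }
> >                 });
> >         return true;
> >     }
> > }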
> >
> > But this solution would also slow down the whole producer, in its
> > transmissions to all partition leaders.
> >
> > So, with N being the number of partitions, I'm going to build something
> > like an N-threaded solution where each thread handles one queue for one
> > partition (handling one leader per thread would be optimal, but one queue
> > per partition is easier because it avoids having to deal with leader
> > assignment/re-election). Each thread has its own producer, which
> > obviously loses some of the batching benefit because batches for multiple
> > partitions are no longer aggregated per leader, but it will still batch
> > more than the sync producer. If the callback-tracked count of
> > in-transmission messages reaches some threshold, I start using local disk
> > storage as persistence for messages, so as not to give the producer too
> > many messages, and I use the disk FIFO as the source for the producer
> > until its size reaches 0 again (a sketch of the per-partition sender is
> > below)... In the end each thread will have its own file-backed queue when
> > things slow down, and will process everything from the in-RAM queues when
> > things run OK, but each producer will never be overloaded with messages.
> > Finer-grained control over producer memory usage could avoid the
> > callback-based in-flight message counter, but the disk queue is required
> > anyway to avoid message discards on buffer full, and multiple producers
> > are required so that one producer isn't slowed down when the problem is
> > only between that producer and one partition leader...
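> >
> > A minimal sketch of one of those per-partition sender threads (topic,
> > partition and the in-RAM queue are placeholders; the disk spill and the
> > in-flight gate above are left out to keep it short):
> >
> > import java.util.concurrent.BlockingQueue;
> > import org.apache.kafka.clients.producer.KafkaProducer;
> > import org.apache.kafka.clients.producer.ProducerRecord;
> >
> > // One sender thread per partition: drains its own in-RAM queue and pins
> > // every record to a fixed partition, so a slow leader only stalls the
> > // thread (and queue) that belongs to it.
> > public class PartitionSender implements Runnable {
> >     private final KafkaProducer<byte[], byte[]> producer; // one per thread
> >     private final String topic;
> >     private final int partition;
> >     private final BlockingQueue<byte[]> ramQueue;
> >
> >     public PartitionSender(KafkaProducer<byte[], byte[]> producer,
> >                            String topic, int partition,
> >                            BlockingQueue<byte[]> ramQueue) {
> >         this.producer = producer;
> >         this.topic = topic;
> >         this.partition = partition;
> >         this.ramQueue = ramQueue;
> >     }
> >
> >     public void run() {
> >         try {
> >             while (!Thread.currentThread().isInterrupted()) {
> >                 byte[] value = ramQueue.take();
> >                 // explicit partition: this producer only ever talks to
> >                 // the leader of this one partition for this topic
> >                 producer.send(new ProducerRecord<byte[], byte[]>(
> >                         topic, partition, null, value));
> >             }
> >         } catch (InterruptedException e) {
> >             Thread.currentThread().interrupt();
> >         }
> >     }
> > }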
> >
> > Does all this make sense?
> >
> > Thank you :),
> > Francesco Vigotti
> >
>
