I took a quick look at this since I noticed the same problem when testing your code for the issues you filed. I think the cause is that you share all of your producers across a thread pool, while the snappy library uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated, several of them may be allocated from the same thread (e.g. one of your MyProducer classes calls Producer.send() on multiple producers from the same thread) and therefore use the same BufferRecycler. Eventually you hit the code in the stack trace, and if two producer send threads hit it concurrently, they improperly share the unsynchronized BufferRecycler.
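To make the sharing concrete, here is a minimal sketch of the pattern described above. FakeRecycler and FakeStream are hypothetical stand-ins, not the snappy-java classes; the only assumption carried over is that the recycler is looked up through a ThreadLocal when a stream is constructed and then stays attached to that stream.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a ThreadLocal-managed, unsynchronized buffer pool.
final class FakeRecycler {
    private static final ThreadLocal<FakeRecycler> PER_THREAD =
            ThreadLocal.withInitial(FakeRecycler::new);

    static FakeRecycler forCurrentThread() {
        return PER_THREAD.get();
    }
}

// Hypothetical stand-in for a compressed stream: it captures the recycler of
// whichever thread constructs it and keeps that reference afterwards.
final class FakeStream {
    final FakeRecycler recycler = FakeRecycler.forCurrentThread();
}

class SharedRecyclerDemo {
    // Allocates a stream on a separate thread and hands it back to the caller.
    static FakeStream allocateOnOtherThread() {
        ExecutorService other = Executors.newSingleThreadExecutor();
        try {
            Callable<FakeStream> task = FakeStream::new;
            return other.submit(task).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            other.shutdown();
        }
    }

    public static void main(String[] args) {
        FakeStream a = new FakeStream();          // allocated on the main thread
        FakeStream b = new FakeStream();          // allocated on the main thread
        FakeStream c = allocateOnOtherThread();   // allocated elsewhere

        System.out.println("same allocating thread shares recycler: "
                + (a.recycler == b.recycler));
        System.out.println("different allocating thread shares recycler: "
                + (a.recycler == c.recycler));
    }
}
```

The first line prints true and the second prints false. If streams a and b are later handed to two different send threads, both threads end up operating on the one recycler that the allocating thread owned, with nothing synchronizing them.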
This seems like a pain to fix. It's really a deficiency of the snappy library, and as far as I can see there's no external control over BufferRecycler in their API. One possibility is to record the thread ID when we generate a new stream in Compressor and use that to synchronize access, so that no two threads touch the same BufferRecycler concurrently. That could be made specific to snappy so it wouldn't impact other codecs. Not exactly ideal, but it would work. Unfortunately I can't think of any way for you to protect against this in your own code, since the problem arises in the producer send thread, which your code should never know about.

Another option would be to set up your producers differently to avoid the possibility of unsynchronized access from multiple threads (i.e. don't use the same thread pool approach), but whether you can do that will depend on your use case.

-Ewen

On Mon, Oct 20, 2014, at 12:19 PM, Bhavesh Mistry wrote:
> Hi Kafka Dev,
>
> I am getting following issue with Snappy Library. I checked code for
> Snappy lib it seems to be fine. Have you guys seen this issue ?
>
> 2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR
> org.apache.kafka.clients.producer.internals.Sender - Uncaught error in
> kafka producer I/O thread:
> java.lang.NullPointerException
>     at org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153)
>     at org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317)
>     at java.io.FilterOutputStream.close(FilterOutputStream.java:160)
>     at org.apache.kafka.common.record.Compressor.close(Compressor.java:94)
>     at org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119)
>     at org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
>     at java.lang.Thread.run(Thread.java:744)
>
> Here is code for Snappy
> http://grepcode.com/file/repo1.maven.org/maven2/org.xerial.snappy/snappy-java/1.1.1.3/org/xerial/snappy/BufferRecycler.java#153
> :
>
> 153  if (inputBuffer == null || (buffer != null && buffer.length > inputBuffer.length)) {
>
> Thanks,
>
> Bhavesh
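
For reference, the "record the thread ID and synchronize on it" idea from the reply could look roughly like the sketch below. This is only an illustration under stated assumptions: CreatorLockedStream, its wrap() factory, and the codecFactory parameter are hypothetical names, not Kafka or snappy-java API, and the demo passes an identity function where a real caller would construct the snappy stream.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical wrapper; not part of Kafka or snappy-java.
final class CreatorLockedStream extends FilterOutputStream {
    // One lock per allocating thread ID, mirroring one recycler per thread.
    private static final ConcurrentHashMap<Long, Object> LOCKS = new ConcurrentHashMap<>();

    private final Object lock;

    private CreatorLockedStream(OutputStream codecStream, Object lock) {
        super(codecStream);
        this.lock = lock;
    }

    // codecFactory builds the codec stream around sink. In real use it would
    // construct the snappy stream (assumption; not shown here).
    static CreatorLockedStream wrap(OutputStream sink,
                                    Function<OutputStream, OutputStream> codecFactory) {
        Object lock = LOCKS.computeIfAbsent(
                Thread.currentThread().getId(), id -> new Object());
        synchronized (lock) { // construction may also touch the shared recycler
            return new CreatorLockedStream(codecFactory.apply(sink), lock);
        }
    }

    Object lock() {
        return lock;
    }

    @Override
    public void write(int b) throws IOException {
        synchronized (lock) { out.write(b); }
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        synchronized (lock) { out.write(b, off, len); }
    }

    @Override
    public void flush() throws IOException {
        synchronized (lock) { out.flush(); }
    }

    @Override
    public void close() throws IOException {
        synchronized (lock) { out.close(); }
    }
}

class CreatorLockedStreamDemo {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        // Identity factory stands in for the real codec constructor.
        CreatorLockedStream first = CreatorLockedStream.wrap(sink, s -> s);
        CreatorLockedStream second =
                CreatorLockedStream.wrap(new ByteArrayOutputStream(), s -> s);

        first.write(new byte[] {1, 2, 3}, 0, 3);
        first.close();

        System.out.println("bytes written: " + sink.size());
        System.out.println("same lock: " + (first.lock() == second.lock()));
    }
}
```

Streams allocated from the same thread share a lock, so two send threads closing them are serialized. The trade-offs are the ones already noted in the reply: the lock is held across stream I/O, and in this sketch the lock map is never pruned.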