Also, I filed https://issues.apache.org/jira/browse/KAFKA-1721 for this, since it requires either an updated version of the upstream library, a workaround on our side, or, at a bare minimum, clear documentation of the issue.
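For the "workaround on our side" option, here is a minimal sketch of the thread-ID idea from the quoted reply: remember which thread created a compressed stream, and serialize creation and close() per creator thread. `CreatorThreadGuardedStream`, `wrap`, and `lockFor` are hypothetical names, not Kafka or snappy-java APIs. The sketch also assumes the codec touches its ThreadLocal recycler only when a stream is constructed and when it is closed.

```java
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

/**
 * Hypothetical wrapper, not a Kafka or snappy-java API: it records the thread
 * that created the wrapped stream and serializes creation and close() per
 * creator thread, so streams sharing one thread's ThreadLocal recycler are
 * never created or closed concurrently.
 */
final class CreatorThreadGuardedStream extends FilterOutputStream {
    // One lock per creator thread ID; entries are never evicted in this sketch.
    private static final ConcurrentMap<Long, Object> LOCKS = new ConcurrentHashMap<>();

    private final long creatorThreadId;

    private CreatorThreadGuardedStream(OutputStream out, long creatorThreadId) {
        super(out);
        this.creatorThreadId = creatorThreadId;
    }

    static Object lockFor(long threadId) {
        return LOCKS.computeIfAbsent(threadId, id -> new Object());
    }

    // Build the codec stream (e.g. a snappy output stream) under the current
    // thread's lock, since constructing it may take buffers from the recycler.
    static CreatorThreadGuardedStream wrap(Supplier<? extends OutputStream> factory) {
        long id = Thread.currentThread().getId();
        synchronized (lockFor(id)) {
            return new CreatorThreadGuardedStream(factory.get(), id);
        }
    }

    long creatorThreadId() {
        return creatorThreadId;
    }

    // Delegate bulk writes directly; FilterOutputStream would write byte by byte.
    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len);
    }

    // close() may return buffers to the creator thread's recycler, so take
    // that thread's lock even though close() runs on a different thread.
    @Override
    public void close() throws IOException {
        synchronized (lockFor(creatorThreadId)) {
            super.close();
        }
    }
}
```

As the quoted reply suggests, a wrapper like this could be applied only to the snappy code path so other codecs pay nothing for it. The trade-off is that unrelated producers whose batches happened to be created on the same application thread now contend on one lock when their sender threads close those batches.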
On Mon, Oct 20, 2014, at 06:23 PM, Ewen Cheslack-Postava wrote:
> I took a quick look at this since I noticed the same issue when testing
> your code for the issues you filed. I think the issue is that you're
> using all your producers across a thread pool, and the snappy library
> uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated,
> they may be allocated from the same thread (e.g. one of your MyProducer
> classes calls Producer.send() on multiple producers from the same
> thread) and therefore use the same BufferRecycler. Eventually you hit
> the code in the stack trace, and if two producer send threads hit it
> concurrently, they improperly share the unsynchronized BufferRecycler.
>
> This seems like a pain to fix -- it's really a deficiency of the snappy
> library, and as far as I can see there's no external control over
> BufferRecycler in their API. One possibility is to record the thread ID
> when we generate a new stream in Compressor and use that to synchronize
> access to ensure no concurrent BufferRecycler access. That could be made
> specific to snappy so it wouldn't impact other codecs. Not exactly
> ideal, but it would work. Unfortunately I can't think of any way for you
> to protect against this in your own code, since the problem arises in the
> producer send thread, which your code should never know about.
>
> Another option would be to set up your producers differently to avoid the
> possibility of unsynchronized access from multiple threads (i.e. don't
> use the same thread pool approach), but whether you can do that will
> depend on your use case.
>
> -Ewen
>
> On Mon, Oct 20, 2014, at 12:19 PM, Bhavesh Mistry wrote:
> > Hi Kafka Dev,
> >
> > I am getting the following issue with the Snappy library. I checked the
> > Snappy library code and it seems to be fine. Have you guys seen this issue?
> >
> > 2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR
> > org.apache.kafka.clients.producer.internals.Sender - Uncaught error in
> > kafka producer I/O thread:
> > java.lang.NullPointerException
> >     at org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153)
> >     at org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317)
> >     at java.io.FilterOutputStream.close(FilterOutputStream.java:160)
> >     at org.apache.kafka.common.record.Compressor.close(Compressor.java:94)
> >     at org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119)
> >     at org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285)
> >     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
> >     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
> >     at java.lang.Thread.run(Thread.java:744)
> >
> > Here is the code for Snappy:
> > http://grepcode.com/file/repo1.maven.org/maven2/org.xerial.snappy/snappy-java/1.1.1.3/org/xerial/snappy/BufferRecycler.java#153
> >
> > 153    if (inputBuffer == null || (buffer != null && buffer.length > inputBuffer.length)) {
> >
> > Thanks,
> >
> > Bhavesh
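To make the failure at line 153 concrete, here is a simplified, hypothetical model of a per-thread buffer recycler. It is not snappy-java's source. `ToyRecycler` and its method names are illustrative, the release method copies the condition quoted at line 153, and the allocate method assumes (an assumption about snappy-java's internals) that handing out the cached buffer clears the field.

```java
/**
 * Hypothetical, simplified model of a ThreadLocal buffer recycler, written
 * only to illustrate the NPE above; it is not snappy-java's actual code.
 */
final class ToyRecycler {
    private static final ThreadLocal<ToyRecycler> INSTANCE =
            ThreadLocal.withInitial(ToyRecycler::new);

    // Unsynchronized field, visible to every holder of this instance.
    byte[] inputBuffer;

    // One instance per calling thread; a stream that stores this reference
    // and is later closed on another thread carries the instance with it.
    static ToyRecycler instance() {
        return INSTANCE.get();
    }

    // Hands out the cached buffer (clearing the field) or allocates a new one.
    byte[] allocInputBuffer(int minSize) {
        byte[] buf = inputBuffer;
        if (buf == null || buf.length < minSize) {
            return new byte[minSize];
        }
        inputBuffer = null;
        return buf;
    }

    // Same shape as the quoted line 153: inputBuffer is read twice, so a
    // concurrent allocInputBuffer() can null it between the null check and
    // the .length dereference, throwing NullPointerException.
    void releaseInputBufferUnsafe(byte[] buffer) {
        if (inputBuffer == null || (buffer != null && buffer.length > inputBuffer.length)) {
            inputBuffer = buffer;
        }
    }

    // Reading the field once avoids that NPE, but concurrent callers can
    // still lose updates; it does not make the instance thread-safe.
    void releaseInputBufferSingleRead(byte[] buffer) {
        byte[] current = inputBuffer;
        if (current == null || (buffer != null && buffer.length > current.length)) {
            inputBuffer = buffer;
        }
    }
}
```

ThreadLocal only guarantees one instance per thread that calls `instance()`. Once two streams created on the same application thread are closed by two different producer I/O threads, that single instance is effectively shared without synchronization, which is the situation described in the reply above.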