Also, filed https://issues.apache.org/jira/browse/KAFKA-1721 for this
since it either requires an updated version of the upstream library, a
workaround by us, or at a bare minimum clear documentation of the issue.

On Mon, Oct 20, 2014, at 06:23 PM, Ewen Cheslack-Postava wrote:
> I took a quick look at this since I noticed the same issue when testing
> your code for the issues you filed. I think the issue is that you're
> using all your producers across a thread pool and the snappy library
> uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated,
> they may be allocated from the same thread (e.g. one of your MyProducer
> classes calls Producer.send() on multiple producers from the same
> thread) and therefore use the same BufferRecycler. Eventually you hit
> the code in the stacktrace, and if two producer send threads hit it
> concurrently they improperly share the unsynchronized BufferRecycler.
> 
> This seems like a pain to fix -- it's really a deficiency of the snappy
> library and as far as I can see there's no external control over
> BufferRecycler in their API. One possibility is to record the thread ID
> when we generate a new stream in Compressor and use that to synchronize
> access to ensure no concurrent BufferRecycler access. That could be made
> specific to snappy so it wouldn't impact other codecs. Not exactly
> ideal, but it would work. Unfortunately I can't think of any way for you
> to protect against this in your own code since the problem arises in the
> producer send thread, which your code should never know about.
> 
> Another option would be to setup your producers differently to avoid the
> possibility of unsynchronized access from multiple threads (i.e. don't
> use the same thread pool approach), but whether you can do that will
> depend on your use case.
> 
> -Ewen
> 
> On Mon, Oct 20, 2014, at 12:19 PM, Bhavesh Mistry wrote:
> > Hi Kafka Dev,
> > 
> > I am getting following issue with Snappy Library.  I checked code for
> > Snappy lib it seems to be fine.  Have you guys seen this issue ?
> > 
> > 2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR
> > org.apache.kafka.clients.producer.internals.Sender - Uncaught error in
> > kafka producer I/O thread:
> > *java.lang.NullPointerException*
> > at
> > org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153)
> > at
> > org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317)
> > at java.io.FilterOutputStream.close(FilterOutputStream.java:160)
> > at org.apache.kafka.common.record.Compressor.close(Compressor.java:94)
> > at
> > org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119)
> > at
> > org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285)
> > at
> > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
> > at
> > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
> > at java.lang.Thread.run(Thread.java:744)
> > 
> > 
> > Here is code for Snappy
> > http://grepcode.com/file/repo1.maven.org/maven2/org.xerial.snappy/snappy-java/1.1.1.3/org/xerial/snappy/BufferRecycler.java#153
> > :
> > 
> > 153
> > <http://grepcode.com/file/repo1.maven.org/maven2/org.xerial.snappy/snappy-java/1.1.1.3/org/xerial/snappy/BufferRecycler.java#153>
> >  *if* (inputBuffer
> > <http://grepcode.com/file/repo1.maven.org/maven2/org.xerial.snappy/snappy-java/1.1.1.3/org/xerial/snappy/BufferRecycler.java#BufferRecycler.0inputBuffer>
> > == *null* || (buffer != *null* && buffer.length > inputBuffer
> > <http://grepcode.com/file/repo1.maven.org/maven2/org.xerial.snappy/snappy-java/1.1.1.3/org/xerial/snappy/BufferRecycler.java#BufferRecycler.0inputBuffer>.length
> > <http://grepcode.com/file/repo1.maven.org/maven2/org.xerial.snappy/snappy-java/1.1.1.3/org/xerial/snappy/BufferRecycler.java#BufferRecycler.0inputBuffer>))
> > {
> > 
> > 
> > Thanks,
> > 
> > Bhavesh

Reply via email to