Hi Ewen,

It seems Leo has fixed the snappy lib for this issue. Here are the changes:
https://github.com/xerial/snappy-java/commit/7b86642f75c280debf3c1983053ea7f8635b48a5
Here is the jar with the fix:
https://oss.sonatype.org/content/repositories/snapshots/org/xerial/snappy/snappy-java/1.1.1.4-SNAPSHOT/

I will try it this afternoon. If it works, would you be able to upgrade Kafka trunk to this version?

Thanks,

Bhavesh

On Mon, Oct 20, 2014 at 9:53 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:

> Hi Ewen,
>
> Thanks for doing the deep analysis on this issue. I have filed this issue
> with the Snappy project and linked it to this Kafka issue. Here are the
> details of the GitHub issue: https://github.com/xerial/snappy-java/issues/88
>
> I will follow up with the snappy developers to figure out how to solve this
> problem. For us, this is a typical use case: a web app running in a J2EE
> container with a thread pool and recycled threads.
>
> Thanks,
>
> Bhavesh
>
> On Mon, Oct 20, 2014 at 6:56 PM, Ewen Cheslack-Postava <m...@ewencp.org>
> wrote:
>
>> Also, filed https://issues.apache.org/jira/browse/KAFKA-1721 for this
>> since it either requires an updated version of the upstream library, a
>> workaround by us, or at a bare minimum clear documentation of the issue.
>>
>> On Mon, Oct 20, 2014, at 06:23 PM, Ewen Cheslack-Postava wrote:
>> > I took a quick look at this since I noticed the same issue when testing
>> > your code for the issues you filed. I think the issue is that you're
>> > using all your producers across a thread pool and the snappy library
>> > uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated,
>> > they may be allocated from the same thread (e.g. one of your MyProducer
>> > classes calls Producer.send() on multiple producers from the same
>> > thread) and therefore use the same BufferRecycler. Eventually you hit
>> > the code in the stack trace, and if two producer send threads hit it
>> > concurrently they improperly share the unsynchronized BufferRecycler.
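To make the sharing described above concrete, here is a minimal Java sketch. Recycler and FakeStream are hypothetical stand-ins, not snappy-java classes. The only behaviour carried over is the assumption that each stream picks up a ThreadLocal recycler belonging to the thread that *creates* it, not the thread that later closes it.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadLocalSharingDemo {
    // Hypothetical stand-in for snappy-java's per-thread BufferRecycler.
    static final class Recycler {
        byte[] inputBuffer; // unsynchronized, like the field checked at line 153
    }

    // One Recycler per thread that creates a stream.
    private static final ThreadLocal<Recycler> RECYCLER = ThreadLocal.withInitial(Recycler::new);

    // Hypothetical compressed stream: grabs the creating thread's Recycler in its
    // constructor and keeps using it later (e.g. in close() on a sender thread).
    static final class FakeStream {
        final Recycler recycler = RECYCLER.get();
    }

    // Two streams (say, for two different producers) created on one pooled thread.
    static boolean sameThreadSharesRecycler() {
        FakeStream a = new FakeStream();
        FakeStream b = new FakeStream();
        return a.recycler == b.recycler;
    }

    // A stream created on the current thread vs. one created on another thread.
    static boolean otherThreadSharesRecycler() {
        FakeStream mine = new FakeStream();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<FakeStream> create = FakeStream::new;
            FakeStream theirs = pool.submit(create).get();
            return mine.recycler == theirs.recycler;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("same creating thread shares recycler: " + sameThreadSharesRecycler());
        System.out.println("different creating thread shares recycler: " + otherThreadSharesRecycler());
    }
}
```

Run as-is, this prints true for the first line and false for the second. Two streams created on one application thread hold the same Recycler. If two producers' network threads later close those streams at the same time, both threads mutate one unsynchronized object.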
>> >
>> > This seems like a pain to fix -- it's really a deficiency of the snappy
>> > library, and as far as I can see there's no external control over
>> > BufferRecycler in their API. One possibility is to record the thread ID
>> > when we generate a new stream in Compressor and use that to synchronize
>> > access to ensure no concurrent BufferRecycler access. That could be made
>> > specific to snappy so it wouldn't impact other codecs. Not exactly
>> > ideal, but it would work. Unfortunately I can't think of any way for you
>> > to protect against this in your own code since the problem arises in the
>> > producer send thread, which your code should never know about.
>> >
>> > Another option would be to set up your producers differently to avoid the
>> > possibility of unsynchronized access from multiple threads (i.e. don't
>> > use the same thread pool approach), but whether you can do that will
>> > depend on your use case.
>> >
>> > -Ewen
>> >
>> > On Mon, Oct 20, 2014, at 12:19 PM, Bhavesh Mistry wrote:
>> > > Hi Kafka Dev,
>> > >
>> > > I am getting the following issue with the Snappy library. I checked the
>> > > code for the Snappy lib and it seems to be fine. Have you guys seen this issue?
>> > >
>> > > 2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR
>> > > org.apache.kafka.clients.producer.internals.Sender - Uncaught error in
>> > > kafka producer I/O thread:
>> > > java.lang.NullPointerException
>> > >     at org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153)
>> > >     at org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317)
>> > >     at java.io.FilterOutputStream.close(FilterOutputStream.java:160)
>> > >     at org.apache.kafka.common.record.Compressor.close(Compressor.java:94)
>> > >     at org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119)
>> > >     at org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285)
>> > >     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
>> > >     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
>> > >     at java.lang.Thread.run(Thread.java:744)
>> > >
>> > > Here is the Snappy code at line 153 of BufferRecycler.java
>> > > (http://grepcode.com/file/repo1.maven.org/maven2/org.xerial.snappy/snappy-java/1.1.1.3/org/xerial/snappy/BufferRecycler.java#153):
>> > >
>> > >     if (inputBuffer == null || (buffer != null && buffer.length > inputBuffer.length)) {
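Line 153 reads the inputBuffer field twice without any synchronization: once for the null check and once for .length. If a thread sharing the same recycler clears the field in between, the second read dereferences null. That matches the NullPointerException above.

The sketch below is a hypothetical, simplified recycler (SafeRecycler, not snappy-java's class). It keeps the same release condition, but it reads the field once into a local and synchronizes both methods. The allocInputBuffer behaviour (hand out the cached buffer and clear the field) is an assumption modelled on the 1.1.1.3 source linked above. This is not necessarily what the upstream fix does.

```java
public class SafeRecycler {
    // Cached buffer; null means "nothing cached". Hypothetical simplified
    // version of the field checked at BufferRecycler.java line 153.
    private byte[] inputBuffer;

    // Hands out the cached buffer if it is big enough, otherwise a new one.
    public synchronized byte[] allocInputBuffer(int minSize) {
        byte[] buf = inputBuffer;
        if (buf == null || buf.length < minSize) {
            return new byte[minSize];
        }
        inputBuffer = null; // the caller now owns the cached buffer
        return buf;
    }

    // Same condition as line 153, but the field is read once into a local and
    // the method is synchronized, so a concurrent alloc cannot null the field
    // between the null check and the .length dereference.
    public synchronized void releaseInputBuffer(byte[] buffer) {
        byte[] current = inputBuffer;
        if (current == null || (buffer != null && buffer.length > current.length)) {
            inputBuffer = buffer;
        }
    }
}
```

Reading the field once would remove the NullPointerException by itself. Without the synchronized keyword, though, two threads could still be handed the same cached buffer. Sharing one recycler safely needs mutual exclusion, not just a defensive local copy.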
>> > >
>> > > Thanks,
>> > >
>> > > Bhavesh
>> >
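Ewen's first workaround is to record the thread ID when Compressor creates a new stream and synchronize on it. It could look roughly like the sketch below. This is hypothetical, not Kafka code: CreatorThreadGuard, GuardedStream, open and lockFor are invented names, and a ByteArrayOutputStream stands in for the SnappyOutputStream that Compressor would really create.

Streams created on the same thread share one lock, mirroring the ThreadLocal scope of the recycler. Construction also runs under the lock, on the assumption that the inner stream's constructor may allocate from the recycler too.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

public class CreatorThreadGuard {
    // One lock per creating thread id. Streams created on the same thread share a
    // ThreadLocal recycler, so they share a lock too. (The map grows with the
    // number of distinct creating threads; a real version would need cleanup.)
    private static final ConcurrentMap<Long, Object> LOCKS = new ConcurrentHashMap<>();

    static Object lockFor(long threadId) {
        return LOCKS.computeIfAbsent(threadId, id -> new Object());
    }

    // Creates the inner stream under the creating thread's lock, because its
    // constructor may also touch the recycler (assumption).
    static GuardedStream open(Supplier<? extends OutputStream> factory) {
        Object lock = lockFor(Thread.currentThread().getId());
        synchronized (lock) {
            return new GuardedStream(factory.get(), lock);
        }
    }

    // Hypothetical wrapper that would be applied only to snappy streams.
    static final class GuardedStream extends FilterOutputStream {
        private final Object lock;

        private GuardedStream(OutputStream out, Object lock) {
            super(out);
            this.lock = lock;
        }

        Object lock() { return lock; }

        @Override
        public void write(int b) throws IOException {
            synchronized (lock) { out.write(b); }
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            synchronized (lock) { out.write(b, off, len); }
        }

        @Override
        public void flush() throws IOException {
            synchronized (lock) { out.flush(); }
        }

        // close() may run on the producer's network thread; it still takes the
        // lock of the thread that created the stream.
        @Override
        public void close() throws IOException {
            synchronized (lock) { super.close(); }
        }
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (GuardedStream s = open(() -> sink)) {
            s.write(new byte[] {1, 2, 3});
        }
        System.out.println(sink.size()); // 3
    }
}
```

The trade-off is the one Ewen notes: it is not exactly ideal. All streams created on one application thread serialize their writes and closes, even when they belong to different producers. Keying by thread ID also means a recycled ID at worst adds extra serialization, never less.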