Hi Ewen,

It seems Leo has fixed this issue in the snappy library.  Here are the changes:
https://github.com/xerial/snappy-java/commit/7b86642f75c280debf3c1983053ea7f8635b48a5


Here is the jar with the fix:
https://oss.sonatype.org/content/repositories/snapshots/org/xerial/snappy/snappy-java/1.1.1.4-SNAPSHOT/


I will try it this afternoon.  If it works, would you be able to upgrade
Kafka trunk to this version?

Thanks,

Bhavesh

On Mon, Oct 20, 2014 at 9:53 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com>
wrote:

> Hi Ewen,
>
> Thanks for doing the deep analysis on this issue.  I have filed this issue
> with the Snappy project and linked it to this Kafka issue.  Here are the
> details of the GitHub issue:  https://github.com/xerial/snappy-java/issues/88
>
> I will follow up with the snappy maintainers to figure out how to solve this
> problem.  For us, this is a typical use case: a J2EE web app running in a
> container with a thread pool and recycled threads.
>
> Thanks,
>
> Bhavesh
>
> On Mon, Oct 20, 2014 at 6:56 PM, Ewen Cheslack-Postava <m...@ewencp.org>
> wrote:
>
>> Also, I filed https://issues.apache.org/jira/browse/KAFKA-1721 for this
>> since it requires either an updated version of the upstream library, a
>> workaround on our side, or at a bare minimum clear documentation of the issue.
>>
>> On Mon, Oct 20, 2014, at 06:23 PM, Ewen Cheslack-Postava wrote:
>> > I took a quick look at this since I noticed the same issue when testing
>> > your code for the issues you filed. I think the issue is that you're
>> > using all your producers across a thread pool and the snappy library
>> > uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated,
>> > they may be allocated from the same thread (e.g. one of your MyProducer
>> > classes calls Producer.send() on multiple producers from the same
>> > thread) and therefore use the same BufferRecycler. Eventually you hit
>> > the code in the stacktrace, and if two producer send threads hit it
>> > concurrently they improperly share the unsynchronized BufferRecycler.
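A minimal sketch of the sharing described above, assuming simplified stand-ins: `Recycler` plays the role of snappy's BufferRecycler and `Stream` the role of a compressing stream that grabs the creating thread's ThreadLocal recycler in its constructor. Both classes are hypothetical and are not snappy's actual code. Two streams created on the same application thread end up holding the same unsynchronized object, which two different send threads may later touch at the same time.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SharedRecyclerDemo {

    // Hypothetical stand-in for a buffer recycler: plain mutable state, no locking.
    static final class Recycler {
        byte[] inputBuffer;
    }

    // One Recycler per thread, mirroring the ThreadLocal pattern described above.
    static final ThreadLocal<Recycler> RECYCLER = ThreadLocal.withInitial(Recycler::new);

    // Hypothetical stand-in for a compressing stream: it captures the *creating*
    // thread's Recycler at construction and touches it again in close().
    static final class Stream {
        final Recycler recycler = RECYCLER.get();

        void close() {
            recycler.inputBuffer = new byte[64]; // unsynchronized write to shared state
        }
    }

    public static void main(String[] args) throws Exception {
        // One application thread creates streams for two different producers...
        Stream forProducerA = new Stream();
        Stream forProducerB = new Stream();
        // ...so both streams hold the very same Recycler instance.
        System.out.println(forProducerA.recycler == forProducerB.recycler); // true

        // Each producer's send thread later closes its own stream; these two
        // close() calls can overlap on the same unsynchronized Recycler.
        ExecutorService senderA = Executors.newSingleThreadExecutor();
        ExecutorService senderB = Executors.newSingleThreadExecutor();
        senderA.submit(forProducerA::close);
        senderB.submit(forProducerB::close);
        senderA.shutdown();
        senderB.shutdown();
        senderA.awaitTermination(5, TimeUnit.SECONDS);
        senderB.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

The overlap in this sketch is harmless because `close()` only assigns a field; in the real library the recycler does check-then-act logic, which is where a concurrent call can blow up.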
>> >
>> > This seems like a pain to fix -- it's really a deficiency of the snappy
>> > library and as far as I can see there's no external control over
>> > BufferRecycler in their API. One possibility is to record the thread ID
>> > when we generate a new stream in Compressor and use that to synchronize
>> > access to ensure no concurrent BufferRecycler access. That could be made
>> > specific to snappy so it wouldn't impact other codecs. Not exactly
>> > ideal, but it would work. Unfortunately I can't think of any way for you
>> > to protect against this in your own code since the problem arises in the
>> > producer send thread, which your code should never know about.
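One way the thread-ID idea could look, sketched under assumptions: `CreatorLockedStream` is a hypothetical wrapper, not Kafka's actual Compressor. It assumes the codec takes buffers from the creating thread's recycler when a stream is built and returns them on close (the stack trace shows the close side), so both steps take a lock keyed by the creating thread's ID. The `Supplier` stands in for whatever builds the real codec stream.

```java
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

/**
 * Hypothetical wrapper: remembers which thread created the codec stream and
 * serializes creation and close() among all streams created by that thread.
 */
public final class CreatorLockedStream extends FilterOutputStream {

    // One lock object per creating-thread ID, shared by every stream made on that thread.
    private static final ConcurrentMap<Long, Object> LOCKS = new ConcurrentHashMap<>();

    private final Object lock;

    private CreatorLockedStream(OutputStream delegate, Object lock) {
        super(delegate);
        this.lock = lock;
    }

    /** Builds the codec stream while holding the creating thread's lock. */
    public static CreatorLockedStream create(Supplier<OutputStream> codecStreamFactory) {
        long creatorId = Thread.currentThread().getId();
        Object lock = LOCKS.computeIfAbsent(creatorId, id -> new Object());
        synchronized (lock) {
            return new CreatorLockedStream(codecStreamFactory.get(), lock);
        }
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        // Pass bulk writes straight through instead of FilterOutputStream's byte-at-a-time default.
        out.write(b, off, len);
    }

    @Override
    public void close() throws IOException {
        // close() may run on another thread (e.g. a producer's send thread), but it takes
        // the creator's lock, so it never overlaps with other streams from that thread.
        synchronized (lock) {
            super.close();
        }
    }
}
```

As noted above, this is not ideal: unrelated producers that happen to share a creating thread now contend on one lock, and the lock map grows by one entry per thread that ever creates a stream. Thread IDs can be reused after a thread terminates, which only causes extra serialization, not incorrectness.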
>> >
>> > Another option would be to set up your producers differently to avoid the
>> > possibility of unsynchronized access from multiple threads (i.e. don't
>> > use the same thread pool approach), but whether you can do that will
>> > depend on your use case.
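A sketch of one possible "set up your producers differently" approach, under stated assumptions: each producer gets a dedicated single-thread executor, so every send() for a given producer runs on one thread that no other producer uses, and streams for different producers are never created on the same thread. `ConfinedSender` and its `Consumer` callback are hypothetical; in real code the callback would wrap the producer's own send call.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * Hypothetical wrapper: confines all sends for one producer to a dedicated thread,
 * so pooled application threads never call send() on two producers directly.
 */
public final class ConfinedSender<T> implements AutoCloseable {

    private final Consumer<T> send; // e.g. record -> producer.send(record)
    private final ExecutorService thread = Executors.newSingleThreadExecutor();

    public ConfinedSender(Consumer<T> send) {
        this.send = send;
    }

    /** Callable from any pooled thread; the actual send happens on this sender's own thread. */
    public Future<?> submit(T record) {
        return thread.submit(() -> send.accept(record));
    }

    @Override
    public void close() {
        thread.shutdown();
        try {
            // Let already-submitted sends finish before returning.
            thread.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The cost is one extra thread per producer and a hand-off per record, which may or may not be acceptable in a J2EE container with recycled threads.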
>> >
>> > -Ewen
>> >
>> > On Mon, Oct 20, 2014, at 12:19 PM, Bhavesh Mistry wrote:
>> > > Hi Kafka Dev,
>> > >
>> > > I am getting the following issue with the Snappy library.  I checked the
>> > > code for the Snappy lib and it seems to be fine.  Have you guys seen this issue?
>> > >
>> > > 2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR
>> > > org.apache.kafka.clients.producer.internals.Sender - Uncaught error in
>> > > kafka producer I/O thread:
>> > > java.lang.NullPointerException
>> > > at org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153)
>> > > at org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317)
>> > > at java.io.FilterOutputStream.close(FilterOutputStream.java:160)
>> > > at org.apache.kafka.common.record.Compressor.close(Compressor.java:94)
>> > > at org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119)
>> > > at org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285)
>> > > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
>> > > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
>> > > at java.lang.Thread.run(Thread.java:744)
>> > >
>> > >
>> > > Here is the Snappy code at line 153 of BufferRecycler.java
>> > > (http://grepcode.com/file/repo1.maven.org/maven2/org.xerial.snappy/snappy-java/1.1.1.3/org/xerial/snappy/BufferRecycler.java#153):
>> > >
>> > >     if (inputBuffer == null || (buffer != null && buffer.length > inputBuffer.length)) {
>> > >
>> > >
>> > > Thanks,
>> > >
>> > > Bhavesh
>>
>
>
