Hi Jun,

Thanks for the info; we'll keep you posted on the above topic.
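
For reference, here is roughly how we plan to turn that logging on. This
is only a minimal sketch using the plain log4j 1.x API that 0.8.1.1 ships
with; the logger name "kafka.network" is our guess based on where the
trace() call lives (kafka.network.Transmission), so it may need adjusting.

----------------------------------------------------------------------
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public final class KafkaTraceLogging {
    private KafkaTraceLogging() {}

    // Called once at startup, before the consumer threads are created.
    // Raises the kafka.network loggers to TRACE so the "<n> bytes read."
    // lines from readCompletely() become visible.
    // (Assumed logger name; adjust if the package differs.)
    public static void enable() {
        Logger.getLogger("kafka.network").setLevel(Level.TRACE);
    }
}
----------------------------------------------------------------------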
Surprisingly, in the broker controller logs we also see somewhat similar
exceptions at the same line, BoundedByteBufferReceive.scala:54. (Not
sure whether there is any correlation here.)

----------------8<----------------------
[2014-09-26 01:07:34,727] ERROR [ReplicaFetcherThread-0-2], Error in
fetch Name: FetchRequest; Version: 0; CorrelationId: 354; ClientId:
ReplicaFetcherThread-0-2; ReplicaId: 0; MaxWait: 500 ms; MinBytes: 1
bytes; RequestInfo:
[Topic0001,0] -> PartitionFetchInfo(0,1048576)
....
[Topic0001,7] -> PartitionFetchInfo(0,1048576)
....
[Topic1000,0] -> PartitionFetchInfo(0,1048576)
...
[Topic1000,7] -> PartitionFetchInfo(0,1048576)
(kafka.server.ReplicaFetcherThread)
java.net.SocketTimeoutException
        at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229)
        at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
        at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
        at kafka.utils.Utils$.read(Utils.scala:375)
        at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
        at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
        at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
-----------------8<----------------
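
For what it's worth, the timeout itself looks explainable to us: the
replica fetcher reads fetch responses over a blocking socket with
SO_TIMEOUT set (replica.socket.timeout.ms on the broker, if we read the
docs correctly), so a peer that stays silent past the timeout makes the
read throw from exactly that line. A minimal sketch of that behaviour,
with nothing Kafka-specific in it (the port and timeout below are made up):

----------------------------------------------------------------------
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

public class SoTimeoutDemo {
    public static void main(String[] args) throws Exception {
        // A "broker" that accepts the connection but never replies.
        ServerSocket silentPeer = new ServerSocket(0);
        Socket socket = new Socket("localhost", silentPeer.getLocalPort());
        socket.setSoTimeout(500); // plays the role of replica.socket.timeout.ms
        silentPeer.accept();      // accept, then stay silent

        InputStream in = socket.getInputStream();
        try {
            in.read(); // blocks until the timeout, then throws
        } catch (SocketTimeoutException e) {
            System.out.println("read timed out, as in the fetcher stack trace");
        }
    }
}
----------------------------------------------------------------------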

On Wed, Sep 24, 2014 at 9:39 PM, Jun Rao <jun...@gmail.com> wrote:
> You can enable some trace/debug level logging to see if the thread is
> indeed hanging in BoundedByteBufferReceive.
>
> Thanks,
>
> Jun
>
> On Wed, Sep 24, 2014 at 8:30 AM, Jagbir Hooda <jho...@gmail.com> wrote:
>
>> Hi Jun,
>>
>> Thanks for looking into it. We use the following steps to start the
>> consumer.
>>
>> 1) Create consumer connector
>>    ConsumerConnector consumerConnector =
>> kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig())
>>
>> 2) Create message streams
>>    List<KafkaStream<byte[], byte[]>> streamList =
>> consumerConnector.createMessageStreamsByFilter(new Whitelist("topic1,topic2,etc."));
>>
>> 3) Get Stream iterator
>>     KafkaStream<byte[], byte[]> stream = streamList.get(0);
>>     ConsumerIterator<byte[], byte[]> it = stream.iterator();
>>
>> 4) Get messages
>>     while(it.hasNext()) {
>>           MessageAndMetadata<byte[], byte[]> messageAndMeta = it.peek();
>>           // consume message
>>           it.next();
>>     }
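>>
>> For completeness, here is a self-contained version of those four steps
>> as we would write them against the 0.8.1.1 Java API. The ZooKeeper
>> address and group id below are made up, and note that
>> createMessageStreamsByFilter() takes a TopicFilter (e.g. a Whitelist)
>> rather than a plain string:
>>
>> ----------------------------------------------------------------------
>> import java.util.List;
>> import java.util.Properties;
>>
>> import kafka.consumer.Consumer;
>> import kafka.consumer.ConsumerConfig;
>> import kafka.consumer.ConsumerIterator;
>> import kafka.consumer.KafkaStream;
>> import kafka.consumer.Whitelist;
>> import kafka.javaapi.consumer.ConsumerConnector;
>> import kafka.message.MessageAndMetadata;
>>
>> public class FilteredConsumerSketch {
>>     public static void main(String[] args) {
>>         // Minimal config; zookeeper.connect and group.id are required.
>>         Properties props = new Properties();
>>         props.put("zookeeper.connect", "localhost:2181"); // made up
>>         props.put("group.id", "example-group");           // made up
>>
>>         // 1) Create the consumer connector.
>>         ConsumerConnector connector =
>>             Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
>>
>>         // 2) Create message streams; the Whitelist is a topic filter.
>>         List<KafkaStream<byte[], byte[]>> streams =
>>             connector.createMessageStreamsByFilter(new Whitelist("topic1,topic2"));
>>
>>         // 3) Get a stream iterator.
>>         ConsumerIterator<byte[], byte[]> it = streams.get(0).iterator();
>>
>>         // 4) Consume messages (hasNext() blocks until a message arrives).
>>         while (it.hasNext()) {
>>             MessageAndMetadata<byte[], byte[]> mam = it.next();
>>             System.out.println(mam.topic() + ": " + new String(mam.message()));
>>         }
>>         connector.shutdown();
>>     }
>> }
>> ----------------------------------------------------------------------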
>>
>> I'm still at a loss to understand the correlation between the consumer
>> code and BoundedByteBufferReceive being RUNNABLE at
>> "read += Utils.read(channel, sizeBuffer)". We took multiple thread
>> dumps at different times (while the CPU was busy), and the thread was
>> always RUNNABLE at the same location. Intuitively, reading the
>> sizeBuffer should be much less CPU-bound than reading the
>> contentBuffer.
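>>
>> One shape of failure that would match what we see, as far as we can
>> tell: Utils.read() only throws when channel.read() returns -1, so if
>> the channel ever starts returning 0 repeatedly without making progress
>> on the 4-byte size header, the while(!complete) loop in
>> readCompletely() degenerates into a pure CPU spin at exactly that
>> line. A stripped-down sketch of that shape (hypothetical, just to
>> illustrate the failure mode, not the actual Kafka code):
>>
>> ----------------------------------------------------------------------
>> import java.io.EOFException;
>> import java.io.IOException;
>> import java.nio.ByteBuffer;
>> import java.nio.channels.ReadableByteChannel;
>>
>> public class SpinSketch {
>>     // Mirrors Utils.read(): -1 throws, but 0 is passed through.
>>     static int read(ReadableByteChannel ch, ByteBuffer buf) throws IOException {
>>         int n = ch.read(buf);
>>         if (n == -1) throw new EOFException();
>>         return n;
>>     }
>>
>>     // Mirrors readCompletely()/readFrom(): if read() keeps returning 0,
>>     // 'complete' is never set and this loop burns CPU without blocking.
>>     static void readCompletely(ReadableByteChannel ch) throws IOException {
>>         ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
>>         boolean complete = false;
>>         while (!complete) {
>>             read(ch, sizeBuffer);
>>             complete = !sizeBuffer.hasRemaining();
>>         }
>>     }
>> }
>> ----------------------------------------------------------------------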
>>
>> Thanks,
>> Jagbir
>>
>> On Mon, Sep 22, 2014 at 9:26 PM, Jun Rao <jun...@gmail.com> wrote:
>> > We allocate a new BoundedByteBufferReceive for every fetch request.
>> > Are you using SimpleConsumer directly? It seems it's started by the
>> > high-level consumer through the ConsumerFetcherThread.
>> >
>> > Thanks,
>> >
>> > Jun
>> >
>> > On Mon, Sep 22, 2014 at 11:41 AM, Jagbir Hooda <jho...@gmail.com> wrote:
>> >
>> >> Note:  Re-posting the older message from another account due to
>> >> formatting issues.
>> >>
>> >>
>> >> Folks,
>> >>
>> >> Recently, in one of our SimpleConsumer-based client applications
>> >> (0.8.1.1), we spotted a very busy CPU with almost no traffic in/out
>> >> between the client and the Kafka broker (1 broker + 1 ZooKeeper); the
>> >> stack trace is attached at the end.
>> >>
>> >> The busy thread was spinning in a while loop that repeatedly calls
>> >> the readFrom function:
>> >>
>> >> ---scala/kafka/network/Transmission.scala:55-59-----
>> >>     .....
>> >>     while(!complete) {
>> >>       val read = readFrom(channel)
>> >>       trace(read + " bytes read.")
>> >>       totalRead += read
>> >>     }
>> >>     ....
>> >> ----------------------------------------------------------------------
>> >>
>> >> The readFrom function, where the associated thread was RUNNABLE at
>> >> "read += Utils.read(channel, sizeBuffer)", is shown below:
>> >>
>> >> ---scala/kafka/network/BoundedByteBufferReceive.scala:49-95--------
>> >>   ....
>> >>   def readFrom(channel: ReadableByteChannel): Int = {
>> >>     expectIncomplete()
>> >>     var read = 0
>> >>     // have we read the request size yet?
>> >>     if(sizeBuffer.remaining > 0)
>> >>       read += Utils.read(channel, sizeBuffer)
>> >>     // have we allocated the request buffer yet?
>> >>     if(contentBuffer == null && !sizeBuffer.hasRemaining) {
>> >>       sizeBuffer.rewind()
>> >>       val size = sizeBuffer.getInt()
>> >>       if(size <= 0)
>> >>         throw new InvalidRequestException("%d is not a valid request size.".format(size))
>> >>       if(size > maxSize)
>> >>         throw new InvalidRequestException("Request of length %d is not valid, it is larger than the maximum size of %d bytes.".format(size, maxSize))
>> >>       contentBuffer = byteBufferAllocate(size)
>> >>     }
>> >>     // if we have a buffer read some stuff into it
>> >>     if(contentBuffer != null) {
>> >>       read = Utils.read(channel, contentBuffer)
>> >>       // did we get everything?
>> >>       if(!contentBuffer.hasRemaining) {
>> >>         contentBuffer.rewind()
>> >>         complete = true
>> >>       }
>> >>     }
>> >>     read
>> >>   }
>> >>   .....
>> >>
>> >>
>> >> ------------------------------------------------------------------------------
>> >>
>> >> It looks like the contentBuffer size is initialized only once in the
>> >> SimpleConsumer life cycle (we keep the SimpleConsumer alive until the
>> >> app is restarted).
>> >>
>> >> We're wondering what the communication pattern is between the client
>> >> and the broker.
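>> >>
>> >> From reading readFrom(), it looks to us like simple length-prefixed
>> >> framing: a 4-byte big-endian size, then that many bytes of payload,
>> >> repeated per message, with the size re-read for every message rather
>> >> than negotiated once per connection. A sketch of that framing as we
>> >> understand it (our reading of the code, not an authoritative protocol
>> >> description):
>> >>
>> >> ----------------------------------------------------------------------
>> >> import java.io.DataInputStream;
>> >> import java.io.DataOutputStream;
>> >> import java.io.IOException;
>> >>
>> >> public class FrameSketch {
>> >>     // The 4 bytes that sizeBuffer collects, then the bytes that
>> >>     // contentBuffer is allocated for -- once per message.
>> >>     static byte[] readFrame(DataInputStream in) throws IOException {
>> >>         int size = in.readInt();         // big-endian, like ByteBuffer
>> >>         byte[] payload = new byte[size];
>> >>         in.readFully(payload);
>> >>         return payload;
>> >>     }
>> >>
>> >>     static void writeFrame(DataOutputStream out, byte[] payload)
>> >>             throws IOException {
>> >>         out.writeInt(payload.length);
>> >>         out.write(payload);
>> >>     }
>> >> }
>> >> ----------------------------------------------------------------------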
>> >>
>> >> Is our assumption correct that the contentBuffer 'size' is only
>> >> negotiated when the SimpleConsumer is created?
>> >>
>> >> If the above is true, then why is the thread busy at "read +=
>> >> Utils.read(channel, sizeBuffer)"?
>> >> (Our application had been running fine for over three days; on the
>> >> third day we noticed the busy CPU and this behavior.)
>> >>
>> >> If the contentBuffer size is negotiated for every message (or more
>> >> frequently), then the contentBuffer variable must be set to null
>> >> somewhere to allow it to be reconfigured (as per the existing
>> >> codebase).
>> >>
>> >> Another question: which broker configuration setting controls the
>> >> contentBuffer size?
>> >>
>> >> I'd really appreciate it if someone could help us figure this out.
>> >>
>> >> Thanks,
>> >> jaguar
>> >>
>> >> ----------------BUSY THREAD--------------------------------
>> >> "ConsumerFetcherThread-aaaa_ubuntu-1410940574038-305417a6-0-0" prio=10
>> >> tid=0x00007f7e4c04e800 nid=0x738c runnable [0x00007f7e4a3e1000]
>> >> java.lang.Thread.State: RUNNABLE
>> >>  at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>> >>  at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>> >>  at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
>> >>  at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
>> >>  - locked <0x00000000e583e670> (a sun.nio.ch.Util$2)
>> >>  - locked <0x00000000e583e660> (a java.util.Collections$UnmodifiableSet)
>> >>  - locked <0x00000000e583e1d8> (a sun.nio.ch.EPollSelectorImpl)
>> >>  at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
>> >>  at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:221)
>> >>  - locked <0x00000000e59b69a8> (a java.lang.Object)
>> >>  at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
>> >>  - locked <0x00000000e59b6b30> (a sun.nio.ch.SocketAdaptor$SocketInputStream)
>> >>  at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
>> >>  - locked <0x00000000e59cc4f0> (a java.lang.Object)
>> >>  at kafka.utils.Utils$.read(Utils.scala:375)
>> >>  at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>> >>  at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>> >>  at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>> >>  at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>> >>  at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)
>> >>  at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
>> >>  - locked <0x00000000e591cb68> (a java.lang.Object)
>> >>  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>> >>  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>> >>  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>> >>  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>> >>  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
>> >>  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>> >>  at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>> >>  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>> >>  at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>> >>  at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
>> >>  at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
>> >>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>> >>
>> >>
>> >> ------------------------------------8<----------------------------------------
>> >>
>>
