Hi Jun,

Thanks for the info, we'll keep you posted on the above topic.
Surprisingly, in the broker controller logs we also see somewhat similar
exceptions at the same line, BoundedByteBufferReceive.scala:54. (Not
sure if there is any correlation here.)
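On the debugging front, we are enabling the trace logging you suggested
(quoted below). Concretely, we are adding something like the following
fragment to the client's log4j.properties -- a minimal sketch assuming
the stock log4j 1.x setup, with logger names simply following Kafka's
package names and output going to whatever appenders are already
configured:

---log4j.properties (fragment)--------------------------------------
# TRACE on the network package covers BoundedByteBufferReceive,
# including the "<n> bytes read." lines emitted from the
# readCompletely loop in Transmission.scala.
log4j.logger.kafka.network=TRACE

# DEBUG on the consumer package covers SimpleConsumer and the
# ConsumerFetcherThread.
log4j.logger.kafka.consumer=DEBUG
---------------------------------------------------------------------

Here is one of the broker-side exceptions mentioned above: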
----------------8<----------------------
[2014-09-26 01:07:34,727] ERROR [ReplicaFetcherThread-0-2], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 354; ClientId: ReplicaFetcherThread-0-2; ReplicaId: 0; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [Topic0001,0] -> PartitionFetchInfo(0,1048576) .... [Topic0001,7] -> PartitionFetchInfo(0,1048576) .... [Topic1000,0] -> PartitionFetchInfo(0,1048576) ... [Topic1000,7] -> PartitionFetchInfo(0,1048576) (kafka.server.ReplicaFetcherThread)
java.net.SocketTimeoutException
    at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229)
    at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
    at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
    at kafka.utils.Utils$.read(Utils.scala:375)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
-----------------8<----------------

On Wed, Sep 24, 2014 at 9:39 PM, Jun Rao <jun...@gmail.com> wrote:

> You can enable some trace/debug level logging to see if the thread is
> indeed hanging in BoundedByteBufferReceive.
>
> Thanks,
>
> Jun
>
> On Wed, Sep 24, 2014 at 8:30 AM, Jagbir Hooda <jho...@gmail.com> wrote:
>
>> Hi Jun,
>>
>> Thanks for looking into it. We use the following steps to start the
>> consumer.
>>
>> 1) Create the consumer connector
>> ConsumerConnector consumerConnector =
>>     kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
>>
>> 2) Create message streams
>> List<KafkaStream<byte[], byte[]>> streamList =
>>     consumerConnector.createMessageStreamsByFilter(new Whitelist("topic1,topic2,etc."));
>>
>> 3) Get a stream iterator
>> KafkaStream<byte[], byte[]> stream = streamList.get(0);
>> ConsumerIterator<byte[], byte[]> it = stream.iterator();
>>
>> 4) Get messages
>> while (it.hasNext()) {
>>     MessageAndMetadata<byte[], byte[]> messageAndMeta = it.peek();
>>     // consume the message, then advance the iterator
>>     it.next();
>> }
>>
>> I'm still at a loss to understand the correlation between the consumer
>> code and the BoundedByteBufferReceive thread being RUNNABLE at
>> "read += Utils.read(channel, sizeBuffer)".
>> We took multiple thread dumps at different times (when the CPU was
>> busy), and the thread was always RUNNABLE at the same location.
>> Intuitively, reading the sizeBuffer should be much less CPU-bound than
>> reading the contentBuffer.
>>
>> Thanks,
>> Jagbir
>>
>> On Mon, Sep 22, 2014 at 9:26 PM, Jun Rao <jun...@gmail.com> wrote:
>> > We allocate a new BoundedByteBufferReceive for every fetch request.
>> > Are you using SimpleConsumer directly? It seems it's started by the
>> > high level consumer through the ConsumerFetcherThread.
>> >
>> > Thanks,
>> >
>> > Jun
>> >
>> > On Mon, Sep 22, 2014 at 11:41 AM, Jagbir Hooda <jho...@gmail.com> wrote:
>> >
>> >> Note: Re-posting the older message from another account due to
>> >> formatting issues.
>> >>
>> >> Folks,
>> >>
>> >> Recently, in one of our SimpleConsumer-based client applications
>> >> (0.8.1.1), we spotted a very busy CPU with almost no traffic in/out
>> >> between the client and the Kafka broker (1 broker + 1 zookeeper); the
>> >> stack trace is attached at the end.
>> >>
>> >> The busy thread was spinning in a while loop anchored at the readFrom
>> >> function:
>> >>
>> >> ---scala/kafka/network/Transmission.scala:55-59-----
>> >> .....
>> >> while(!complete) {
>> >>   val read = readFrom(channel)
>> >>   trace(read + " bytes read.")
>> >>   totalRead += read
>> >> }
>> >> ....
>> >> ----------------------------------------------------------------------
>> >>
>> >> The readFrom function and the associated thread were RUNNABLE at
>> >> "read += Utils.read(channel, sizeBuffer)" (see below):
>> >>
>> >> ---scala/kafka/network/BoundedByteBufferReceive.scala:49-95--------
>> >> ....
>> >> def readFrom(channel: ReadableByteChannel): Int = {
>> >>   expectIncomplete()
>> >>   var read = 0
>> >>   // have we read the request size yet?
>> >>   if(sizeBuffer.remaining > 0)
>> >>     read += Utils.read(channel, sizeBuffer)
>> >>   // have we allocated the request buffer yet?
>> >>   if(contentBuffer == null && !sizeBuffer.hasRemaining) {
>> >>     sizeBuffer.rewind()
>> >>     val size = sizeBuffer.getInt()
>> >>     if(size <= 0)
>> >>       throw new InvalidRequestException(
>> >>         "%d is not a valid request size.".format(size))
>> >>     if(size > maxSize)
>> >>       throw new InvalidRequestException(
>> >>         "Request of length %d is not valid, it is larger than the maximum size of %d bytes.".format(size, maxSize))
>> >>     contentBuffer = byteBufferAllocate(size)
>> >>   }
>> >>   // if we have a buffer, read some stuff into it
>> >>   if(contentBuffer != null) {
>> >>     read = Utils.read(channel, contentBuffer)
>> >>     // did we get everything?
>> >>     if(!contentBuffer.hasRemaining) {
>> >>       contentBuffer.rewind()
>> >>       complete = true
>> >>     }
>> >>   }
>> >>   read
>> >> }
>> >> .....
>> >> ------------------------------------------------------------------------------
>> >>
>> >> It looks like the contentBuffer size is initialized only once in the
>> >> SimpleConsumer life-cycle (we keep the SimpleConsumer alive until the
>> >> app is restarted).
>> >>
>> >> We're wondering what the communication pattern between the client and
>> >> the broker is.
>> >>
>> >> Is our assumption correct that the contentBuffer 'size' is only
>> >> negotiated when the SimpleConsumer is created?
>> >>
>> >> If the above is true, then why is the thread busy at "read +=
>> >> Utils.read(channel, sizeBuffer)"? (Our application had been running
>> >> fine for over three days; on the third day we noticed the busy CPU
>> >> and this behavior.)
>> >>
>> >> If the contentBuffer size is negotiated for every message (or more
>> >> frequently), then the contentBuffer variable must be set to "null"
>> >> somewhere to allow it to be reconfigured (as per the existing
>> >> codebase).
>> >>
>> >> Another question is about the broker configuration setting which
>> >> controls the contentBuffer size.
>> >>
>> >> I'd really appreciate it if someone could help us figure this out.
>> >>
>> >> Thanks,
>> >> jaguar
>> >>
>> >> ----------------BUSY THREAD--------------------------------
>> >> "ConsumerFetcherThread-aaaa_ubuntu-1410940574038-305417a6-0-0" prio=10
>> >> tid=0x00007f7e4c04e800 nid=0x738c runnable [0x00007f7e4a3e1000]
>> >>    java.lang.Thread.State: RUNNABLE
>> >>     at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>> >>     at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>> >>     at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
>> >>     at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
>> >>     - locked <0x00000000e583e670> (a sun.nio.ch.Util$2)
>> >>     - locked <0x00000000e583e660> (a java.util.Collections$UnmodifiableSet)
>> >>     - locked <0x00000000e583e1d8> (a sun.nio.ch.EPollSelectorImpl)
>> >>     at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
>> >>     at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:221)
>> >>     - locked <0x00000000e59b69a8> (a java.lang.Object)
>> >>     at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
>> >>     - locked <0x00000000e59b6b30> (a sun.nio.ch.SocketAdaptor$SocketInputStream)
>> >>     at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
>> >>     - locked <0x00000000e59cc4f0> (a java.lang.Object)
>> >>     at kafka.utils.Utils$.read(Utils.scala:375)
>> >>     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>> >>     at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>> >>     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>> >>     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>> >>     at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)
>> >>     at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
>> >>     - locked <0x00000000e591cb68> (a java.lang.Object)
>> >>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>> >>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>> >>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>> >>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>> >>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
>> >>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>> >>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>> >>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>> >>     at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>> >>     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
>> >>     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
>> >>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>> >> ------------------------------------8<----------------------------------------
>>
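P.S. For anyone who wants to reproduce the consumer setup from the
steps quoted above, here is the same pattern consolidated into a
single compilable sketch. This is only an illustration, not code from
our application: the class name and config values are placeholders,
and note that in the 0.8.x javaapi createMessageStreamsByFilter takes
a TopicFilter (e.g. Whitelist) rather than a raw string.

---ConsumerExample.java (illustrative sketch)------------------------
import java.util.List;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.consumer.Whitelist;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class ConsumerExample {
    public static void main(String[] args) {
        // Placeholder settings -- substitute your own ZooKeeper quorum
        // and consumer group.
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "example-group");

        // 1) Create the consumer connector.
        ConsumerConnector connector =
            kafka.consumer.Consumer.createJavaConsumerConnector(
                new ConsumerConfig(props));

        // 2) Create message streams from a topic filter.
        List<KafkaStream<byte[], byte[]>> streams =
            connector.createMessageStreamsByFilter(
                new Whitelist("topic1,topic2"));

        // 3) Get a stream iterator.
        ConsumerIterator<byte[], byte[]> it = streams.get(0).iterator();

        // 4) Consume messages; hasNext() blocks until a message arrives
        //    (with the default consumer.timeout.ms of -1).
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> messageAndMeta = it.next();
            System.out.println("received " + messageAndMeta.message().length
                + " message bytes from " + messageAndMeta.topic());
        }
    }
}
----------------------------------------------------------------------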