Actually, that fetch call blocks on the server side. That is, if there is no data, the server will wait until data arrives or the timeout expires before sending a response. This is done to simplify client development. If that isn't happening, it is likely either a bug or a change in the timeout configuration.
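For reference, the wait is controlled per-request by the maxWait and minBytes fetch parameters (for the replica fetcher these come from the replica.fetch.wait.max.ms and replica.fetch.min.bytes broker configs). Roughly, with the SimpleConsumer API (host, topic, partition and offset below are made-up examples, not anything from Mathias's setup):

    import kafka.api.FetchRequestBuilder
    import kafka.consumer.SimpleConsumer

    // The broker holds this fetch for up to maxWait ms, unless at least
    // minBytes of data becomes available sooner ("long poll" behavior).
    val consumer = new SimpleConsumer("broker1", 9092, 30000, 64 * 1024, "test-client")
    val request = new FetchRequestBuilder()
      .clientId("test-client")
      .maxWait(500) // block on the server for up to 500 ms...
      .minBytes(1)  // ...unless at least 1 byte is ready earlier
      .addFetch("my-topic", 0, 0L, 100000)
      .build()
    val response = consumer.fetch(request)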
I think we should try to ascertain how widespread this issue is; it could be pretty serious if it is always happening. Mathias, could you share your server configuration?

-Jay

On Sun, Feb 1, 2015 at 11:17 PM, Jaikiran Pai <jai.forums2...@gmail.com> wrote:

> Hi Mathias,
>
> Looking at that thread dump, I think the potential culprit is this one:
>
> TRACE 303545: (thread=200049)
>     sun.nio.ch.EPollArrayWrapper.epollWait(EPollArrayWrapper.java:Unknown line)
>     sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>     sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
>     sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
>     sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
>     sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:221)
>     sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
>     java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
>     kafka.utils.Utils$.read(Utils.scala:380)
>     kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
>     kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>     kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>     kafka.network.BlockingChannel.receive(BlockingChannel.scala:108)
>     kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72)
>     kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
>     kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:113)
>     kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113)
>     kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113)
>     kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>     kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:112)
>     kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112)
>     kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112)
>     kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>     kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:111)
>     kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:97)
>     kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
>     kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>
> I see many such threads, all triggered through the SimpleConsumer and ending up polling. Looking at the code, in theory, I can see why there might be a busy CPU loop generated by that code path.
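> To make the suspicion concrete, here is a contrived, self-contained demo (not Kafka code) of how a read that keeps returning 0 bytes turns into a busy loop:
>
>   import java.nio.ByteBuffer
>   import java.nio.channels.Pipe
>
>   // A non-blocking channel with no pending data returns 0 from read(),
>   // so a loop that does not handle the 0 case spins and burns CPU.
>   val source = Pipe.open().source()
>   source.configureBlocking(false)
>   val buf = ByteBuffer.allocate(1024)
>   var spins = 0L
>   while (source.read(buf) == 0 && spins < 10000000L) {
>     spins += 1
>   }
>   println("spun " + spins + " times without reading a single byte")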
> If my guess is right, it could be because of an issue in the implementation of how data is read off a channel in a blocking manner, and I think this patch might help overcome that problem:
>
> diff --git a/core/src/main/scala/kafka/network/Transmission.scala b/core/src/main/scala/kafka/network/Transmission.scala
> index 2827103..0bab9ed 100644
> --- a/core/src/main/scala/kafka/network/Transmission.scala
> +++ b/core/src/main/scala/kafka/network/Transmission.scala
> @@ -54,8 +54,15 @@ trait Receive extends Transmission {
>      var totalRead = 0
>      while(!complete) {
>        val read = readFrom(channel)
> -      trace(read + " bytes read.")
> -      totalRead += read
> +      if (read > 0) {
> +        trace(read + " bytes read.")
> +        totalRead += read
> +      } else if (read == 0) {
> +        // it's possible that nothing was read (see javadoc of ReadableByteChannel#read) from the backing channel,
> +        // so we wait for a while before polling again, so that we don't end up with a busy CPU loop
> +        // TODO: For now, this 30 milliseconds is a random value.
> +        Thread.sleep(30)
> +      }
>      }
>      totalRead
>    }
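> (One possible refinement, just a sketch on my part and untested: instead of a fixed 30 ms sleep, wait on a temporary Selector so the loop wakes up as soon as the channel actually becomes readable. This assumes the channel is a SelectableChannel in non-blocking mode, which the Receive trait does not guarantee, hence the fallback.)
>
>   import java.nio.channels.{ReadableByteChannel, SelectableChannel, SelectionKey, Selector}
>
>   // Wait until the channel has data (or the timeout elapses) instead of
>   // sleeping for a fixed interval; falls back to the plain sleep when the
>   // channel cannot be selected on.
>   def awaitReadable(channel: ReadableByteChannel, timeoutMs: Long): Unit =
>     channel match {
>       case sc: SelectableChannel if !sc.isBlocking =>
>         val selector = Selector.open()
>         try {
>           sc.register(selector, SelectionKey.OP_READ)
>           selector.select(timeoutMs) // returns as soon as data arrives
>         } finally {
>           selector.close()
>         }
>       case _ =>
>         Thread.sleep(timeoutMs)
>     }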
> Is this something that you would be able to apply against the latest 0.8.2 branch of Kafka, build the Kafka binary, try it out and see if it improves the situation?
>
> -Jaikiran
>
> On Monday 26 January 2015 11:35 PM, Mathias Söderberg wrote:
>
>> Hi Neha,
>>
>> I sent an e-mail earlier today, but noticed now that it didn't actually go through.
>>
>> Anyhow, I've attached two files, one with output from a 10 minute run and one with output from a 30 minute run. Realized that maybe I should've done one or two runs with 0.8.1.1 as well, but nevertheless.
>>
>> I upgraded our staging cluster to 0.8.2.0-rc2, and I'm seeing the same CPU usage as with the beta version (basically pegging all cores). If I manage to find the time I'll do another run with hprof on the rc2 version later today.
>>
>> Best regards,
>> Mathias
>>
>> On Tue Dec 09 2014 at 10:08:21 PM Neha Narkhede <n...@confluent.io> wrote:
>>
>> The following should be sufficient:
>>
>> java -agentlib:hprof=cpu=samples,depth=100,interval=20,lineno=y,thread=y,file=kafka.hprof <classname>
>>
>> You would need to start the Kafka server with the settings above for some time until you observe the problem.
>>
>> On Tue, Dec 9, 2014 at 3:47 AM, Mathias Söderberg <mathias.soederb...@gmail.com> wrote:
>>
>> > Hi Neha,
>> >
>> > Yeah sure. I'm not familiar with hprof, so any particular options I should include, or just run with defaults?
>> >
>> > Best regards,
>> > Mathias
>> >
>> > On Mon Dec 08 2014 at 7:41:32 PM Neha Narkhede <n...@confluent.io> wrote:
>> >
>> > > Thanks for reporting the issue. Would you mind running hprof and sending the output?
>> > >
>> > > On Mon, Dec 8, 2014 at 1:25 AM, Mathias Söderberg <mathias.soederb...@gmail.com> wrote:
>> > >
>> > > > Good day,
>> > > >
>> > > > I upgraded a Kafka cluster from v0.8.1.1 to v0.8.2-beta and noticed that the CPU usage on the broker machines went up by roughly 40%, from ~60% to ~100%, and am wondering if anyone else has experienced something similar? The load average also went up by 2x-3x.
>> > > >
>> > > > We're running on EC2 and the cluster currently consists of four m1.xlarge, with roughly 1100 topics / 4000 partitions. Using Java 7 (1.7.0_65 to be exact) and Scala 2.9.2. Configurations can be found over here: https://gist.github.com/mthssdrbrg/7df34a795e07eef10262.
>> > > >
>> > > > I'm assuming that this is not expected behaviour for 0.8.2-beta?
>> > > >
>> > > > Best regards,
>> > > > Mathias
>> > >
>> > > --
>> > > Thanks,
>> > > Neha
>>
>> --
>> Thanks,
>> Neha