Actually that fetch call blocks on the server side. That is, if there is no
data, the server will wait until data arrives or the timeout occurs before
sending a response. This is done to simplify client development. If that
isn't happening, it is likely a bug or a configuration change in the timeout.
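
For reference, here is a minimal sketch of the client-side knobs that control
that wait, assuming the 0.8.x Scala SimpleConsumer API (the host, topic, and
concrete values below are placeholders, not from this thread):

import kafka.api.FetchRequestBuilder
import kafka.consumer.SimpleConsumer

// Placeholders: broker host/port, topic, partition and sizes are illustrative only.
val consumer = new SimpleConsumer("broker-host", 9092, 30000, 64 * 1024, "demo-client")
val request = new FetchRequestBuilder()
  .clientId("demo-client")
  .maxWait(500)   // broker holds the fetch for up to 500 ms if no data is available
  .minBytes(1)    // ...or responds as soon as at least this many bytes can be returned
  .addFetch("demo-topic", 0, 0L, 100000)
  .build()
val response = consumer.fetch(request)
consumer.close()

If the fetch wait were effectively zero, the broker would respond immediately
even when no data is available and the fetcher would loop tightly, which is
the sort of configuration issue worth ruling out here.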

I think we should try to ascertain how widespread this issue is; it could
be pretty serious if it is always happening.

Mathias, could you share your server configuration?

-Jay

On Sun, Feb 1, 2015 at 11:17 PM, Jaikiran Pai <jai.forums2...@gmail.com>
wrote:

> Hi Mathias,
>
> Looking at that thread dump, I think the potential culprit is this one:
>
> TRACE 303545: (thread=200049)
>     sun.nio.ch.EPollArrayWrapper.epollWait(EPollArrayWrapper.java:Unknown line)
>     sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>     sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
>     sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
>     sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
>     sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:221)
>     sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
>     java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
>     kafka.utils.Utils$.read(Utils.scala:380)
>     kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
>     kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>     kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>     kafka.network.BlockingChannel.receive(BlockingChannel.scala:108)
>     kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72)
>     kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
>     kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:113)
>     kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113)
>     kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113)
>     kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>     kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:112)
>     kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112)
>     kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112)
>     kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>     kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:111)
>     kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:97)
>     kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
>     kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>
>
> I see many such threads, all triggered through the SimpleConsumer and
> ending up polling. Looking at the code, I can see why, in theory, that
> code path might generate a busy CPU loop. If my guess is right, the cause
> is in how data is read off a channel in a blocking manner, and I think
> this patch might help overcome that problem:
>
> diff --git a/core/src/main/scala/kafka/network/Transmission.scala b/core/src/main/scala/kafka/network/Transmission.scala
> index 2827103..0bab9ed 100644
> --- a/core/src/main/scala/kafka/network/Transmission.scala
> +++ b/core/src/main/scala/kafka/network/Transmission.scala
> @@ -54,8 +54,15 @@ trait Receive extends Transmission {
>      var totalRead = 0
>      while(!complete) {
>        val read = readFrom(channel)
> -      trace(read + " bytes read.")
> -      totalRead += read
> +      if (read > 0) {
> +        trace(read + " bytes read.")
> +        totalRead += read
> +      } else if (read == 0) {
> +        // it's possible that nothing was read from the backing channel (see javadoc of ReadableByteChannel#read),
> +        // so we wait for a while before polling again, so that we don't end up with a busy CPU loop
> +        // TODO: For now, this 30 milliseconds is a random value.
> +        Thread.sleep(30)
> +      }
>      }
>      totalRead
>    }
>
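> To illustrate why a read can legitimately return 0 bytes and why the backoff
> matters, here is a small, self-contained sketch of the same loop shape against
> a stub channel (all names and values here are purely illustrative and are not
> part of the patch):
>
> import java.nio.ByteBuffer
> import java.nio.channels.ReadableByteChannel
>
> // Stub channel that returns 0 a few times before delivering a byte, mimicking
> // a channel that momentarily has no data available (see ReadableByteChannel#read).
> class StubChannel(var zeroReads: Int) extends ReadableByteChannel {
>   private var open = true
>   override def read(dst: ByteBuffer): Int =
>     if (zeroReads > 0) { zeroReads -= 1; 0 } else { dst.put(1.toByte); 1 }
>   override def isOpen: Boolean = open
>   override def close(): Unit = { open = false }
> }
>
> object BackoffReadDemo extends App {
>   val channel = new StubChannel(3)
>   val buffer = ByteBuffer.allocate(1)
>   var totalRead = 0
>   // Same shape as the patched loop: only count positive reads and back off on
>   // a zero-byte read instead of immediately polling the channel again.
>   while (buffer.hasRemaining) {
>     val read = channel.read(buffer)
>     if (read > 0) totalRead += read
>     else if (read == 0) Thread.sleep(30)
>   }
>   println("total bytes read: " + totalRead)
> }
>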
> Is this something that you would be able to apply against the latest 0.8.2
> branch of Kafka, build the Kafka binary, try it out, and see if it improves
> the situation?
>
> -Jaikiran
>
> On Monday 26 January 2015 11:35 PM, Mathias Söderberg wrote:
>
>> Hi Neha,
>>
>> I sent an e-mail earlier today, but noticed now that it didn't actually
>> go through.
>>
>> Anyhow, I've attached two files, one with output from a 10 minute run and
>> one with output from a 30 minute run. I realized that maybe I should've
>> done one or two runs with 0.8.1.1 as well, for comparison.
>>
>> I upgraded our staging cluster to 0.8.2.0-rc2, and I'm seeing the same
>> CPU usage as with the beta version (basically pegging all cores). If I
>> manage to find the time, I'll do another run with hprof on the rc2 version
>> later today.
>>
>> Best regards,
>> Mathias
>>
>> On Tue Dec 09 2014 at 10:08:21 PM Neha Narkhede <n...@confluent.io> wrote:
>>
>>     The following should be sufficient:
>>
>>     java
>>     -agentlib:hprof=cpu=samples,depth=100,interval=20,lineno=y,thread=y,file=kafka.hprof
>>     <classname>
>>
>>     You would need to start the Kafka server with the settings above for
>>     some time until you observe the problem.
>>
>>     On Tue, Dec 9, 2014 at 3:47 AM, Mathias Söderberg <
>>     mathias.soederb...@gmail.com> wrote:
>>
>>     > Hi Neha,
>>     >
>>     > Yeah sure. I'm not familiar with hprof, so are there any particular
>>     > options I should include, or should I just run with defaults?
>>     >
>>     > Best regards,
>>     > Mathias
>>     >
>>     > On Mon Dec 08 2014 at 7:41:32 PM Neha Narkhede
>>     > <n...@confluent.io> wrote:
>>     >
>>     > > Thanks for reporting the issue. Would you mind running hprof and
>>     > > sending the output?
>>     > >
>>     > > On Mon, Dec 8, 2014 at 1:25 AM, Mathias Söderberg <
>>     > > mathias.soederb...@gmail.com> wrote:
>>     > >
>>     > > > Good day,
>>     > > >
>>     > > > I upgraded a Kafka cluster from v0.8.1.1 to v0.8.2-beta and noticed
>>     > > > that the CPU usage on the broker machines went up by roughly 40%,
>>     > > > from ~60% to ~100%, and am wondering if anyone else has experienced
>>     > > > something similar? The load average also went up by 2x-3x.
>>     > > >
>>     > > > We're running on EC2 and the cluster currently consists of four
>>     > > > m1.xlarge, with roughly 1100 topics / 4000 partitions. Using Java 7
>>     > > > (1.7.0_65 to be exact) and Scala 2.9.2. Configurations can be found
>>     > > > over here:
>>     > > > https://gist.github.com/mthssdrbrg/7df34a795e07eef10262.
>>     > > >
>>     > > > I'm assuming that this is not expected behaviour for 0.8.2-beta?
>>     > > >
>>     > > > Best regards,
>>     > > > Mathias
>>     > > >
>>     > >
>>     > >
>>     > >
>>     > > --
>>     > > Thanks,
>>     > > Neha
>>     > >
>>     >
>>
>>
>>
>>     --
>>     Thanks,
>>     Neha
>>
>>
>
