As Jun suggested, if you look at the request metrics, you get a general
idea of how the request latencies are distributed across the network and
I/O layers. However, if you want to get to the bottom of why certain
requests failed, you can look at the request log. It records the same
request latency breakdown for every request served by Kafka, and, what's
really useful, it also records each request's correlation id. The
correlation id is a unique value the producer assigns to every request it
sends. By tracing that correlation id through the broker's request log,
you can see how the request performed and where it spent most of its time.
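
On the broker side, the request log is driven by log4j. As a minimal sketch
(logger and appender names assume the stock 0.8 config/log4j.properties, so
double-check against your file), raising the request logger to TRACE writes
the per-request detail, including the correlation id and the latency
breakdown, to the request log file:

    # Sketch only: assumes the requestAppender already defined in the stock
    # 0.8 log4j.properties, which points at kafka-request.log.
    log4j.logger.kafka.request.logger=TRACE, requestAppender
    log4j.additivity.kafka.request.logger=false

The broker picks this up on restart.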

Let us know what those values were for the requests that timed out.
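
For reference, the ack setting Jun asks about below lives in the producer
config. A rough sketch with placeholder values (property names per the 0.8
producer config; verify them against the version you are running):

    # Illustrative values only, not a recommendation.
    request.required.acks=1    # 0 = no ack, 1 = leader ack, -1 = all in-sync replicas
    request.timeout.ms=10000   # how long the broker may wait to satisfy the acks setting

If the broker-side total time from the metrics is regularly approaching the
producer's timeout, those two numbers are worth lining up side by side.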

Thanks,
Neha


On Thu, Mar 21, 2013 at 8:24 AM, Jun Rao <jun...@gmail.com> wrote:

> Which ack are you using in the producer? On the broker, we have a JMX bean
> under RequestChannel that measures the total time of serving each producer
> request and its breakdown (queue time, local time, remote time, etc.). Take
> a look at those metrics and see if the total time exceeds 30 secs. If so,
> find out which part takes the most time.
>
> Also, are you using the latest code in the 0.8 branch?
>
> Thanks,
>
> Jun
>
>
> On Wed, Mar 20, 2013 at 11:06 AM, Bob Jervis <bjer...@gmail.com> wrote:
>
> > We are seeing some odd socket timeouts from one of our producers.  This
> > producer fans out data from one topic into dozens or hundreds of
> > potential output topics.  We batch the sends to write 1,000 messages at
> > a time.
> >
> > The odd thing is that the timeouts are happening in the socket read, so
> > I assume that the socket.timeout.ms value applies, which we leave at the
> > default of 30 seconds.  Odder still, these exceptions are getting thrown
> > in clusters of 3-5 at a time, with just a few seconds or less between
> > each.  We are running with 64 network threads in our brokers, which
> > seems plenty given that the broker has only 8 cores.  From the
> > clustering of timeouts, it looks as though we are issuing multiple
> > metadata requests in parallel.  Is that true?
> >
> > We haven't touched the I/O threads (still set at 2), but I'm wondering
> > if these are just artifacts of congestion in the communication between
> > the brokers and our clients.  Are we using too many distinct topics
> > (~95), and should we try to cut down on them as a way to smooth the
> > message exchanges between broker and client?  We expect the number of
> > topics in production to be much higher than this.
> >
> > It does appear that the producer in this case is able to continue
> > sending, but these exceptions in the logs make our testers unhappy.
> >
> > I won't include the very lengthy log messages in toto, but the stack
> > traces look like:
> >
> > java.net.SocketTimeoutException
> >         at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:201)
> >         at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
> >         at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:221)
> >         at kafka.utils.Utils$.read(Utils.scala:372)
> >         at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> >         at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> >         at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> >         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> >         at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
> >         at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
> >         at kafka.producer.SyncProducer.send(SyncProducer.scala:105)
> >         at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:33)
> >         at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:75)
> >         at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:66)
> >         at kafka.utils.Utils$.swallow(Utils.scala:164)
> >         at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> >         at kafka.utils.Utils$.swallowError(Utils.scala:43)
> >         at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:66)
> >         at kafka.producer.Producer.send(Producer.scala:76)
> >         at kafka.javaapi.producer.Producer.send(Producer.scala:41)
> >         at com.visibletechnologies.platform.common.kafka.KafkaWriter.flush(KafkaWriter.java:114)
> >         at com.visibletechnologies.platform.common.kafka.KafkaWriter.checkFlush(KafkaWriter.java:92)
> >         at com.visibletechnologies.platform.katta.krouter.KRouter.checkFlush(KRouter.java:182)
> >         at com.visibletechnologies.platform.katta.krouter.KRouter.doWork(KRouter.java:139)
> >         at com.visibletechnologies.framework.servicebase.ServiceBase.start(ServiceBase.java:187)
> >         at com.visibletechnologies.platform.katta.krouter.Main.main(Main.java:132)
> >
>
