The request log will show the total time that a broker takes to complete a
request. Could you see if that request takes more then request timeout to
complete?

Thanks,

Jun


On Wed, May 28, 2014 at 2:18 PM, Maung Than <maung_t...@apple.com> wrote:

> No.
>
> Now we only have this exception in the terminal: It seems to be happening
> with gzip or large volume of sends even without compression:
>
> java.net.SocketTimeoutException
>         at
> sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:201)
>         at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
>         at
> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:221)
>         at kafka.utils.Utils$.read(Utils.scala:375)
>         at
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>         at
> kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>         at
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>         at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:74)
>         at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
>         at
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
>         at
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:106)
>         at
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:100)
>         at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
>         at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>         at
> scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
>         at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
>         at
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
>         at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
>         at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
>         at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
>         at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
>         at scala.collection.immutable.Stream.foreach(Stream.scala:254)
>         at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>         at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> 2014-05-28 14:13:36:858, 2014-05-28 14:14:20:797, 1, 420, 1000, 400.54,
> 9.1159, 1000000, 22758.8247
>
>
> On May 28, 2014, at 12:58 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Do you see any exceptions on the broker side request logs and server
> logs?
> >
> > Guozhang
> >
> >
> > On Wed, May 28, 2014 at 12:25 PM, Maung Than <maung_t...@apple.com>
> wrote:
> >
> >> The new value is 25000 ms.
> >> We still have the same issue.
> >>
> >> Thanks,
> >> Maung
> >>
> >> On May 27, 2014, at 3:15 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>
> >>> What is the new value you set? The new exception you saw seems not
> >> related
> >>> to Kafka network issues, you may want to, for example, check
> >>>
> >>> https://netbeans.org/bugzilla/show_bug.cgi?id=140000
> >>>
> >>>
> >>>
> >>> On Tue, May 27, 2014 at 12:34 PM, Maung Than <maung_t...@apple.com>
> >> wrote:
> >>>
> >>>> We are now getting additional IO exception as well:
> >>>>
> >>>> May 27, 2014 12:32:29 PM sun.rmi.transport.tcp.TCPTransport$AcceptLoop
> >>>> executeAcceptLoop
> >>>> WARNING: RMI TCP Accept-0: accept loop for ServerSocket[addr=
> >>>> 0.0.0.0/0.0.0.0,port=0,localport=50110] throws
> >>>> java.io.IOException: The server sockets created using the
> >>>> LocalRMIServerSocketFactory only accept connections from clients
> >> running on
> >>>> the host where the RMI remote objects have been exported.
> >>>>       at
> >>>>
> >>
> sun.management.jmxremote.LocalRMIServerSocketFactory$1.accept(LocalRMIServerSocketFactory.java:96)
> >>>>       at
> >>>>
> >>
> sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTransport.java:369)
> >>>>       at
> >>>>
> sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:341)
> >>>>       at java.lang.Thread.run(Thread.java:662)
> >>>> 2014-05-27 12:28:50:309, 2014-05-27 12:33:25:806, 1, 420, 1000,
> 4005.43,
> >>>> 14.5389, 10000000, 36298.0359
> >>>>
> >>>> Maung
> >>>>
> >>>> On May 27, 2014, at 11:43 AM, Maung Than <maung_t...@apple.com>
> wrote:
> >>>>
> >>>>> You meant to say the below parameter; if so we do not override it; so
> >> it
> >>>> is the default 10 sec. Should It be larger? It is happening only with
> >> Gzip;
> >>>> Snappy or no compression works fine.
> >>>>>
> >>>>> request.timeout.ms    10000   The amount of time the broker will
> wait
> >>>> trying to meet the request.required.acks requirement before sending
> >> back an
> >>>> error to the client.
> >>>>>
> >>>>> Thanks,
> >>>>> Mauing
> >>>>>
> >>>>> On May 27, 2014, at 9:29 AM, Neha Narkhede <neha.narkh...@gmail.com>
> >>>> wrote:
> >>>>>
> >>>>>> I think Guozhang meant to say request.timeout.ms, not session
> >> timeout.
> >>>> You
> >>>>>> can try increasing the request timeout through the
> request-timeout-ms
> >>>>>> command line option.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Neha
> >>>>>>
> >>>>>>
> >>>>>> On Tue, May 27, 2014 at 8:55 AM, Guozhang Wang <wangg...@gmail.com>
> >>>> wrote:
> >>>>>>
> >>>>>>> Maung,
> >>>>>>>
> >>>>>>> This issue may be due to the session timeout value set too small.
> >> With
> >>>>>>> batch size 1000 and message size 420 you are sending 420K of data
> in
> >>>> each
> >>>>>>> request. What is your time out value?
> >>>>>>>
> >>>>>>> Guozhang
> >>>>>>>
> >>>>>>>
> >>>>>>> On Mon, May 26, 2014 at 11:46 PM, Maung Than <maung_t...@apple.com
> >
> >>>> wrote:
> >>>>>>>
> >>>>>>>> Hi All,
> >>>>>>>>
> >>>>>>>> This is what we are running on the Broker:
> >>>>>>>>
> >>>>>>>>
> >>>>
> /Users/worun/kafkabuild/kafka-0.8.1-src/bin/kafka-producer-perf-test.sh
> >>>>>>>> --broker-list vp21q12ic-hpaj020921:9092 --messages 10000000
> --topic
> >>>>>>>> imessage --threads 10 --message-size 420 --batch-size 1000
> >>>>>>>> --compression-codec 1
> >>>>>>>>
> >>>>>>>> We are getting the below exception for the test, it occurs only
> with
> >>>> gzip
> >>>>>>>> in an async mode. In the broker log, I saw Connection reset by
> peer
> >> at
> >>>>>>>> sun.nio.ch.FileDispatcher. exception. Any thoughts?
> >>>>>>>>
> >>>>>>>> [2014-05-26 22:49:33,361] WARN Failed to send producer request
> with
> >>>>>>>> correlation id 58510 to broker 3 with data for partitions
> >> [imessage,1]
> >>>>>>>> (kafka.producer.async.DefaultEventHandler)
> >>>>>>>> java.net.SocketTimeoutException
> >>>>>>>>     at
> >>>>>>>>
> >>>>
> sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:201)
> >>>>>>>>     at
> >>>> sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
> >>>>>>>>     at
> >>>>>>>>
> >>>>>>>
> >>>>
> >>
> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:221)
> >>>>>>>>     at kafka.utils.Utils$.read(Utils.scala:375)
> >>>>>>>>     at
> >>>>>>>>
> >>>>>>>
> >>>>
> >>
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> >>>>>>>>     at
> >>>>>>>> kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> >>>>>>>>     at
> >>>>>>>>
> >>>>>>>
> >>>>
> >>
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> >>>>>>>>     at
> >>>>>>> kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> >>>>>>>>     at
> >>>>>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:74)
> >>>>>>>>     at
> >>>>>>>>
> >>>>>>>
> >>>>
> >>
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
> >>>>>>>>     at
> >>>>>>>>
> >>>>>>>
> >>>>
> >>
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
> >>>>>>>>     at
> >>>>>>>>
> >>>>>>>
> >>>>
> >>
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
> >>>>>>>>     at
> >>>>>>>>
> >>>>>>>
> >>>>
> >>
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
> >>>>>>>>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >>>>>>>>     at
> >>>>>>>>
> >>>>>>>
> >>>>
> >>
> kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
> >>>>>>>>     at
> >>>>>>>>
> >>>>
> >>
> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
> >>>>>>>>     at
> >>>>>>>>
> >>>>
> >>
> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
> >>>>>>>>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >>>>>>>>     at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
> >>>>>>>>     at
> >>>>>>>>
> >>>>>>>
> >>>>
> >>
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
> >>>>>>>>     at
> >>>>>>>>
> >>>>>>>
> >>>>
> >>
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:106)
> >>>>>>>>     at
> >>>>>>>>
> >>>>>>>
> >>>>
> >>
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:100)
> >>>>>>>>     at
> >>>>>>>>
> >>>>>>>
> >>>>
> >>
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> >>>>>>>>     at
> >>>>>>>>
> >>>>>>>
> >>>>
> >>
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> >>>>>>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> >>>>>>>>     at
> >>>>>>>>
> >>>>
> scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
> >>>>>>>>     at
> >>>>>>>>
> >>>>>>>
> >>>>
> >>
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
> >>>>>>>>     at
> >>>>>>> scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> >>>>>>>>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
> >>>>>>>>     at
> >>>>>>>>
> >>>>>>>
> >>>>
> >>
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
> >>>>>>>>     at
> >>>>>>>>
> >>>>>>>
> >>>>
> >>
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
> >>>>>>>>     at
> >>>>>>>>
> >>>>>>>
> >>>>
> >>
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> >>>>>>>>     at
> >>>>>>>>
> >>>>>>>
> >>>>
> >>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> >>>>>>>>     at
> >>>>>>>>
> >>>>>>>
> >>>>
> >>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> >>>>>>>>     at scala.collection.immutable.Stream.foreach(Stream.scala:254)
> >>>>>>>>     at
> >>>>>>>>
> >>>>>>>
> >>>>
> >>
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
> >>>>>>>>     at
> >>>>>>>>
> >>>>
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> >>>>>>>> 2014-05-26 22:49:02:067, 2014-05-26 22:49:48:306, 1, 420, 1000,
> >>>> 400.54,
> >>>>>>>> 8.6625, 1000000, 21626.7653
> >>>>>>>>
> >>>>>>>> Connection reset by peer at sun.nio.ch.FileDispatcher.read0(Native
> >>>>>>> Method)
> >>>>>>>> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21) at
> >>>>>>>> sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198) at
> >>>>>>>> sun.nio.ch.IOUtil.read(IOUtil.java:171) at
> >>>>>>>> sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:243) at
> >>>>>>>> kafka.utils.Utils$.read(Utils.scala:375) at
> >>>>>>>>
> >>>>>>>
> >>>>
> >>
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> >>>>>>>> at kafka.network.Processor.read(SocketServer.scala:347) at
> >>>>>>>> kafka.network.Processor.run(SocketServer.scala:245) at
> >>>>>>>> java.lang.Thread.run(Thread.java:662)
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Maung
> >>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> --
> >>>>>>> -- Guozhang
> >>>>>>>
> >>>>>
> >>>>
> >>>>
> >>>
> >>>
> >>> --
> >>> -- Guozhang
> >>
> >>
> >
> >
> > --
> > -- Guozhang
>
>

Reply via email to