The request log will show the total time that a broker takes to complete a request. Could you check whether that request takes longer than the request timeout to complete?
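A minimal sketch of that check, assuming the broker request log is enabled at TRACE level and that each completed request is written on one line containing a `totalTime:<ms>` field (adjust the pattern if your log lines look different). The function name `slow_requests` is made up for illustration; 25000 is the timeout value reported further down in this thread.

```python
import re

# Assumption: every completed-request line carries "totalTime:<milliseconds>".
TOTAL_TIME = re.compile(r"totalTime:(\d+)")


def slow_requests(lines, timeout_ms):
    """Return (total_time_ms, line) pairs for requests slower than timeout_ms."""
    slow = []
    for line in lines:
        match = TOTAL_TIME.search(line)
        if match is None:
            continue  # not a completed-request line, skip it
        total_ms = int(match.group(1))
        if total_ms > timeout_ms:
            slow.append((total_ms, line.rstrip("\n")))
    return slow
```

To use it, open the request log file and pass the file object as `lines`, for example `slow_requests(open(path), 25000)`, where `path` points at wherever your log4j configuration writes the request log. An empty result would suggest the broker is answering within the timeout and the delay is elsewhere.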

Thanks,
Jun

On Wed, May 28, 2014 at 2:18 PM, Maung Than <maung_t...@apple.com> wrote:

> No.
>
> Now we only have this exception in the terminal. It seems to happen with gzip, or with a large volume of sends even without compression:
>
> java.net.SocketTimeoutException
>     at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:201)
>     at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
>     at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:221)
>     at kafka.utils.Utils$.read(Utils.scala:375)
>     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>     at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>     at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:74)
>     at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
>     at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
>     at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
>     at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>     at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
>     at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
>     at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>     at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
>     at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
>     at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:106)
>     at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:100)
>     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
>     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>     at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
>     at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
>     at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
>     at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
>     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
>     at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
>     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
>     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
>     at scala.collection.immutable.Stream.foreach(Stream.scala:254)
>     at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>     at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> 2014-05-28 14:13:36:858, 2014-05-28 14:14:20:797, 1, 420, 1000, 400.54, 9.1159, 1000000, 22758.8247
>
> On May 28, 2014, at 12:58 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Do you see any exceptions in the broker-side request logs and server logs?
> >
> > Guozhang
> >
> > On Wed, May 28, 2014 at 12:25 PM, Maung Than <maung_t...@apple.com> wrote:
> >
> >> The new value is 25000 ms.
> >> We still have the same issue.
> >>
> >> Thanks,
> >> Maung
> >>
> >> On May 27, 2014, at 3:15 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>
> >>> What is the new value you set? The new exception you saw does not seem to be related to Kafka network issues; you may want to check, for example:
> >>>
> >>> https://netbeans.org/bugzilla/show_bug.cgi?id=140000
> >>>
> >>> On Tue, May 27, 2014 at 12:34 PM, Maung Than <maung_t...@apple.com> wrote:
> >>>
> >>>> We are now getting an additional IO exception as well:
> >>>>
> >>>> May 27, 2014 12:32:29 PM sun.rmi.transport.tcp.TCPTransport$AcceptLoop executeAcceptLoop
> >>>> WARNING: RMI TCP Accept-0: accept loop for ServerSocket[addr=0.0.0.0/0.0.0.0,port=0,localport=50110] throws
> >>>> java.io.IOException: The server sockets created using the LocalRMIServerSocketFactory only accept connections from clients running on the host where the RMI remote objects have been exported.
> >>>>     at sun.management.jmxremote.LocalRMIServerSocketFactory$1.accept(LocalRMIServerSocketFactory.java:96)
> >>>>     at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTransport.java:369)
> >>>>     at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:341)
> >>>>     at java.lang.Thread.run(Thread.java:662)
> >>>> 2014-05-27 12:28:50:309, 2014-05-27 12:33:25:806, 1, 420, 1000, 4005.43, 14.5389, 10000000, 36298.0359
> >>>>
> >>>> Maung
> >>>>
> >>>> On May 27, 2014, at 11:43 AM, Maung Than <maung_t...@apple.com> wrote:
> >>>>
> >>>>> Did you mean the parameter below? If so, we do not override it, so it is the default of 10 sec. Should it be larger? It is happening only with Gzip; Snappy or no compression works fine.
> >>>>>
> >>>>> request.timeout.ms    10000    The amount of time the broker will wait trying to meet the request.required.acks requirement before sending back an error to the client.
> >>>>>
> >>>>> Thanks,
> >>>>> Maung
> >>>>>
> >>>>> On May 27, 2014, at 9:29 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> >>>>>
> >>>>>> I think Guozhang meant to say request.timeout.ms, not session timeout. You can try increasing the request timeout through the request-timeout-ms command line option.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Neha
> >>>>>>
> >>>>>> On Tue, May 27, 2014 at 8:55 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Maung,
> >>>>>>>
> >>>>>>> This issue may be due to the session timeout value being set too small. With batch size 1000 and message size 420 you are sending 420K of data in each request. What is your timeout value?
> >>>>>>>
> >>>>>>> Guozhang
> >>>>>>>
> >>>>>>> On Mon, May 26, 2014 at 11:46 PM, Maung Than <maung_t...@apple.com> wrote:
> >>>>>>>
> >>>>>>>> Hi All,
> >>>>>>>>
> >>>>>>>> This is what we are running on the Broker:
> >>>>>>>>
> >>>>>>>> /Users/worun/kafkabuild/kafka-0.8.1-src/bin/kafka-producer-perf-test.sh --broker-list vp21q12ic-hpaj020921:9092 --messages 10000000 --topic imessage --threads 10 --message-size 420 --batch-size 1000 --compression-codec 1
> >>>>>>>>
> >>>>>>>> We are getting the exception below for the test; it occurs only with gzip in async mode. In the broker log, I saw a "Connection reset by peer at sun.nio.ch.FileDispatcher" exception. Any thoughts?
> >>>>>>>>
> >>>>>>>> [2014-05-26 22:49:33,361] WARN Failed to send producer request with correlation id 58510 to broker 3 with data for partitions [imessage,1] (kafka.producer.async.DefaultEventHandler)
> >>>>>>>> java.net.SocketTimeoutException
> >>>>>>>>     at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:201)
> >>>>>>>>     at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
> >>>>>>>>     at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:221)
> >>>>>>>>     at kafka.utils.Utils$.read(Utils.scala:375)
> >>>>>>>>     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> >>>>>>>>     at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> >>>>>>>>     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> >>>>>>>>     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> >>>>>>>>     at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:74)
> >>>>>>>>     at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
> >>>>>>>>     at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
> >>>>>>>>     at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
> >>>>>>>>     at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
> >>>>>>>>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >>>>>>>>     at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
> >>>>>>>>     at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
> >>>>>>>>     at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
> >>>>>>>>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >>>>>>>>     at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
> >>>>>>>>     at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
> >>>>>>>>     at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:106)
> >>>>>>>>     at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:100)
> >>>>>>>>     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> >>>>>>>>     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> >>>>>>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> >>>>>>>>     at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
> >>>>>>>>     at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
> >>>>>>>>     at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> >>>>>>>>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
> >>>>>>>>     at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
> >>>>>>>>     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
> >>>>>>>>     at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> >>>>>>>>     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> >>>>>>>>     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> >>>>>>>>     at scala.collection.immutable.Stream.foreach(Stream.scala:254)
> >>>>>>>>     at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
> >>>>>>>>     at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> >>>>>>>> 2014-05-26 22:49:02:067, 2014-05-26 22:49:48:306, 1, 420, 1000, 400.54, 8.6625, 1000000, 21626.7653
> >>>>>>>>
> >>>>>>>> Connection reset by peer
> >>>>>>>>     at sun.nio.ch.FileDispatcher.read0(Native Method)
> >>>>>>>>     at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
> >>>>>>>>     at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198)
> >>>>>>>>     at sun.nio.ch.IOUtil.read(IOUtil.java:171)
> >>>>>>>>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:243)
> >>>>>>>>     at kafka.utils.Utils$.read(Utils.scala:375)
> >>>>>>>>     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> >>>>>>>>     at kafka.network.Processor.read(SocketServer.scala:347)
> >>>>>>>>     at kafka.network.Processor.run(SocketServer.scala:245)
> >>>>>>>>     at java.lang.Thread.run(Thread.java:662)
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Maung
> >>>>>>>>
> >>>>>>>
> >>>>>>> --
> >>>>>>> -- Guozhang
> >>>>>>>
> >>>
> >>> --
> >>> -- Guozhang
> >
> > --
> > -- Guozhang