I think Guozhang meant to say request.timeout.ms, not session timeout. You can try increasing the request timeout through the request-timeout-ms command line option.
Thanks, Neha On Tue, May 27, 2014 at 8:55 AM, Guozhang Wang <wangg...@gmail.com> wrote: > Maung, > > This issue may be due to the session timeout value set too small. With > batch size 1000 and message size 420 you are sending 420K of data in each > request. What is your time out value? > > Guozhang > > > On Mon, May 26, 2014 at 11:46 PM, Maung Than <maung_t...@apple.com> wrote: > > > Hi All, > > > > This is what we are running on the Broker: > > > > /Users/worun/kafkabuild/kafka-0.8.1-src/bin/kafka-producer-perf-test.sh > > --broker-list vp21q12ic-hpaj020921:9092 --messages 10000000 --topic > > imessage --threads 10 --message-size 420 --batch-size 1000 > > --compression-codec 1 > > > > We are getting the below exception for the test, it occurs only with gzip > > in an async mode. In the broker log, I saw Connection reset by peer at > > sun.nio.ch.FileDispatcher. exception. Any thoughts? > > > > [2014-05-26 22:49:33,361] WARN Failed to send producer request with > > correlation id 58510 to broker 3 with data for partitions [imessage,1] > > (kafka.producer.async.DefaultEventHandler) > > java.net.SocketTimeoutException > > at > > sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:201) > > at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86) > > at > > > java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:221) > > at kafka.utils.Utils$.read(Utils.scala:375) > > at > > > kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) > > at > > kafka.network.Receive$class.readCompletely(Transmission.scala:56) > > at > > > kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) > > at > kafka.network.BlockingChannel.receive(BlockingChannel.scala:100) > > at > kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:74) > > at > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71) > > at > > > kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102) > > at > > > kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102) > > at > > > kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102) > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > > at > > > kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101) > > at > > kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101) > > at > > kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101) > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > > at kafka.producer.SyncProducer.send(SyncProducer.scala:100) > > at > > > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255) > > at > > > kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:106) > > at > > > kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:100) > > at > > > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80) > > at > > > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80) > > at scala.collection.Iterator$class.foreach(Iterator.scala:631) > > at > > scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161) > > at > > > scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194) > > at > scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) > > at scala.collection.mutable.HashMap.foreach(HashMap.scala:80) > > at > > > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100) > > at > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72) > > at > > > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104) > > at > > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87) > > at > > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67) > > at scala.collection.immutable.Stream.foreach(Stream.scala:254) > > at > > > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66) > > at > > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44) > > 2014-05-26 22:49:02:067, 2014-05-26 22:49:48:306, 1, 420, 1000, 400.54, > > 8.6625, 1000000, 21626.7653 > > > > Connection reset by peer at sun.nio.ch.FileDispatcher.read0(Native > Method) > > at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21) at > > sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198) at > > sun.nio.ch.IOUtil.read(IOUtil.java:171) at > > sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:243) at > > kafka.utils.Utils$.read(Utils.scala:375) at > > > kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) > > at kafka.network.Processor.read(SocketServer.scala:347) at > > kafka.network.Processor.run(SocketServer.scala:245) at > > java.lang.Thread.run(Thread.java:662) > > > > Thanks, > > Maung > > > > > > -- > -- Guozhang >