Maung,

This issue may be due to the session timeout value set too small. With
batch size 1000 and message size 420 you are sending 420K of data in each
request. What is your time out value?

Guozhang


On Mon, May 26, 2014 at 11:46 PM, Maung Than <maung_t...@apple.com> wrote:

> Hi All,
>
> This is what we are running on the Broker:
>
> /Users/worun/kafkabuild/kafka-0.8.1-src/bin/kafka-producer-perf-test.sh
> --broker-list vp21q12ic-hpaj020921:9092 --messages 10000000 --topic
> imessage --threads 10 --message-size 420 --batch-size 1000
> --compression-codec 1
>
> We are getting the below exception for the test, it occurs only with gzip
> in an async mode. In the broker log, I saw Connection reset by peer at
> sun.nio.ch.FileDispatcher. exception. Any thoughts?
>
> [2014-05-26 22:49:33,361] WARN Failed to send producer request with
> correlation id 58510 to broker 3 with data for partitions [imessage,1]
> (kafka.producer.async.DefaultEventHandler)
> java.net.SocketTimeoutException
>         at
> sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:201)
>         at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
>         at
> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:221)
>         at kafka.utils.Utils$.read(Utils.scala:375)
>         at
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>         at
> kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>         at
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>         at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:74)
>         at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
>         at
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
>         at
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:106)
>         at
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:100)
>         at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
>         at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>         at
> scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
>         at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
>         at
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
>         at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
>         at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
>         at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
>         at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
>         at scala.collection.immutable.Stream.foreach(Stream.scala:254)
>         at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>         at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> 2014-05-26 22:49:02:067, 2014-05-26 22:49:48:306, 1, 420, 1000, 400.54,
> 8.6625, 1000000, 21626.7653
>
> Connection reset by peer at sun.nio.ch.FileDispatcher.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21) at
> sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198) at
> sun.nio.ch.IOUtil.read(IOUtil.java:171) at
> sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:243) at
> kafka.utils.Utils$.read(Utils.scala:375) at
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at kafka.network.Processor.read(SocketServer.scala:347) at
> kafka.network.Processor.run(SocketServer.scala:245) at
> java.lang.Thread.run(Thread.java:662)
>
> Thanks,
> Maung
>



-- 
-- Guozhang

Reply via email to