No.
Now we only have this exception in the terminal: It seems to be happening with
gzip or large volume of sends even without compression:
java.net.SocketTimeoutException
at
sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:201)
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
at
java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:221)
at kafka.utils.Utils$.read(Utils.scala:375)
at
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:74)
at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
at
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
at
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
at
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at
kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
at
kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
at
kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
at
kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
at
kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:106)
at
kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:100)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
at scala.collection.Iterator$class.foreach(Iterator.scala:631)
at
scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
at
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
at
kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
at
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
at scala.collection.immutable.Stream.foreach(Stream.scala:254)
at
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
at
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
2014-05-28 14:13:36:858, 2014-05-28 14:14:20:797, 1, 420, 1000, 400.54, 9.1159,
1000000, 22758.8247
On May 28, 2014, at 12:58 PM, Guozhang Wang <[email protected]> wrote:
> Do you see any exceptions on the broker side request logs and server logs?
>
> Guozhang
>
>
> On Wed, May 28, 2014 at 12:25 PM, Maung Than <[email protected]> wrote:
>
>> The new value is 25000 ms.
>> We still have the same issue.
>>
>> Thanks,
>> Maung
>>
>> On May 27, 2014, at 3:15 PM, Guozhang Wang <[email protected]> wrote:
>>
>>> What is the new value you set? The new exception you saw seems not
>> related
>>> to Kafka network issues, you may want to, for example, check
>>>
>>> https://netbeans.org/bugzilla/show_bug.cgi?id=140000
>>>
>>>
>>>
>>> On Tue, May 27, 2014 at 12:34 PM, Maung Than <[email protected]>
>> wrote:
>>>
>>>> We are now getting additional IO exception as well:
>>>>
>>>> May 27, 2014 12:32:29 PM sun.rmi.transport.tcp.TCPTransport$AcceptLoop
>>>> executeAcceptLoop
>>>> WARNING: RMI TCP Accept-0: accept loop for ServerSocket[addr=
>>>> 0.0.0.0/0.0.0.0,port=0,localport=50110] throws
>>>> java.io.IOException: The server sockets created using the
>>>> LocalRMIServerSocketFactory only accept connections from clients
>> running on
>>>> the host where the RMI remote objects have been exported.
>>>> at
>>>>
>> sun.management.jmxremote.LocalRMIServerSocketFactory$1.accept(LocalRMIServerSocketFactory.java:96)
>>>> at
>>>>
>> sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTransport.java:369)
>>>> at
>>>> sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:341)
>>>> at java.lang.Thread.run(Thread.java:662)
>>>> 2014-05-27 12:28:50:309, 2014-05-27 12:33:25:806, 1, 420, 1000, 4005.43,
>>>> 14.5389, 10000000, 36298.0359
>>>>
>>>> Maung
>>>>
>>>> On May 27, 2014, at 11:43 AM, Maung Than <[email protected]> wrote:
>>>>
>>>>> You meant to say the below parameter; if so we do not override it; so
>> it
>>>> is the default 10 sec. Should It be larger? It is happening only with
>> Gzip;
>>>> Snappy or no compression works fine.
>>>>>
>>>>> request.timeout.ms 10000 The amount of time the broker will wait
>>>> trying to meet the request.required.acks requirement before sending
>> back an
>>>> error to the client.
>>>>>
>>>>> Thanks,
>>>>> Mauing
>>>>>
>>>>> On May 27, 2014, at 9:29 AM, Neha Narkhede <[email protected]>
>>>> wrote:
>>>>>
>>>>>> I think Guozhang meant to say request.timeout.ms, not session
>> timeout.
>>>> You
>>>>>> can try increasing the request timeout through the request-timeout-ms
>>>>>> command line option.
>>>>>>
>>>>>> Thanks,
>>>>>> Neha
>>>>>>
>>>>>>
>>>>>> On Tue, May 27, 2014 at 8:55 AM, Guozhang Wang <[email protected]>
>>>> wrote:
>>>>>>
>>>>>>> Maung,
>>>>>>>
>>>>>>> This issue may be due to the session timeout value set too small.
>> With
>>>>>>> batch size 1000 and message size 420 you are sending 420K of data in
>>>> each
>>>>>>> request. What is your time out value?
>>>>>>>
>>>>>>> Guozhang
>>>>>>>
>>>>>>>
>>>>>>> On Mon, May 26, 2014 at 11:46 PM, Maung Than <[email protected]>
>>>> wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> This is what we are running on the Broker:
>>>>>>>>
>>>>>>>>
>>>> /Users/worun/kafkabuild/kafka-0.8.1-src/bin/kafka-producer-perf-test.sh
>>>>>>>> --broker-list vp21q12ic-hpaj020921:9092 --messages 10000000 --topic
>>>>>>>> imessage --threads 10 --message-size 420 --batch-size 1000
>>>>>>>> --compression-codec 1
>>>>>>>>
>>>>>>>> We are getting the below exception for the test, it occurs only with
>>>> gzip
>>>>>>>> in an async mode. In the broker log, I saw Connection reset by peer
>> at
>>>>>>>> sun.nio.ch.FileDispatcher. exception. Any thoughts?
>>>>>>>>
>>>>>>>> [2014-05-26 22:49:33,361] WARN Failed to send producer request with
>>>>>>>> correlation id 58510 to broker 3 with data for partitions
>> [imessage,1]
>>>>>>>> (kafka.producer.async.DefaultEventHandler)
>>>>>>>> java.net.SocketTimeoutException
>>>>>>>> at
>>>>>>>>
>>>> sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:201)
>>>>>>>> at
>>>> sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
>>>>>>>> at
>>>>>>>>
>>>>>>>
>>>>
>> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:221)
>>>>>>>> at kafka.utils.Utils$.read(Utils.scala:375)
>>>>>>>> at
>>>>>>>>
>>>>>>>
>>>>
>> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>>>>>>>> at
>>>>>>>> kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>>>>>>>> at
>>>>>>>>
>>>>>>>
>>>>
>> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>>>>>>>> at
>>>>>>> kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>>>>>>>> at
>>>>>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:74)
>>>>>>>> at
>>>>>>>>
>>>>>>>
>>>>
>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
>>>>>>>> at
>>>>>>>>
>>>>>>>
>>>>
>> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
>>>>>>>> at
>>>>>>>>
>>>>>>>
>>>>
>> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
>>>>>>>> at
>>>>>>>>
>>>>>>>
>>>>
>> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
>>>>>>>> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>>>>>>> at
>>>>>>>>
>>>>>>>
>>>>
>> kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
>>>>>>>> at
>>>>>>>>
>>>>
>> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
>>>>>>>> at
>>>>>>>>
>>>>
>> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
>>>>>>>> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>>>>>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
>>>>>>>> at
>>>>>>>>
>>>>>>>
>>>>
>> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
>>>>>>>> at
>>>>>>>>
>>>>>>>
>>>>
>> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:106)
>>>>>>>> at
>>>>>>>>
>>>>>>>
>>>>
>> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:100)
>>>>>>>> at
>>>>>>>>
>>>>>>>
>>>>
>> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
>>>>>>>> at
>>>>>>>>
>>>>>>>
>>>>
>> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
>>>>>>>> at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>>>>>>>> at
>>>>>>>>
>>>> scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
>>>>>>>> at
>>>>>>>>
>>>>>>>
>>>>
>> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
>>>>>>>> at
>>>>>>> scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>>>>>>>> at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
>>>>>>>> at
>>>>>>>>
>>>>>>>
>>>>
>> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
>>>>>>>> at
>>>>>>>>
>>>>>>>
>>>>
>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
>>>>>>>> at
>>>>>>>>
>>>>>>>
>>>>
>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
>>>>>>>> at
>>>>>>>>
>>>>>>>
>>>>
>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
>>>>>>>> at
>>>>>>>>
>>>>>>>
>>>>
>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
>>>>>>>> at scala.collection.immutable.Stream.foreach(Stream.scala:254)
>>>>>>>> at
>>>>>>>>
>>>>>>>
>>>>
>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>>>>>>>> at
>>>>>>>>
>>>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
>>>>>>>> 2014-05-26 22:49:02:067, 2014-05-26 22:49:48:306, 1, 420, 1000,
>>>> 400.54,
>>>>>>>> 8.6625, 1000000, 21626.7653
>>>>>>>>
>>>>>>>> Connection reset by peer at sun.nio.ch.FileDispatcher.read0(Native
>>>>>>> Method)
>>>>>>>> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21) at
>>>>>>>> sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198) at
>>>>>>>> sun.nio.ch.IOUtil.read(IOUtil.java:171) at
>>>>>>>> sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:243) at
>>>>>>>> kafka.utils.Utils$.read(Utils.scala:375) at
>>>>>>>>
>>>>>>>
>>>>
>> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>>>>>>>> at kafka.network.Processor.read(SocketServer.scala:347) at
>>>>>>>> kafka.network.Processor.run(SocketServer.scala:245) at
>>>>>>>> java.lang.Thread.run(Thread.java:662)
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Maung
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> -- Guozhang
>>>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> -- Guozhang
>>
>>
>
>
> --
> -- Guozhang