[ https://issues.apache.org/jira/browse/KAFKA-1282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14137118#comment-14137118 ]

nicu marasoiu commented on KAFKA-1282:
--------------------------------------

Here is a timeline:

he -> produced
he -> consumed
[waited beyond the timeout here; the connection was closed underneath by the other side]
[2014-09-17 15:02:28,689] INFO Got user-level KeeperException when processing sessionid:0x148837ce1800001 type:setData cxid:0x24 zxid:0xec txntype:-1 reqpath:n/a Error Path:/consumers/console-consumer-87959/offsets/topi/0 Error:KeeperErrorCode = NoNode for /consumers/console-consumer-87959/offsets/topi/0 (org.apache.zookeeper.server.PrepRequestProcessor)
[2014-09-17 15:02:28,691] INFO Got user-level KeeperException when processing sessionid:0x148837ce1800001 type:create cxid:0x25 zxid:0xed txntype:-1 reqpath:n/a Error Path:/consumers/console-consumer-87959/offsets Error:KeeperErrorCode = NoNode for /consumers/console-consumer-87959/offsets (org.apache.zookeeper.server.PrepRequestProcessor)
dddddddddddddd --> produce attempt (never retried, or never reached the broker, or at least never reached the consumer)
[waited many seconds to see whether the message would be retried; apparently it was not, even though the default retry count is 3]
wwwwwwwwwwwwwwwww --> new attempt (immediately I see the message below with the stack trace, and the reconnect + retry succeeds instantly; see the sketch after the trace for why the first write fails silently)
[2014-09-17 15:03:12,599] WARN Failed to send producer request with correlation id 9 to broker 0 with data for partitions [topi,0] (kafka.producer.async.DefaultEventHandler)
java.io.IOException: Broken pipe
        at sun.nio.ch.FileDispatcherImpl.writev0(Native Method)
        at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51)
        at sun.nio.ch.IOUtil.write(IOUtil.java:149)
        at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:483)
        at java.nio.channels.SocketChannel.write(SocketChannel.java:493)
        at kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56)
        at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
        at kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:72)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
        at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
        at scala.collection.immutable.Stream.foreach(Stream.scala:547)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
[2014-09-17 15:03:12,712] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
wwwwwwwwwwwwwwwww -> consumed (so the reconnect + retry did deliver the message)
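
To make the "first message lost, second one hits Broken pipe" part easier to reproduce, here is a minimal, self-contained Java sketch. This is my own illustration, not Kafka code, and the class name is made up: the first write() after the peer has closed merely lands in the local send buffer and looks successful, the peer then answers with an RST, and only the following write() fails, mirroring the trace above (the exact exception text can vary by platform).

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo class, not part of Kafka.
public class BrokenPipeDemo {
    public static void main(String[] args) throws Exception {
        ServerSocketChannel server = ServerSocketChannel.open()
                .bind(new InetSocketAddress("127.0.0.1", 0));
        SocketChannel client = SocketChannel.open(server.getLocalAddress());
        SocketChannel accepted = server.accept();

        // Simulate the broker side closing the idle connection.
        accepted.close();

        // First produce attempt: write() returns normally because the bytes
        // only reach the local send buffer; the data is silently lost.
        client.write(ByteBuffer.wrap("dddddddddddddd".getBytes()));
        System.out.println("first write returned normally");

        Thread.sleep(1000); // give the peer's RST time to arrive

        // Second attempt: the kernel now knows the peer is gone, so the
        // write fails, just like the java.io.IOException: Broken pipe above.
        try {
            client.write(ByteBuffer.wrap("wwwwwwwwwwwwwwwww".getBytes()));
        } catch (IOException e) {
            System.out.println("second write failed: " + e.getMessage());
        }
        client.close();
        server.close();
    }
}

This also explains why the old producer's retry count never comes into play for the first message: from the producer's point of view that send simply succeeded.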

> Disconnect idle socket connection in Selector
> ---------------------------------------------
>
>                 Key: KAFKA-1282
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1282
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 0.8.2
>            Reporter: Jun Rao
>            Assignee: nicu marasoiu
>              Labels: newbie++
>             Fix For: 0.9.0
>
>         Attachments: 1282_brushed_up.patch, 
> KAFKA-1282_Disconnect_idle_socket_connection_in_Selector.patch
>
>
> To reduce # socket connections, it would be useful for the new producer to 
> close socket connections that are idle. We can introduce a new producer 
> config for the idle time.
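
On the fix itself: the description above asks for a producer config for the idle time. As a rough sketch (not the attached patch; all names here are illustrative), the selector could remember the last send/receive time per channel and proactively close anything that has been idle longer than the configured timeout, so the producer never writes into a connection the broker has already given up on:

import java.io.IOException;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Illustrative sketch of an idle-connection reaper; field and method
// names are made up and may differ from the real implementation.
public class IdleConnectionReaper {
    private final long idleTimeoutMs;                        // value of the proposed producer config
    private final Map<SocketChannel, Long> lastActivity = new HashMap<>();

    public IdleConnectionReaper(long idleTimeoutMs) {
        this.idleTimeoutMs = idleTimeoutMs;
    }

    // Call whenever bytes are sent to or received on the channel.
    public void recordActivity(SocketChannel channel, long nowMs) {
        lastActivity.put(channel, nowMs);
    }

    // Call once per poll loop: close and forget channels that have been
    // idle longer than the configured timeout.
    public void maybeCloseIdle(long nowMs) {
        Iterator<Map.Entry<SocketChannel, Long>> it = lastActivity.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<SocketChannel, Long> entry = it.next();
            if (nowMs - entry.getValue() > idleTimeoutMs) {
                try {
                    entry.getKey().close();                  // proactive disconnect
                } catch (IOException ignored) {
                    // best effort; the channel may already be dead
                }
                it.remove();
            }
        }
    }
}

A production version would likely keep the connections in LRU order so only the oldest one needs checking per poll, but the linear scan keeps the sketch short.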



