I'm on Kafka 0.8 final, and both brokers are up. My producer sends messages
just fine, pauses for a few seconds, and then continues. The brokers are not
stopping and starting. The broker logs show that another producer/consumer
has a connection error at the same time my producer pauses. The exception at
my (pausing) producer indicates that the connection was aborted, which to my
understanding usually means the TCP connection was closed underneath it
because the broker end was not responding. It's as if the socket error shown
in the logs below for "10.236.67.30" (the other producer/consumer) causes the
broker to hang long enough for my producer's TCP connection to time out.
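
One way to put numbers on the pauses is to scan the producer log for large jumps between consecutive timestamps. The following is only a rough sketch: the class name LogGaps, the method findGaps and the threshold are made up for illustration, and the only thing taken from the logs below is the "[yyyy-MM-dd HH:mm:ss,SSS]" timestamp prefix. A gap in the log only shows that nothing was logged in that window, so it is a hint rather than proof of a stall. The broker and producer timestamps quoted below are also about an hour apart, so the two logs cannot be lined up directly without knowing the clock offset between the machines.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Kafka: reports large time gaps in a log file.
class LogGaps {
    // Matches the "[2013-12-16 16:32:14,961]" prefix used in the quoted logs.
    private static final Pattern STAMP =
            Pattern.compile("^\\[(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3})\\]");
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    // Returns one entry per pair of consecutive timestamped lines that are
    // at least thresholdMs apart.
    static List<String> findGaps(List<String> lines, long thresholdMs) {
        List<String> gaps = new ArrayList<>();
        LocalDateTime previous = null;
        for (String line : lines) {
            Matcher m = STAMP.matcher(line);
            if (!m.find()) {
                continue; // stack-trace frames and wrapped lines carry no timestamp
            }
            LocalDateTime current = LocalDateTime.parse(m.group(1), FORMAT);
            if (previous != null) {
                long gapMs = Duration.between(previous, current).toMillis();
                if (gapMs >= thresholdMs) {
                    gaps.add(gapMs + " ms before: " + line);
                }
            }
            previous = current;
        }
        return gaps;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java LogGaps <producer-log-file>");
            return;
        }
        List<String> lines = Files.readAllLines(Paths.get(args[0]), StandardCharsets.UTF_8);
        // 2000 ms is an arbitrary threshold chosen to catch the 2-5 second hangs.
        for (String gap : findGaps(lines, 2000)) {
            System.out.println(gap);
        }
    }
}
```
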


Thanks


-------------------------------------------------------------------------

What version of Kafka are you on?



It seems like your producers are not seeing your broker(s). Can you
confirm the brokers are up?
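
A quick way to check this from the producer host is to try opening a plain TCP connection to each broker. The sketch below assumes nothing about Kafka itself: BrokerProbe and isReachable are hypothetical names, and the host names and port 9092 are simply the ones that appear in the quoted producer log. A successful connect only shows that something is accepting connections on that port, not that the broker is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Kafka: checks that a TCP port accepts connections.
class BrokerProbe {
    // Returns true if a TCP connection to host:port opens within timeoutMs.
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts and unresolvable host names.
            return false;
        }
    }

    public static void main(String[] args) {
        // Broker addresses as they appear in the quoted producer log.
        String[] hosts = {"qa-hermes003.phx.qa.com", "qa-hermes004.phx.qa.com"};
        for (String host : hosts) {
            System.out.println(host + ":9092 reachable = " + isReachable(host, 9092, 2000));
        }
    }
}
```
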



On Mon, Dec 16, 2013 at 7:52 PM, Tom Amon <ta46...@gmail.com> wrote:



> Hi All,
>
> I have a situation where one producer/consumer is causing timeout
> errors on the Kafka broker. The exception in the logs looks like this:
>
> [2013-12-16 17:32:25,992] ERROR Closing socket for /10.236.67.30 because of error (kafka.network.Processor)
> java.io.IOException: Connection timed out
>         at sun.nio.ch.FileDispatcher.read0(Native Method)
>         at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
>         at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:202)
>         at sun.nio.ch.IOUtil.read(IOUtil.java:175)
>         at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:243)
>         at kafka.utils.Utils$.read(Utils.scala:395)
>         at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>         at kafka.network.Processor.read(SocketServer.scala:347)
>         at kafka.network.Processor.run(SocketServer.scala:245)
>         at java.lang.Thread.run(Thread.java:662)
>

> When this happens, _another separate_ producer hangs for about 2-5 seconds.
>
> In the producer log I get this exception:
>

> [2013-12-16 16:32:14,961] INFO Disconnecting from qa-hermes004.phx.qa.com:9092 (kafka.producer.SyncProducer)
> [2013-12-16 16:32:14,982] WARN Failed to send producer request with correlation id 3290 to broker 1 with data for partitions [ktr3,4] (kafka.producer.async.DefaultEventHandler)
> java.io.IOException: An established connection was aborted by the software in your host machine.
>         at sun.nio.ch.SocketDispatcher.writev0(Native Method)
>         at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:49)
>         at sun.nio.ch.IOUtil.write(IOUtil.java:171)
>         at sun.nio.ch.SocketChannelImpl.write0(SocketChannelImpl.java:377)
>         at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:400)
>         at java.nio.channels.SocketChannel.write(SocketChannel.java:371)
>         at kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56)
>         at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
>         at kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
>         at kafka.network.BlockingChannel.send(BlockingChannel.scala:92)
>         at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:72)
>         at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
>         at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
>         at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
>         at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
>         at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
>         at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
>         at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:245)
>         at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:107)
>         at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:101)
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>         at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
>         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
>         at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:97)
>         at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
>         at kafka.producer.Producer.send(Producer.scala:74)
>         at kafka.javaapi.producer.Producer.send(Producer.scala:32)
>         at producer.MessageProducer.send(MessageProducer.java:33)
>         at producer.MessageProducer.main(MessageProducer.java:103)

> [2013-12-16 16:32:14,987] INFO Back off for 100 ms before retrying send. Remaining retries = 3 (kafka.producer.async.DefaultEventHandler)
> [2013-12-16 16:32:15,088] INFO Fetching metadata from broker id:0,host:qa-hermes003.phx.qa.com,port:9092 with correlation id 3291 for 1 topic(s) Set(ktr3) (kafka.client.ClientUtils$)
> [2013-12-16 16:32:15,134] INFO Connected to qa-hermes003.phx.qa.com:9092 for producing (kafka.producer.SyncProducer)
> [2013-12-16 16:32:15,185] INFO Disconnecting from qa-hermes003.phx.qa.com:9092 (kafka.producer.SyncProducer)
> [2013-12-16 16:32:15,185] INFO Disconnecting from qa-hermes003.phx.qa.com:9092 (kafka.producer.SyncProducer)
> [2013-12-16 16:32:15,232] INFO Connected to qa-hermes004.phx.qa.com:9092 for producing (kafka.producer.SyncProducer)
> [2013-12-16 16:32:15,279] INFO Connected to qa-hermes003.phx.qa.com:9092 for producing (kafka.producer.SyncProducer)
>

> It's almost as if the error from the other producer/consumer is
> causing the broker itself to hang, causing my producer to abort the
> connection.
>
> Has anyone else seen anything like this before? Any idea what's going on?
>
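
For reference, the "Back off for 100 ms before retrying send. Remaining retries = 3" line in the producer log above is governed by the producer's retry settings. The sketch below only builds a java.util.Properties object with the relevant keys from the Kafka 0.8 producer configuration; the class ProducerSettings and its build method are hypothetical names, and the values are illustrative rather than a recommendation (100 ms is simply the back-off that shows up in the log). In a real client these properties would be passed to the 0.8 producer's configuration object.

```java
import java.util.Properties;

// Hypothetical helper: only the property keys come from the Kafka 0.8 producer config.
class ProducerSettings {
    static Properties build(String brokerList) {
        Properties props = new Properties();
        // Brokers used to bootstrap topic metadata.
        props.put("metadata.broker.list", brokerList);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // Wait for the partition leader to acknowledge each request.
        props.put("request.required.acks", "1");
        // How long the broker may take to satisfy the acks requirement (ms).
        props.put("request.timeout.ms", "10000");
        // Number of times a failed send is retried before giving up.
        props.put("message.send.max.retries", "3");
        // Pause between retries (ms); the producer refreshes metadata before retrying.
        props.put("retry.backoff.ms", "100");
        return props;
    }

    public static void main(String[] args) {
        // Broker addresses as they appear in the quoted producer log.
        Properties props = build("qa-hermes003.phx.qa.com:9092,qa-hermes004.phx.qa.com:9092");
        props.list(System.out);
    }
}
```

Raising the retry count or back-off would only change how the producer rides out a stall; it would not explain why the broker stops responding in the first place.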
