We are seeing some odd socket timeouts from one of our producers.  This
producer fans out data from one topic into dozens or hundreds of potential
output topics.  We batch the sends to write 1,000 messages at a time.
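
For context, our write path looks roughly like the sketch below (the class
name, broker list, and topic names are illustrative placeholders, not our
real ones); we use the 0.8 javaapi producer with the batch send() overload,
with our KafkaWriter's flush-at-1,000 logic simplified down to a loop:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class FanOutWriterSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Broker list and serializer here are placeholders, not our real config.
            props.put("metadata.broker.list", "broker1:9092,broker2:9092");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("request.required.acks", "1");

            Producer<String, String> producer =
                    new Producer<String, String>(new ProducerConfig(props));

            // We buffer messages ourselves and flush once 1,000 have accumulated,
            // handing the whole list to the batch send(); the topic varies per
            // message, which is where the fan-out happens.
            List<KeyedMessage<String, String>> batch =
                    new ArrayList<KeyedMessage<String, String>>();
            for (int i = 0; i < 1000; i++) {
                String topic = "output-topic-" + (i % 95);  // ~95 distinct topics
                batch.add(new KeyedMessage<String, String>(topic, "payload-" + i));
            }
            producer.send(batch);
            producer.close();
        }
    }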

The timeouts are happening in the socket read, so I assume the
socket.timeout.ms value applies, which we leave at the default of 30
seconds.  The odd part is that these exceptions are thrown in clusters of
3-5 at a time, with just a few seconds or less between each.  We are
running with 64 network threads on our brokers, which seems like plenty
given that each broker has only 8 cores.  From the clustering of timeouts,
it looks like we may be issuing multiple metadata requests in parallel.  Is
that true?
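
In case it helps, these are the producer-side timeout/retry properties that
look relevant to me; I'm honestly not sure which of them (if any) actually
governs the read timeout on the metadata fetch, and the values below are
just illustrative guesses, not settings we currently use:

    // Added to the same Properties object as above; values are illustrative.
    props.put("request.timeout.ms", "60000");                  // per-request timeout
    props.put("topic.metadata.refresh.interval.ms", "600000"); // periodic metadata refresh
    props.put("message.send.max.retries", "3");                // retries on a failed send
    props.put("retry.backoff.ms", "500");                      // pause before retry/metadata refresh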

We haven't touched the io threads (still set at 2), but I'm wondering if
these timeouts are just artifacts of congestion in the communication
between the brokers and our clients.  Are we using too many distinct topics
(~95), and should we cut down on them to smooth the message exchanges
between broker and client?  We expect the number of topics in production to
be much higher than that.
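
For reference, the relevant broker settings (as I understand the property
names in server.properties) currently look like this:

    # server.properties on our brokers (current values)
    num.network.threads=64
    num.io.threads=2

Happy to bump num.io.threads if that's a more likely culprit than the topic
count.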

It does appear that the producer in this case is able to continue sending,
but these exceptions in the logs make our testers unhappy.

I won't include the very lengthy log messages in toto, but the stack traces
look like:

java.net.SocketTimeoutException
        at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:201)
        at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
        at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:221)
        at kafka.utils.Utils$.read(Utils.scala:372)
        at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
        at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
        at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:105)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:33)
        at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:75)
        at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:66)
        at kafka.utils.Utils$.swallow(Utils.scala:164)
        at kafka.utils.Logging$class.swallowError(Logging.scala:105)
        at kafka.utils.Utils$.swallowError(Utils.scala:43)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:66)
        at kafka.producer.Producer.send(Producer.scala:76)
        at kafka.javaapi.producer.Producer.send(Producer.scala:41)
        at com.visibletechnologies.platform.common.kafka.KafkaWriter.flush(KafkaWriter.java:114)
        at com.visibletechnologies.platform.common.kafka.KafkaWriter.checkFlush(KafkaWriter.java:92)
        at com.visibletechnologies.platform.katta.krouter.KRouter.checkFlush(KRouter.java:182)
        at com.visibletechnologies.platform.katta.krouter.KRouter.doWork(KRouter.java:139)
        at com.visibletechnologies.framework.servicebase.ServiceBase.start(ServiceBase.java:187)
        at com.visibletechnologies.platform.katta.krouter.Main.main(Main.java:132)
