We are seeing some odd socket timeouts from one of our producers. This producer fans out data from one topic into dozens or hundreds of potential output topics. We batch the sends to write 1,000 messages at a time.
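For context, the write path looks roughly like the sketch below. This is a simplified stand-in with hypothetical names, not our actual KafkaWriter, but it shows the 0.8 javaapi producer and the 1,000-message flush we use:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class BatchedFanOutWriter {
        private static final int FLUSH_SIZE = 1000;  // we flush 1,000 messages at a time

        private final Producer<String, String> producer;
        private final List<KeyedMessage<String, String>> pending =
                new ArrayList<KeyedMessage<String, String>>();

        public BatchedFanOutWriter(String brokerList) {
            Properties props = new Properties();
            props.put("metadata.broker.list", brokerList);  // e.g. "broker1:9092,broker2:9092"
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("producer.type", "sync");             // synchronous send, as in the stack trace below
            producer = new Producer<String, String>(new ProducerConfig(props));
        }

        // Route one message to its output topic and flush once 1,000 are queued.
        public void write(String outputTopic, String message) {
            pending.add(new KeyedMessage<String, String>(outputTopic, message));
            if (pending.size() >= FLUSH_SIZE) {
                flush();
            }
        }

        // Single batched send; this is the call that surfaces the SocketTimeoutException.
        public void flush() {
            if (!pending.isEmpty()) {
                producer.send(pending);
                pending.clear();
            }
        }
    }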
The odd thing is that the timeouts are happening in the socket read, so I assume the socket.timeout.ms value applies, which we leave at the default of 30 seconds. Stranger still, these exceptions are thrown in clusters of 3-5 at a time, with just a few seconds or less between each. We are running with 64 network threads on our brokers, which seems like plenty given that each broker has only 8 cores. From the clustering of the timeouts, it looks as though we may be issuing multiple metadata requests in parallel. Is that true? We haven't touched the io threads (still set at 2), but I'm wondering whether these are just artifacts of congestion in the communication between the brokers and our clients. Are we using too many distinct topics (~95), and should we try to cut down on them to smooth the message exchanges between broker and client? We expect the number of topics in production to be much higher than that. The producer does appear to be able to keep sending, but these exceptions in the logs make our testers unhappy.

I won't include the very lengthy log messages in toto, but the stack traces look like:

java.net.SocketTimeoutException
        at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:201)
        at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
        at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:221)
        at kafka.utils.Utils$.read(Utils.scala:372)
        at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
        at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
        at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:105)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:33)
        at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:75)
        at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:66)
        at kafka.utils.Utils$.swallow(Utils.scala:164)
        at kafka.utils.Logging$class.swallowError(Logging.scala:105)
        at kafka.utils.Utils$.swallowError(Utils.scala:43)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:66)
        at kafka.producer.Producer.send(Producer.scala:76)
        at kafka.javaapi.producer.Producer.send(Producer.scala:41)
        at com.visibletechnologies.platform.common.kafka.KafkaWriter.flush(KafkaWriter.java:114)
        at com.visibletechnologies.platform.common.kafka.KafkaWriter.checkFlush(KafkaWriter.java:92)
        at com.visibletechnologies.platform.katta.krouter.KRouter.checkFlush(KRouter.java:182)
        at com.visibletechnologies.platform.katta.krouter.KRouter.doWork(KRouter.java:139)
        at com.visibletechnologies.framework.servicebase.ServiceBase.start(ServiceBase.java:187)
        at com.visibletechnologies.platform.katta.krouter.Main.main(Main.java:132)
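For reference, the broker thread settings described above, roughly as they appear in our server.properties (I believe these are the standard broker keys):

    # broker-side thread pools mentioned above
    num.network.threads=64
    num.io.threads=2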