We have observed that when a broker is down, Producer.send() can block forever, even when using the async producer.
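The underlying java.nio behaviour can be isolated from Kafka. In blocking mode, SocketChannel.connect(SocketAddress) takes no timeout argument. The channel's socket adaptor, however, offers Socket.connect(SocketAddress, int), which gives up with a java.net.SocketTimeoutException. The following is a minimal sketch, assuming a standalone helper: the object BoundedConnect and its connect method are hypothetical names, not Kafka APIs, and connectTimeoutMs simply mirrors the value the patch further down assumes is available inside BlockingChannel.

```scala
import java.net.InetSocketAddress
import java.nio.channels.SocketChannel
import scala.util.control.NonFatal

// Hypothetical helper, not part of Kafka: opens a blocking SocketChannel
// with the same socket options BlockingChannel.connect() sets, but bounds
// the TCP handshake by connectTimeoutMs.
object BoundedConnect {
  def connect(host: String, port: Int,
              connectTimeoutMs: Int, readTimeoutMs: Int): SocketChannel = {
    val channel = SocketChannel.open()
    try {
      channel.configureBlocking(true)
      channel.socket.setSoTimeout(readTimeoutMs)
      channel.socket.setKeepAlive(true)
      channel.socket.setTcpNoDelay(true)
      // channel.connect(addr) has no timeout parameter and can wait forever
      // on an unresponsive host. The socket adaptor's connect(addr, timeout)
      // throws java.net.SocketTimeoutException once connectTimeoutMs elapses
      // (a timeout of 0 means wait forever).
      channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs)
      channel
    } catch {
      case NonFatal(e) =>
        channel.close() // don't leak the channel if the connect fails
        throw e
    }
  }
}
```

Against an unresponsive broker, this call would throw after roughly connectTimeoutMs instead of hanging. A caller iterating over a list of brokers could then catch the exception and try the next one.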
When a Producer first sends data, it fetches topic metadata from the broker cluster. To do this, it shuffles the list of brokers in the cluster, then iterates through the list, querying each broker in turn.

For each broker in the shuffled list, the Producer creates a SyncProducer and invokes SyncProducer.send(). SyncProducer.send() creates a BlockingChannel and invokes BlockingChannel.connect(). BlockingChannel.connect() obtains a java.nio.channels.SocketChannel, sets it to blocking mode, and invokes SocketChannel.connect(), passing the current broker's host and port.

If the first broker in the list is unresponsive, SocketChannel.connect() waits forever: in blocking mode it takes no timeout, so the Producer never moves on to the next broker.

I think the correct change is to connect through the channel's socket, which accepts a connect timeout:

diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala b/core/src/main/scala/kafka/network/BlockingChannel.scala
index eb7bb14..9bb102a 100644
--- a/core/src/main/scala/kafka/network/BlockingChannel.scala
+++ b/core/src/main/scala/kafka/network/BlockingChannel.scala
@@ -55,7 +55,7 @@ class BlockingChannel( val host: String,
         channel.socket.setSoTimeout(readTimeoutMs)
         channel.socket.setKeepAlive(true)
         channel.socket.setTcpNoDelay(true)
-        channel.connect(new InetSocketAddress(host, port))
+        channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs)
 
         writeChannel = channel
         readChannel = Channels.newChannel(channel.socket().getInputStream)

Is the next step to create a JIRA with this information?

Thanks.

-- 
Jack Foy <j...@whitepages.com>