We observe that when a broker is down, Producer.send() can get into a state 
where it will block forever, even when using the async producer.

When a Producer first sends data, it fetches topic metadata from the broker 
cluster. To do this, it shuffles the list of hosts in the cluster, then 
iterates through the list querying each broker.
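
As a rough sketch of that lookup loop (this is not the actual Producer code; Broker, fetchFromAny, and the fetch callback are hypothetical names used only for illustration):

```scala
import scala.util.{Random, Success, Try}

object MetadataFetchSketch {
  // Hypothetical stand-in for a broker address; not Kafka's own type.
  final case class Broker(host: String, port: Int)

  // Try each broker in random order and return the first successful result.
  // `fetch` is a hypothetical callback standing in for the per-broker request.
  def fetchFromAny[A](brokers: Seq[Broker], rng: Random = new Random)(fetch: Broker => A): Option[A] = {
    val shuffled = rng.shuffle(brokers)
    // The iterator is lazy, so a later broker is contacted only if every
    // earlier one threw. If `fetch` never returns for some broker, the loop
    // stalls there and the remaining brokers are never reached.
    shuffled.iterator.map(b => Try(fetch(b))).collectFirst { case Success(a) => a }
  }
}
```

A broker that fails fast (throws) is skipped, but one that blocks inside fetch holds up the whole loop, which is the situation described in this report.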

For each broker in the shuffled list, the Producer creates a SyncProducer and 
invokes SyncProducer.send(). SyncProducer.send() creates a BlockingChannel and 
invokes BlockingChannel.connect(). BlockingChannel.connect() retrieves a 
java.nio.channels.SocketChannel, sets it to blocking mode, and invokes 
SocketChannel.connect(), passing the current broker's host and port.

If the first broker in the list is nonresponsive, SocketChannel.connect() can 
block indefinitely, because it takes no timeout argument, and the remaining 
brokers are never tried.

I think the correct change is as follows:

diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala b/core/src/main/scala/kafka/network/BlockingChannel.scala
index eb7bb14..9bb102a 100644
--- a/core/src/main/scala/kafka/network/BlockingChannel.scala
+++ b/core/src/main/scala/kafka/network/BlockingChannel.scala
@@ -55,7 +55,7 @@ class BlockingChannel( val host: String,
         channel.socket.setSoTimeout(readTimeoutMs)
         channel.socket.setKeepAlive(true)
         channel.socket.setTcpNoDelay(true)
-        channel.connect(new InetSocketAddress(host, port))
+        channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs)

         writeChannel = channel
         readChannel = Channels.newChannel(channel.socket().getInputStream)
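
To show the idea in isolation, here is a minimal standalone sketch of a bounded connect on a blocking SocketChannel. The object and method names (ConnectTimeoutSketch, connectWithTimeout) are made up for this example and are not part of BlockingChannel; only the java.net and java.nio calls are real APIs.

```scala
import java.net.InetSocketAddress
import java.nio.channels.SocketChannel

object ConnectTimeoutSketch {
  // Open a blocking SocketChannel and connect with an upper bound on the wait.
  // Throws java.net.SocketTimeoutException if the timeout elapses first.
  def connectWithTimeout(host: String, port: Int, connectTimeoutMs: Int): SocketChannel = {
    val channel = SocketChannel.open()
    try {
      channel.configureBlocking(true)
      // Socket.connect(SocketAddress, int) accepts a timeout in milliseconds;
      // SocketChannel.connect(SocketAddress) has no such parameter.
      channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs)
      channel
    } catch {
      case e: Throwable =>
        // Do not leak the channel if the connect attempt fails.
        channel.close()
        throw e
    }
  }
}
```

Note that Socket.connect() treats a timeout of zero as infinite, so connectTimeoutMs would need to be positive for this to help.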

Is the next step to create a JIRA with this information? Thanks.

-- 
Jack Foy <j...@whitepages.com>


