Re: kafka 0.8.1: Producer.send() can block forever when a broker is down
Makes sense. Please file a JIRA and attach a patch there. It would be great to add a simple test case as well.

Thanks,
Neha
Re: kafka 0.8.1: Producer.send() can block forever when a broker is down
The issue is that even with one down broker, the rest of the cluster is up, but unreachable from the producer client in this case, which defeats the high-availability characteristics of clustering.

For any producer trying to use the service, it is "Russian roulette" whether you will get metadata back when asking for topic/partition data. The ClientUtils code rightly iterates through the broker list looking for the metadata in random order, but if the first broker in the list is down, the others are never tried in a timely manner.

An example stack trace shows the problem:

default-dispatcher-3" prio=5 tid=0x7fef131c6000 nid=0x5f03 runnable [0x0001146d2000]
   java.lang.Thread.State: RUNNABLE
    at sun.nio.ch.Net.connect0(Native Method)
    at sun.nio.ch.Net.connect(Net.java:465)
    at sun.nio.ch.Net.connect(Net.java:457)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
    - locked <0x0007ad4c1b50> (a java.lang.Object)
    - locked <0x0007ad4c1b70> (a java.lang.Object)
    - locked <0x0007ad4c1b60> (a java.lang.Object)
    at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
    - locked <0x0007ad3f3408> (a java.lang.Object)
    at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
    at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
    - locked <0x0007ad3de648> (a java.lang.Object)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
    at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
    at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
    at kafka.utils.Utils$.swallow(Utils.scala:167)
    at kafka.utils.Logging$class.swallowError(Logging.scala:106)
    at kafka.utils.Utils$.swallowError(Utils.scala:46)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
    at kafka.producer.Producer.send(Producer.scala:76)

An eight-minute timeout is a non-starter for a clustered (HA) service. One would expect the system to respect the request.timeout.ms config setting, which it does, unless a broker host is down and happens to be first in the shuffled list of brokers to try for the metadata.

I believe this bug is also exacerbated by the fact that the metadata is (rightly) refreshed via the topic.metadata.refresh.interval.ms config setting, which defaults to every 10 minutes. AFAIK, this means that if a single broker host is down, every new producer as well as every existing producer has roughly a 1-in-clusterSize chance of either not starting or hanging for a minimum of 8 minutes (assuming the TCP connection code eventually times out), on every metadata refresh, i.e. every 10 minutes or whatever topic.metadata.refresh.interval.ms is set to.
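As a standalone illustration of where that stack is stuck (a sketch, not from the thread): a java.nio SocketChannel in blocking mode, connected with no explicit timeout, sits in sun.nio.ch.Net.connect until the operating system's TCP connect timeout expires. The 10.255.255.1 address is only an assumed non-routable example.

import java.net.InetSocketAddress
import java.nio.channels.SocketChannel

object BlockingConnectDemo extends App {
  // Same pattern as BlockingChannel.connect(): blocking mode, no connect timeout.
  val channel = SocketChannel.open()
  channel.configureBlocking(true)
  val start = System.currentTimeMillis()
  try {
    channel.connect(new InetSocketAddress("10.255.255.1", 9092)) // assumed unreachable host
  } catch {
    case e: java.io.IOException =>
      println(s"connect gave up after ${System.currentTimeMillis() - start} ms: $e")
  }
}

On a typical Linux box this only returns after the kernel's connect retries are exhausted, which is the multi-minute delay discussed in this thread.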
Initializing the SocketChannel in code that doesn't respect the request.timeout.ms setting logically defeats the spirit of the timeout setting, and also makes the iteration code in ClientUtils far less useful:

(from fetchTopicMetadata:)

val shuffledBrokers = Random.shuffle(brokers)
while(i < shuffledBrokers.size && !fetchMetaDataSucceeded) {
  val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, shuffledBrokers(i))
  info("Fetching metadata from broker %s with correlation id %d for %d topic(s) %s".format(shuffledBrokers(i), correlationId, topics.size, topics))
  try {
    topicMetadataResponse = producer.send(topicMetadataRequest)

Opening the connection with a timeout, as Jack suggests, seems far preferable to the current situation.

Best Regards,

-Jonathan
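To make the point about the iteration concrete, here is a standalone sketch (not Kafka code; firstReachableBroker, the broker tuples, and connectTimeoutMs are illustrative assumptions): with a bounded connect, a dead first broker costs at most one timeout before the shuffled loop moves on, which is the behaviour the ClientUtils loop above is meant to provide.

import java.net.{InetSocketAddress, Socket}
import scala.util.Random

// Illustrative only: try each broker in random order, spending at most
// connectTimeoutMs on any single dead or unreachable host.
def firstReachableBroker(brokers: Seq[(String, Int)], connectTimeoutMs: Int): Option[(String, Int)] =
  Random.shuffle(brokers).find { case (host, port) =>
    val socket = new Socket()
    try { socket.connect(new InetSocketAddress(host, port), connectTimeoutMs); true }
    catch { case _: java.io.IOException => false }
    finally socket.close()
  }

// e.g. firstReachableBroker(Seq(("broker1", 9092), ("broker2", 9092)), 5000)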
Re: kafka 0.8.1: Producer.send() can block forever when a broker is down
Jack,

If the broker is down, channel.connect() should throw an IOException, instead of blocking forever. In your case, is the broker host down? In that case, the connect call will likely wait for the default TCP connection timeout, which is 8+ minutes.

Thanks,

Jun
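A small sketch of the distinction Jun is drawing (not from the thread; the addresses are placeholders), using a plain java.net.Socket: if the broker process is down but the host is up, connect fails almost immediately with a ConnectException, whereas if the host itself is down or unreachable, connect only returns once a timeout expires, either the OS default or an explicit one.

import java.net.{ConnectException, InetSocketAddress, Socket, SocketTimeoutException}

def probe(host: String, port: Int, timeoutMs: Int): Unit = {
  val socket = new Socket()
  try {
    socket.connect(new InetSocketAddress(host, port), timeoutMs)
    println(host + ": connected")
  } catch {
    case e: ConnectException       => println(host + ": refused immediately (" + e + ")")       // process down, host reachable
    case e: SocketTimeoutException => println(host + ": no answer after " + timeoutMs + " ms")  // host down or unreachable
    case e: java.io.IOException    => println(host + ": failed (" + e + ")")
  } finally socket.close()
}

probe("localhost", 9092, 5000)     // refused quickly if nothing is listening on 9092
probe("10.255.255.1", 9092, 5000)  // assumed unreachable address: waits out the full timeout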
kafka 0.8.1: Producer.send() can block forever when a broker is down
We observe that when a broker is down, Producer.send() can get into a state where it will block forever, even when using the async producer.

When a Producer first sends data, it fetches topic metadata from the broker cluster. To do this, it shuffles the list of hosts in the cluster, then iterates through the list querying each broker.

For each broker in the shuffled list, the Producer creates a SyncProducer and invokes SyncProducer.send(). SyncProducer.send() creates a BlockingChannel and invokes BlockingChannel.connect(). BlockingChannel.connect() retrieves a java.nio.channels.SocketChannel, sets it to blocking mode, and invokes SocketChannel.connect(), passing the current broker hostname.

If the first broker in the list is nonresponsive, SocketChannel.connect() will wait forever.

I think the correct change is as follows:

diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala b/core/src/main/scala/kafka/network/BlockingChannel.scala
index eb7bb14..9bb102a 100644
--- a/core/src/main/scala/kafka/network/BlockingChannel.scala
+++ b/core/src/main/scala/kafka/network/BlockingChannel.scala
@@ -55,7 +55,7 @@ class BlockingChannel( val host: String,
     channel.socket.setSoTimeout(readTimeoutMs)
     channel.socket.setKeepAlive(true)
     channel.socket.setTcpNoDelay(true)
-    channel.connect(new InetSocketAddress(host, port))
+    channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs)

     writeChannel = channel
     readChannel = Channels.newChannel(channel.socket().getInputStream)

Is the next step to create a JIRA with this information? Thanks.

--
Jack Foy
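As a possible starting point for the simple test case mentioned elsewhere in this thread, here is a sketch only: it assumes JUnit 4 on the test classpath, the 0.8.x BlockingChannel(host, port, readBufferSize, writeBufferSize, readTimeoutMs) constructor, and that the patched channel bounds connect() by some connect timeout. 192.0.2.1 is a reserved documentation (TEST-NET-1) address that should never accept connections.

import java.io.IOException
import kafka.network.BlockingChannel
import org.junit.Assert.assertTrue
import org.junit.Test

class BlockingChannelConnectTimeoutTest {

  @Test
  def connectShouldFailFastWhenBrokerHostIsUnreachable() {
    // 192.0.2.1 is reserved for documentation and should not respond to connection attempts.
    val channel = new BlockingChannel("192.0.2.1", 9092,
                                      BlockingChannel.UseDefaultBufferSize,
                                      BlockingChannel.UseDefaultBufferSize,
                                      5000)
    val start = System.currentTimeMillis()
    try {
      channel.connect() // with the patch this should give up within the connect timeout
    } catch {
      case _: IOException => // expected: timed out or otherwise unreachable
    }
    val elapsedMs = System.currentTimeMillis() - start
    assertTrue("connect() should not block for the OS-level TCP timeout (minutes)", elapsedMs < 30000)
  }
}

Without the patch, the same test hangs in connect() until the OS-level TCP timeout and then fails the elapsed-time assertion.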