Re: kafka 0.8.1: Producer.send() can block forever when a broker is down

2014-09-17 Thread Neha Narkhede
Makes sense. Please file a JIRA and attach a patch there. It would be great
to add a simple test case as well.

Thanks,
Neha

On Wed, Sep 17, 2014 at 8:25 AM, Jonathan Weeks Gmail <jonathanbwe...@gmail.com> wrote:


Re: kafka 0.8.1: Producer.send() can block forever when a broker is down

2014-09-17 Thread Jonathan Weeks Gmail

The issue is that even with only one broker down, the rest of the cluster is up but 
can be unreachable from the producer client in this case, which defeats the high-
availability characteristics of clustering.

For any producer trying to use the service, it is "Russian roulette" whether 
you will get metadata back when asking for topic/partition data.

The ClientUtils code rightly iterates through the broker list looking for the 
metadata in random order, but if the first broker in the list is down, the 
others are never tried in a timely manner.

An example stacktrace shows the problem:

default-dispatcher-3" prio=5 tid=0x7fef131c6000 nid=0x5f03 runnable 
[0x0001146d2000]
  java.lang.Thread.State: RUNNABLE
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:465)
at sun.nio.ch.Net.connect(Net.java:457)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
- locked <0x0007ad4c1b50> (a java.lang.Object)
- locked <0x0007ad4c1b70> (a java.lang.Object)
- locked <0x0007ad4c1b60> (a java.lang.Object)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
- locked <0x0007ad3f3408> (a java.lang.Object)
at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
at 
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
- locked <0x0007ad3de648> (a java.lang.Object)
at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at 
kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
at kafka.utils.Utils$.swallow(Utils.scala:167)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at kafka.utils.Utils$.swallowError(Utils.scala:46)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
at kafka.producer.Producer.send(Producer.scala:76)

An eight-minute timeout is a non-starter for a clustered (HA) service. One 
would expect the system to respect the request.timeout.ms config setting, which 
it does, unless a broker host is down and happens to be first in the shuffled 
list of brokers used to fetch the metadata.

I believe this bug is also exacerbated by the fact that the metadata is 
(rightly) refreshed via the topic.metadata.refresh.interval.ms config setting, 
which defaults to every 10 minutes. AFAIK, this means that if a single broker 
is down, every new producer as well as every existing producer has roughly a 
1-in-clusterSize chance (the odds that the down broker comes first in the 
shuffled list) of either not starting or hanging for a minimum of 8 minutes 
(assuming the TCP connection code eventually times out), every 10 minutes (or 
whatever topic.metadata.refresh.interval.ms is set to), if I understand 
correctly.
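
To make that exposure concrete, here is a small back-of-the-envelope sketch; the cluster 
size and the 8-minute OS connect timeout below are assumed figures for illustration, not 
values taken from any particular deployment:

// Rough estimate of how often a producer stalls on metadata refresh when one
// broker host is down. All numbers here are illustrative assumptions.
object MetadataStallOdds extends App {
  val clusterSize        = 5                  // hypothetical cluster, exactly one broker host down
  val refreshIntervalMs  = 10 * 60 * 1000L    // topic.metadata.refresh.interval.ms default
  val osConnectTimeoutMs = 8 * 60 * 1000L     // typical OS-level TCP connect timeout (~8 min)

  // With a uniform shuffle, the down broker lands first in the list 1/clusterSize of the time.
  val stallChancePerRefresh = 1.0 / clusterSize

  println(s"Each refresh (every ${refreshIntervalMs / 60000} min) has a " +
          s"${(stallChancePerRefresh * 100).toInt}% chance of stalling the producer " +
          s"for roughly ${osConnectTimeoutMs / 60000} min")
}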

Initializing the SocketChannel in code that doesn't respect the 
request.timeout.ms setting logically defeats the spirit of the timeout setting, 
and it also makes the iteration code in ClientUtils far less useful:

(from fetchTopicMetadata:)
val shuffledBrokers = Random.shuffle(brokers)
while(i < shuffledBrokers.size && !fetchMetaDataSucceeded) {
  val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, shuffledBrokers(i))
  info("Fetching metadata from broker %s with correlation id %d for %d topic(s) %s".format(shuffledBrokers(i), correlationId, topics.size, topics))
  try {
    topicMetadataResponse = producer.send(topicMetadataRequest)
Opening the connection with a timeout as Jack suggests seems far preferable to 
the current situation.
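
For illustration, a minimal, self-contained sketch of what a bounded per-broker connect 
buys the loop above; this is not the actual Kafka code, and the broker names, port, and 
timeout value are made up. The point is that a dead host then costs at most 
connectTimeoutMs, so the worst case for the whole metadata fetch becomes roughly 
brokers.size * connectTimeoutMs rather than one multi-minute hang:

import java.io.IOException
import java.net.{InetSocketAddress, Socket}
import scala.util.Random

object BoundedMetadataFetchSketch extends App {
  // Hypothetical broker list; in the real client this comes from metadata.broker.list.
  val brokers = Random.shuffle(List("broker1" -> 9092, "broker2" -> 9092, "broker3" -> 9092))
  val connectTimeoutMs = 5000  // assumed value; could be wired from request.timeout.ms

  // Try each broker in turn; an unreachable host fails within connectTimeoutMs,
  // so the loop actually reaches the live brokers in a timely manner.
  val connected = brokers.exists { case (host, port) =>
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), connectTimeoutMs)
      true  // the real client would send the TopicMetadataRequest here
    } catch {
      case e: IOException =>
        println(s"Broker $host:$port not reachable within ${connectTimeoutMs}ms ($e), trying next")
        false
    } finally socket.close()
  }

  println(if (connected) "Got a connection to fetch metadata from" else "No broker reachable")
}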

Best Regards,

-Jonathan


On Sep 16, 2014, at 10:08 PM, Jun Rao  wrote:

Re: kafka 0.8.1: Producer.send() can block forever when a broker is down

2014-09-16 Thread Jun Rao
Jack,

If the broker is down, channel.connect() should throw an IOException,
instead of blocking forever. In your case, is the broker host down? In that
case, the connect call will likely wait for the default tcp connection
timeout, which is 8+ mins.
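
A quick standalone way to see that difference between the two failure modes; the port 
and the address below are placeholders, and exact timings vary by OS and network:

import java.io.IOException
import java.net.{InetSocketAddress, Socket}

object ConnectFailureModes extends App {
  def attempt(label: String, host: String, port: Int): Unit = {
    val start  = System.currentTimeMillis()
    val socket = new Socket()
    try socket.connect(new InetSocketAddress(host, port))
    catch { case e: IOException => println(s"$label: ${e.getClass.getSimpleName}") }
    finally socket.close()
    println(s"$label returned after ${System.currentTimeMillis() - start} ms")
  }

  // Broker process down but host up: the OS refuses the connection almost
  // immediately, so connect() throws an IOException right away.
  attempt("closed port on a live host", "localhost", 9999)

  // Broker host down or unreachable: connect() keeps retrying the TCP handshake
  // and only gives up after the OS default connect timeout, often several minutes.
  attempt("unreachable host", "10.255.255.1", 9092)
}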

Thanks,

Jun

On Tue, Sep 16, 2014 at 5:43 PM, Jack Foy  wrote:



kafka 0.8.1: Producer.send() can block forever when a broker is down

2014-09-16 Thread Jack Foy
We observe that when a broker is down, Producer.send() can get into a state 
where it will block forever, even when using the async producer.

When a Producer first sends data, it fetches topic metadata from the broker 
cluster. To do this, it shuffles the list of hosts in the cluster, then 
iterates through the list querying each broker.

For each broker in the shuffled list, the Producer creates a SyncProducer and 
invokes SyncProducer.send(). 
SyncProducer.send() creates a BlockingChannel and invokes 
BlockingChannel.connect().
BlockingChannel.connect() retrieves a java.nio.channels.SocketChannel, sets it 
to blocking mode, and invokes SocketChannel.connect(), passing the current 
broker hostname. 

If the first broker in the list is nonresponsive, SocketChannel.connect() will 
wait forever. 
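
To make the distinction concrete, here is a minimal sketch of the blocking connect versus 
the bounded variant the change below switches to; the address, port, and timeout are 
placeholder values, not anything taken from the Kafka config:

import java.net.InetSocketAddress
import java.nio.channels.SocketChannel

object ConnectVariants extends App {
  val addr = new InetSocketAddress("10.255.255.1", 9092)  // placeholder non-routable address
  val connectTimeoutMs = 5000                              // placeholder timeout

  val channel = SocketChannel.open()
  channel.configureBlocking(true)
  try {
    // What BlockingChannel does today: on a blocking channel there is no way to
    // pass a timeout here, so an unreachable host holds the thread until the
    // OS-level TCP connect timeout expires.
    // channel.connect(addr)

    // What the proposed change does instead: the underlying Socket's connect
    // accepts a timeout in milliseconds and fails with a SocketTimeoutException
    // once it expires, so the caller can move on to the next broker.
    channel.socket().connect(addr, connectTimeoutMs)
  } catch {
    case e: java.io.IOException => println(s"connect gave up after at most ${connectTimeoutMs}ms: $e")
  } finally channel.close()
}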

I think the correct change is as follows:

diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala b/core/src/main/scala/kafka/network/BlockingChannel.scala
index eb7bb14..9bb102a 100644
--- a/core/src/main/scala/kafka/network/BlockingChannel.scala
+++ b/core/src/main/scala/kafka/network/BlockingChannel.scala
@@ -55,7 +55,7 @@ class BlockingChannel( val host: String,
     channel.socket.setSoTimeout(readTimeoutMs)
     channel.socket.setKeepAlive(true)
     channel.socket.setTcpNoDelay(true)
-    channel.connect(new InetSocketAddress(host, port))
+    channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs)
 
     writeChannel = channel
     readChannel = Channels.newChannel(channel.socket().getInputStream)

Is the next step to create a JIRA with this information? Thanks.

-- 
Jack Foy