Sam Meder created KAFKA-1010:
--------------------------------

             Summary: Concurrency issue in getCluster() causes rebalance failure and dead consumer
                 Key: KAFKA-1010
                 URL: https://issues.apache.org/jira/browse/KAFKA-1010
             Project: Kafka
          Issue Type: Bug
          Components: consumer
    Affects Versions: 0.8
            Reporter: Sam Meder
            Assignee: Neha Narkhede
            Priority: Blocker
             Fix For: 0.8


We're seeing the following stack trace on the consumer when brokers are 
(forcefully) removed from the cluster:

Thu Aug 15 05:10:06 GMT 2013 Exception in thread "main" org.I0Itec.zkclient.exception.ZkNoNodeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/ids/4
at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
at kafka.utils.ZkUtils$.readData(ZkUtils.scala:407)
at kafka.utils.ZkUtils$$anonfun$getCluster$1.apply(ZkUtils.scala:453)
at kafka.utils.ZkUtils$$anonfun$getCluster$1.apply(ZkUtils.scala:452)
at scala.collection.Iterator$class.foreach(Iterator.scala:631)
at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:549)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
at scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:596)
at kafka.utils.ZkUtils$.getCluster(ZkUtils.scala:452)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:394)
at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:391)
at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:722)
at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:206)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:77)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:89)

I'm pretty sure this is due to the following logic in getCluster():

    val nodes = getChildrenParentMayNotExist(zkClient, BrokerIdsPath)
    for (node <- nodes) {
      val brokerZKString = readData(zkClient, BrokerIdsPath + "/" + node)._1
      cluster.add(Broker.createBroker(node.toInt, brokerZKString))
    }

which is obviously not safe since the nodes retrieved in the first call may 
have disappeared by the time we iterate to get the values.
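
To make the race concrete, here is a minimal, self-contained sketch of a
list-then-read loop that tolerates a node vanishing between the two steps.
It is an illustration only and not the Kafka code: FakeBrokerRegistry,
readMaybe and the afterList hook are hypothetical names standing in for the
ZooKeeper calls, and brokers are reduced to plain strings.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for the /brokers/ids subtree.
// This is NOT a Kafka or zkclient API; it only models "list, then read".
final class FakeBrokerRegistry(initial: Map[String, String]) {
  private val nodes = mutable.Map.empty[String, String]
  nodes ++= initial

  // Snapshot of the child node names at the time of the call.
  def children: List[String] = nodes.keys.toList.sorted

  // Returns None when the node no longer exists, instead of throwing.
  def readMaybe(id: String): Option[String] = nodes.get(id)

  def remove(id: String): Unit = { nodes.remove(id); () }
}

object TolerantGetCluster {
  // Reads every broker listed in the snapshot and skips any that has
  // disappeared since. `afterList` is a test hook (an assumption of this
  // sketch) that runs between the listing and the per-node reads.
  def getCluster(reg: FakeBrokerRegistry,
                 afterList: () => Unit = () => ()): Map[Int, String] = {
    val ids = reg.children
    afterList()
    ids.flatMap(id => reg.readMaybe(id).map(data => id.toInt -> data)).toMap
  }
}
```

With a read that returns an Option, a broker removed mid-iteration is simply
left out of the result rather than aborting the whole call. Whether silently
skipping a broker is acceptable for the rebalance logic is a separate
question that this sketch does not answer.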

getCluster() seems to only be used in 
ZookeeperConsumerConnector.syncedRebalance and in 
ImportZkOffsets.updateZkOffsets (which doesn't actually look like it is using 
the values), so the simplest solution may be to just move the getCluster() call 
into the try block in syncedRebalance and kill the usage in the other call.
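
The effect of moving the call inside the try block can be sketched as
follows. This is a simplified model and not the actual syncedRebalance
code, whose body is not shown here: RebalanceRetrySketch, maxRetries and
attempt are hypothetical names, and the only assumption is that each
rebalance attempt runs inside a loop that catches failures and retries.

```scala
import scala.util.control.NonFatal

object RebalanceRetrySketch {
  // Runs `attempt` up to `maxRetries` times and returns true on the first
  // attempt that succeeds. Anything done inside `attempt` (including the
  // cluster lookup) is covered by the try, so a transient failure such as
  // a missing /brokers/ids/N node leads to a retry instead of escaping.
  def syncedRebalance(maxRetries: Int)(attempt: Int => Boolean): Boolean = {
    var i = 0
    var done = false
    while (!done && i < maxRetries) {
      try {
        done = attempt(i)
      } catch {
        case NonFatal(_) => () // swallow and fall through to the next retry
      }
      i += 1
    }
    done
  }
}
```

For example, an attempt that throws on its first run and succeeds on its
second makes syncedRebalance(3)(...) return true after two calls. If the
lookup sat outside the try, the first exception would propagate to the
caller, as in the stack trace above.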

