[ 
https://issues.apache.org/jira/browse/KAFKA-1010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13742365#comment-13742365
 ] 

Sam Meder commented on KAFKA-1010:
----------------------------------

Git formatted patch is now attached.
                
> Concurrency issue in getCluster() causes rebalance failure and dead consumer
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-1010
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1010
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.8
>            Reporter: Sam Meder
>            Assignee: Neha Narkhede
>            Priority: Blocker
>             Fix For: 0.8
>
>         Attachments: get_cluster_0_8_git.patch
>
>
> We're seeing the following stack trace on the consumer when brokers are 
> (forcefully) removed from the cluster:
> Thu Aug 15 05:10:06 GMT 2013 Exception in thread "main" org.I0Itec.zkclient.exception.ZkNoNodeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/ids/4
> at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
> at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
> at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
> at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
> at kafka.utils.ZkUtils$.readData(ZkUtils.scala:407)
> at kafka.utils.ZkUtils$$anonfun$getCluster$1.apply(ZkUtils.scala:453)
> at kafka.utils.ZkUtils$$anonfun$getCluster$1.apply(ZkUtils.scala:452)
> at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:549)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> at scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:596)
> at kafka.utils.ZkUtils$.getCluster(ZkUtils.scala:452)
> at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:394)
> at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
> at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
> at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:391)
> at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:722)
> at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:206)
> at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:77)
> at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:89)
> I'm pretty sure this is due to the following logic in getCluster():
>     val nodes = getChildrenParentMayNotExist(zkClient, BrokerIdsPath)
>     for (node <- nodes) {
>       val brokerZKString = readData(zkClient, BrokerIdsPath + "/" + node)._1
>       cluster.add(Broker.createBroker(node.toInt, brokerZKString))
>     }
> This is not safe: a node returned by the first call may have been deleted 
> by the time the loop reads its data, in which case readData() throws the 
> ZkNoNodeException shown above.
> getCluster() seems to be used only in 
> ZookeeperConsumerConnector.syncedRebalance and in 
> ImportZkOffsets.updateZkOffsets (which does not appear to use the result), 
> so the simplest fix may be to move the getCluster() call into the try block 
> in syncedRebalance and remove the call from updateZkOffsets.
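
The race and the proposed fix can be sketched in isolation. This is not the attached patch and it does not use the real ZkClient API. FakeZk, NoNodeException, the afterList hook, rebalanceWithRetry and the host strings are all hypothetical names made up for illustration, and the sketch assumes that a failure inside the try block leads to another attempt with a fresh listing.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the ZooKeeper "no node" error in the trace.
class NoNodeException(path: String) extends RuntimeException("NoNode for " + path)

// Hypothetical in-memory stand-in for a ZooKeeper client (NOT the ZkClient API).
class FakeZk(initial: Map[String, String]) {
  private val nodes = mutable.Map[String, String]() ++= initial

  // Names of the nodes stored under the given parent path.
  def children(parent: String): List[String] =
    nodes.keys.filter(_.startsWith(parent + "/"))
      .map(_.substring(parent.length + 1)).toList.sorted

  // Throws if the node no longer exists, like readData in the trace.
  def readData(path: String): String =
    nodes.getOrElse(path, throw new NoNodeException(path))

  def delete(path: String): Unit = nodes -= path
}

object GetClusterSketch {
  val BrokerIdsPath = "/brokers/ids"

  // Same shape as the logic quoted above: list the ids, then read each one.
  // afterList is a hypothetical hook that simulates a broker being removed
  // between the listing and the reads.
  def getCluster(zk: FakeZk, afterList: () => Unit): Map[Int, String] = {
    val ids = zk.children(BrokerIdsPath)
    afterList()
    ids.map(id => id.toInt -> zk.readData(BrokerIdsPath + "/" + id)).toMap
  }

  // The getCluster call sits inside the try block, so a broker that vanished
  // mid-read triggers another attempt instead of escaping to the caller.
  def rebalanceWithRetry(zk: FakeZk, maxRetries: Int,
                         afterList: () => Unit): Option[Map[Int, String]] = {
    var result: Option[Map[Int, String]] = None
    var attempt = 0
    while (result.isEmpty && attempt < maxRetries) {
      try {
        result = Some(getCluster(zk, afterList))
      } catch {
        case _: NoNodeException => // stale listing; retry with a fresh one
      }
      attempt += 1
    }
    result
  }
}
```

With brokers 3 and 4 registered and a hook that deletes /brokers/ids/4, the first attempt fails on the stale id and the second succeeds with broker 3 only. Calling getCluster directly under the same conditions lets the exception escape, which matches the failure in the trace.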

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
