KAFKA-1010; Concurrency issue in getCluster() causes rebalance failure and dead consumer; patched by Sam Meder; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7a9faa49
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7a9faa49
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7a9faa49

Branch: refs/heads/trunk
Commit: 7a9faa49ed5c7581cb2bd6c86b68df06a8879fec
Parents: 1db824e
Author: Sam Meder <[email protected]>
Authored: Fri Aug 16 10:13:30 2013 -0700
Committer: Jun Rao <[email protected]>
Committed: Fri Aug 16 10:13:30 2013 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/consumer/ZookeeperConsumerConnector.scala | 3 ++-
 core/src/main/scala/kafka/tools/ImportZkOffsets.scala | 3 ---
 2 files changed, 2 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/7a9faa49/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 17977e7..c2b9b9a 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -399,8 +399,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       for (i <- 0 until config.rebalanceMaxRetries) {
         info("begin rebalancing consumer " + consumerIdString + " try #" + i)
         var done = false
-        val cluster = getCluster(zkClient)
+        var cluster: Cluster = null
         try {
+          cluster = getCluster(zkClient)
           done = rebalance(cluster)
         } catch {
           case e =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/7a9faa49/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
index 63519e1..55709b5 100644
--- a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
@@ -96,9 +96,6 @@ object ImportZkOffsets extends Logging {
   }
 
   private def updateZkOffsets(zkClient: ZkClient, partitionOffsets: Map[String,String]): Unit = {
-    val cluster = ZkUtils.getCluster(zkClient)
-    var partitions: List[String] = Nil
-
     for ((partition, offset) <- partitionOffsets) {
       debug("updating [" + partition + "] with offset [" + offset + "]")
