Updated Branches: refs/heads/0.8 d217f4cc2 -> 7640bee3d
KAFKA-1029 Zookeeper leader election stuck in ephemeral node retry loop; reviewed by Neha and Guozhang

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7640bee3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7640bee3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7640bee3

Branch: refs/heads/0.8
Commit: 7640bee3d281e0658f97997738a1f1f599fd4c07
Parents: d217f4c
Author: Sam Meder <[email protected]>
Authored: Tue Aug 27 10:16:33 2013 -0700
Committer: Neha Narkhede <[email protected]>
Committed: Tue Aug 27 10:16:56 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/7640bee3/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index 50e3f79..f1f0625 100644
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -42,19 +42,19 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
   def startup {
     controllerContext.controllerLock synchronized {
+      controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
       elect
     }
   }
 
   def elect: Boolean = {
-    controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
     val timestamp = SystemTime.milliseconds.toString
     val electString = Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "brokerid" -> brokerId.toString), valueInQuotes = false) ++ Utils.mapToJsonFields(Map("timestamp" -> timestamp), valueInQuotes = true))
 
     try {
-      createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, leaderId,
+      createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, brokerId,
         (controllerString : String, leaderId : Any) => KafkaController.parseControllerId(controllerString) == leaderId.asInstanceOf[Int],
         controllerContext.zkSessionTimeout)
       info(brokerId + " successfully elected as leader")
