KAFKA-992: Double-check broker registration to avoid a false ZkNodeExistsException; reviewed by Neha Narkhede and Swapnil Ghike
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f3f8fa5c Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f3f8fa5c Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f3f8fa5c Branch: refs/heads/trunk Commit: f3f8fa5c8c17ade2205ef0f80d9092cae640327d Parents: 81a9f6a Author: Guozhang Wang <[email protected]> Authored: Mon Aug 5 10:05:44 2013 -0700 Committer: Neha Narkhede <[email protected]> Committed: Mon Aug 5 10:05:55 2013 -0700 ---------------------------------------------------------------------- .../scala/kafka/server/KafkaZooKeeper.scala | 2 +- core/src/main/scala/kafka/utils/ZkUtils.scala | 40 ++++++++++++++++---- 2 files changed, 33 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/f3f8fa5c/core/src/main/scala/kafka/server/KafkaZooKeeper.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala index 0e6c656..553640f 100644 --- a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala +++ b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala @@ -48,7 +48,7 @@ class KafkaZooKeeper(config: KafkaConfig) extends Logging { else config.hostName val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt - ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, config.port, jmxPort) + ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, config.port, config.zkSessionTimeoutMs, jmxPort) } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/f3f8fa5c/core/src/main/scala/kafka/utils/ZkUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index d53d511..0072a1a 100644 --- 
a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -181,19 +181,43 @@ object ZkUtils extends Logging { replicas.contains(brokerId.toString) } - def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, jmxPort: Int) { + def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, timeout: Int, jmxPort: Int) { val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id + val timestamp = "\"" + SystemTime.milliseconds.toString + "\"" val brokerInfo = Utils.mergeJsonFields(Utils.mapToJsonFields(Map("host" -> host), valueInQuotes = true) ++ - Utils.mapToJsonFields(Map("version" -> 1.toString, "jmx_port" -> jmxPort.toString, "port" -> port.toString), + Utils.mapToJsonFields(Map("version" -> 1.toString, "jmx_port" -> jmxPort.toString, "port" -> port.toString, "timestamp" -> timestamp), valueInQuotes = false)) - try { - createEphemeralPathExpectConflict(zkClient, brokerIdPath, brokerInfo) - } catch { - case e: ZkNodeExistsException => - throw new RuntimeException("A broker is already registered on the path " + brokerIdPath + ". 
This probably " + "indicates that you either have configured a brokerid that is already in use, or " + "else you have shutdown this broker and restarted it faster than the zookeeper " + "timeout so it appears to be re-registering.") + + while (true) { + try { + createEphemeralPathExpectConflict(zkClient, brokerIdPath, brokerInfo) + + info("Registered broker %d at path %s with address %s:%d.".format(id, brokerIdPath, host, port)) + return + } catch { + case e: ZkNodeExistsException => { + // An ephemeral node can outlive its session: because of a ZooKeeper bug it may still exist after the session that created it has expired. + // If the existing node carries our own host:port, we wrote it in an earlier session, so back off for `timeout` ms and retry + // until ZooKeeper deletes it and the create succeeds. NOTE(review): there is no retry limit, so this spins forever if such a node is never removed. + ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + id)._1 match { + case Some(brokerZKString) => { + val broker = Broker.createBroker(id, brokerZKString) + if (broker.host == host && broker.port == port) { + info("I wrote this conflicted ephemeral node [%s] a while back in a different session, ".format(brokerZKString) + + "hence I will backoff for this node to be deleted by Zookeeper after session timeout and retry") + Thread.sleep(timeout) + } else { + // A different host:port owns this broker id: fail immediately rather than retrying. + throw new RuntimeException("Another broker [%s:%s] other than the current broker [%s:%s] is already registered on the path %s." + .format(broker.host, broker.port, host, port, brokerIdPath)) + } + } + case None => // The node vanished between the failed create and the read; loop around and retry the create immediately. + } + } + } } - info("Registered broker %d at path %s with address %s:%d.".format(id, brokerIdPath, host, port)) } def getConsumerPartitionOwnerPath(group: String, topic: String, partition: Int): String = {
