Updated Branches: refs/heads/0.8 7a9faa49e -> 6849da050
kafka-992; follow-up patch; Double Check on Broker Registration to Avoid False NodeExist Exception; patched by Guozhang Wang; reviewed by Neha Narkhede and Jun Rao Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6849da05 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6849da05 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6849da05 Branch: refs/heads/0.8 Commit: 6849da0505da1f38edfbfb64e27cff504231142a Parents: 7a9faa4 Author: Guozhang Wang <[email protected]> Authored: Fri Aug 16 21:51:35 2013 -0700 Committer: Jun Rao <[email protected]> Committed: Fri Aug 16 21:51:35 2013 -0700 ---------------------------------------------------------------------- core/src/main/scala/kafka/cluster/Broker.scala | 2 - .../consumer/ZookeeperConsumerConnector.scala | 24 +---- .../kafka/controller/KafkaController.scala | 25 ++++- .../kafka/server/ZookeeperLeaderElector.scala | 89 +++++----------- core/src/main/scala/kafka/utils/ZkUtils.scala | 101 ++++++++++--------- 5 files changed, 103 insertions(+), 138 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/6849da05/core/src/main/scala/kafka/cluster/Broker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala index 435c473..b03dea2 100644 --- a/core/src/main/scala/kafka/cluster/Broker.scala +++ b/core/src/main/scala/kafka/cluster/Broker.scala @@ -58,8 +58,6 @@ private[kafka] case class Broker(val id: Int, val host: String, val port: Int) { override def toString(): String = new String("id:" + id + ",host:" + host + ",port:" + port) - def getZkString(): String = host + ":" + port - def getConnectionString(): String = host + ":" + port def writeTo(buffer: ByteBuffer) { http://git-wip-us.apache.org/repos/asf/kafka/blob/6849da05/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index c2b9b9a..e7a692a 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -220,28 +220,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "subscription" -> topicCount.dbString), valueInQuotes = false) ++ Utils.mapToJsonFields(Map("pattern" -> topicCount.pattern, "timestamp" -> timestamp), valueInQuotes = true)) - while (true) { - try { - createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo) - - info("end registering consumer " + consumerIdString + " in ZK") - return - } catch { - case e: ZkNodeExistsException => { - // An ephemeral node may still exist even after its corresponding session has expired - // due to a Zookeeper bug, in this case we need to retry writing until the previous node is deleted - // and hence the write succeeds without ZkNodeExistsException - ZkUtils.readDataMaybeNull(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString)._1 match { - case Some(consumerZKString) => { - info("I wrote this conflicted ephemeral node a while back in a different session, " - + "hence I will backoff for this node to be deleted by Zookeeper after session timeout and retry") - Thread.sleep(config.zkSessionTimeoutMs) - } - case None => // the node disappeared; retry creating the ephemeral node immediately - } - } - } - } + createEphemeralPathExpectConflictHandleZKBug(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo, null, (consumerZKString, consumer) => true, config.zkSessionTimeoutMs) + info("end registering consumer " + consumerIdString + " in ZK") } private def sendShutdownToAllQueues() = { http://git-wip-us.apache.org/repos/asf/kafka/blob/6849da05/core/src/main/scala/kafka/controller/KafkaController.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 800f900..bde405a 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -28,7 +28,7 @@ import kafka.common._ import kafka.metrics.{KafkaTimer, KafkaMetricsGroup} import kafka.server.{ZookeeperLeaderElector, KafkaConfig} import kafka.utils.ZkUtils._ -import kafka.utils.{Utils, ZkUtils, Logging} +import kafka.utils.{Json, Utils, ZkUtils, Logging} import org.apache.zookeeper.Watcher.Event.KeeperState import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient} import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException} @@ -74,11 +74,32 @@ trait KafkaControllerMBean { def shutdownBroker(id: Int): Set[TopicAndPartition] } -object KafkaController { +object KafkaController extends Logging { val MBeanName = "kafka.controller:type=KafkaController,name=ControllerOps" val stateChangeLogger = "state.change.logger" val InitialControllerEpoch = 1 val InitialControllerEpochZkVersion = 1 + + def parseControllerId(controllerInfoString: String): Int = { + try { + Json.parseFull(controllerInfoString) match { + case Some(m) => + val controllerInfo = m.asInstanceOf[Map[String, Any]] + return controllerInfo.get("brokerid").get.asInstanceOf[Int] + case None => throw new KafkaException("Failed to parse the controller info json [%s].".format(controllerInfoString)) + } + } catch { + case t => + // It may be due to an incompatible controller register version + warn("Failed to parse the controller info as json. " + + "Probably this controller is still using the old format [%s] to store the broker id in zookeeper".format(controllerInfoString)) + try { + return controllerInfoString.toInt + } catch { + case t => throw new KafkaException("Failed to parse the controller info: " + controllerInfoString + ". This is neither the new or the old format.", t) + } + } + } } class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup with KafkaControllerMBean { http://git-wip-us.apache.org/repos/asf/kafka/blob/6849da05/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala index d785db9..50e3f79 100644 --- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala +++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala @@ -17,10 +17,11 @@ package kafka.server import kafka.utils.ZkUtils._ -import kafka.utils.{Json, Utils, SystemTime, Logging} +import kafka.utils.{Utils, SystemTime, Logging} import org.I0Itec.zkclient.exception.ZkNodeExistsException import org.I0Itec.zkclient.IZkDataListener import kafka.controller.ControllerContext +import kafka.controller.KafkaController import kafka.common.KafkaException /** @@ -52,57 +53,29 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath: Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "brokerid" -> brokerId.toString), valueInQuotes = false) ++ Utils.mapToJsonFields(Map("timestamp" -> timestamp), valueInQuotes = true)) - var electNotDone = true - do { - electNotDone = false - try { - createEphemeralPathExpectConflict(controllerContext.zkClient, electionPath, electString) - - info(brokerId + " successfully elected as leader") - leaderId = brokerId - onBecomingLeader() + try { + createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, leaderId, + (controllerString : String, leaderId : Any) => KafkaController.parseControllerId(controllerString) == leaderId.asInstanceOf[Int], + controllerContext.zkSessionTimeout) + info(brokerId + " successfully elected as leader") + leaderId = brokerId + onBecomingLeader() } catch { case e: ZkNodeExistsException => - readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match { - // If someone else has written the path, then read the broker id - case Some(controllerString) => - try { - Json.parseFull(controllerString) match { - case Some(m) => - val controllerInfo = m.asInstanceOf[Map[String, Any]] - leaderId = controllerInfo.get("brokerid").get.asInstanceOf[Int] - if (leaderId != brokerId) { - info("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId)) - } else { - info("I wrote this conflicted ephemeral node a while back in a different session, " - + "hence I will retry") - electNotDone = true - Thread.sleep(controllerContext.zkSessionTimeout) - } - case None => - warn("Error while reading leader info %s on broker %d, may be it is an old version".format(controllerString, brokerId)) - throw new KafkaException("Failed to parse the leader info [%s] from zookeeper. May be it is an old version") - } - } catch { - case t => - // It may be due to an incompatible controller register version - info("Failed to parse the controller info as json. " + - "Probably this controller is still using the old format [%s] of storing the broker id in the zookeeper path".format(controllerString)) - try { - leaderId = controllerString.toInt - info("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId)) - } catch { - case t => throw new KafkaException("Failed to parse the leader info [%s] from zookeeper. This is neither the new or the old format.", t) - } - } - case None => - // The node disappears, retry immediately + // If someone else has written the path, then + leaderId = readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match { + case Some(controller) => KafkaController.parseControllerId(controller) + case None => { + warn("A leader has been elected but just resigned, this will result in another round of election") + -1 + } } + if (leaderId != -1) + debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId)) case e2 => error("Error while electing or becoming leader on broker %d".format(brokerId), e2) - resign() - } - } while (electNotDone) + leaderId = -1 + } amILeader } @@ -114,6 +87,7 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath: def amILeader : Boolean = leaderId == brokerId def resign() = { + leaderId = -1 deletePath(controllerContext.zkClient, electionPath) } @@ -129,25 +103,8 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath: @throws(classOf[Exception]) def handleDataChange(dataPath: String, data: Object) { controllerContext.controllerLock synchronized { - try { - Json.parseFull(data.toString) match { - case Some(m) => - val controllerInfo = m.asInstanceOf[Map[String, Any]] - leaderId = controllerInfo.get("brokerid").get.asInstanceOf[Int] - info("New leader is %d".format(leaderId)) - case None => - error("Error while reading the leader info %s".format(data.toString)) - } - } catch { - case t => - // It may be due to an incompatible controller register version - try { - leaderId = data.toString.toInt - info("New leader is %d".format(leaderId)) - } catch { - case t => throw new KafkaException("Failed to parse the leader info from zookeeper: " + data, t) - } - } + leaderId = KafkaController.parseControllerId(data.toString) + info("New leader is %d".format(leaderId)) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/6849da05/core/src/main/scala/kafka/utils/ZkUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 9772af8..ba5eacc 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -31,6 +31,7 @@ import kafka.admin._ import kafka.common.{KafkaException, NoEpochForPartitionException} import kafka.controller.ReassignedPartitionsContext import kafka.controller.PartitionAndReplica +import kafka.controller.KafkaController import scala.Some import kafka.controller.LeaderIsrAndControllerEpoch import kafka.common.TopicAndPartition @@ -54,25 +55,7 @@ object ZkUtils extends Logging { def getController(zkClient: ZkClient): Int= { readDataMaybeNull(zkClient, ControllerPath)._1 match { - case Some(controller) => - try { - Json.parseFull(controller) match { - case Some(m) => - val controllerInfo = m.asInstanceOf[Map[String, Any]] - controllerInfo.get("brokerid").get.asInstanceOf[Int] - case None => throw new KafkaException("Failed to parse the controller info json [%s] from zookeeper.".format(controller)) - } - } catch { - case t => - // It may be due to an incompatible controller register version - info("Failed to parse the controller info as json. " + - "Probably this controller is still using the old format [%s] of storing the broker id in the zookeeper path".format(controller)) - try { - controller.toInt - } catch { - case t => throw new KafkaException("Failed to parse the leader info [%s] from zookeeper. This is neither the new or the old format.", t) - } - } + case Some(controller) => KafkaController.parseControllerId(controller) case None => throw new KafkaException("Controller doesn't exist") } } @@ -206,36 +189,21 @@ object ZkUtils extends Logging { Utils.mergeJsonFields(Utils.mapToJsonFields(Map("host" -> host), valueInQuotes = true) ++ Utils.mapToJsonFields(Map("version" -> 1.toString, "jmx_port" -> jmxPort.toString, "port" -> port.toString, "timestamp" -> timestamp), valueInQuotes = false)) + val expectedBroker = new Broker(id, host, port) - while (true) { - try { - createEphemeralPathExpectConflict(zkClient, brokerIdPath, brokerInfo) + try { + createEphemeralPathExpectConflictHandleZKBug(zkClient, brokerIdPath, brokerInfo, expectedBroker, + (brokerString: String, broker: Any) => Broker.createBroker(broker.asInstanceOf[Broker].id, brokerString).equals(broker.asInstanceOf[Broker]), + timeout) - info("Registered broker %d at path %s with address %s:%d.".format(id, brokerIdPath, host, port)) - return - } catch { - case e: ZkNodeExistsException => { - // An ephemeral node may still exist even after its corresponding session has expired - // due to a Zookeeper bug, in this case we need to retry writing until the previous node is deleted - // and hence the write succeeds without ZkNodeExistsException - ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + id)._1 match { - case Some(brokerZKString) => { - val broker = Broker.createBroker(id, brokerZKString) - if (broker.host == host && broker.port == port) { - info("I wrote this conflicted ephemeral node [%s] a while back in a different session, ".format(brokerZKString) - + "hence I will backoff for this node to be deleted by Zookeeper after session timeout and retry") - Thread.sleep(timeout) - } else { - // otherwise, throw the runtime exception - throw new RuntimeException("Another broker [%s:%s] other than the current broker [%s:%s] is already registered on the path %s." - .format(broker.host, broker.port, host, port, brokerIdPath)) - } - } - case None => // the node disappeared; retry creating the ephemeral node immediately - } - } - } + } catch { + case e: ZkNodeExistsException => + throw new RuntimeException("A broker is already registered on the path " + brokerIdPath + + ". This probably " + "indicates that you either have configured a brokerid that is already in use, or " + + "else you have shutdown this broker and restarted it faster than the zookeeper " + + "timeout so it appears to be re-registering.") } + info("Registered broker %d at path %s with address %s:%d.".format(id, brokerIdPath, host, port)) } def getConsumerPartitionOwnerPath(group: String, topic: String, partition: Int): String = { @@ -318,6 +286,47 @@ object ZkUtils extends Logging { } /** + * Create an ephemeral node with the given path and data. + * Throw NodeExistsException if node already exists. + * Handles the following ZK session timeout bug: + * + * https://issues.apache.org/jira/browse/ZOOKEEPER-1740 + * + * Upon receiving a NodeExistsException, read the data from the conflicted path and + * trigger the checker function comparing the read data and the expected data, + * If the checker function returns true then the above bug might be encountered, back off and retry; + * otherwise re-throw the exception + */ + def createEphemeralPathExpectConflictHandleZKBug(zkClient: ZkClient, path: String, data: String, expectedCallerData: Any, checker: (String, Any) => Boolean, backoffTime: Int): Unit = { + while (true) { + try { + createEphemeralPathExpectConflict(zkClient, path, data) + return + } catch { + case e: ZkNodeExistsException => { + // An ephemeral node may still exist even after its corresponding session has expired + // due to a Zookeeper bug, in this case we need to retry writing until the previous node is deleted + // and hence the write succeeds without ZkNodeExistsException + ZkUtils.readDataMaybeNull(zkClient, path)._1 match { + case Some(writtenData) => { + if (checker(writtenData, expectedCallerData)) { + info("I wrote this conflicted ephemeral node [%s] at %s a while back in a different session, ".format(data, path) + + "hence I will backoff for this node to be deleted by Zookeeper and retry") + + Thread.sleep(backoffTime) + } else { + throw e + } + } + case None => // the node disappeared; retry creating the ephemeral node immediately + } + } + case e2 => throw e2 + } + } + } + + /** * Create an persistent node with the given path and data. Create parents if necessary. */ def createPersistentPath(client: ZkClient, path: String, data: String = ""): Unit = {
