Repository: kafka Updated Branches: refs/heads/trunk fe0335ea1 -> 3e89d2bc5
KAFKA-3525; getSequenceId should return 1 for first data node creation ZkUtils.getSequenceId() method is used to generate broker id sequence numbers. During startup, each broker updates the data at /brokers/seqid zk path and returns stat.getVersion as next sequence id. stat.getVersion returns "1" for first data update. So ZkUtils.getSequenceId() should return "1" on first data update. Author: Manikumar reddy O <[email protected]> Reviewers: Flavio Junqueira <[email protected]>, Ismael Juma <[email protected]> Closes #1224 from omkreddy/KAFKA-3525 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3e89d2bc Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3e89d2bc Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3e89d2bc Branch: refs/heads/trunk Commit: 3e89d2bc59b1bf6c73b262a36019df01af7a958c Parents: fe0335e Author: Manikumar reddy O <[email protected]> Authored: Wed May 11 01:52:43 2016 +0100 Committer: Ismael Juma <[email protected]> Committed: Wed May 11 01:52:43 2016 +0100 ---------------------------------------------------------------------- .../main/scala/kafka/server/KafkaConfig.scala | 2 +- core/src/main/scala/kafka/utils/ZkUtils.scala | 20 ++++++-------------- .../server/ServerGenerateBrokerIdTest.scala | 8 ++++++++ 3 files changed, 15 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/3e89d2bc/core/src/main/scala/kafka/server/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 596cc58..dff2b66 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -356,7 +356,7 @@ object KafkaConfig { val ZkSyncTimeMsDoc = "How far a ZK follower can be behind a ZK leader" val ZkEnableSecureAclsDoc = "Set client to use secure ACLs" /** ********* General Configuration ***********/ - val BrokerIdGenerationEnableDoc = s"Enable automatic broker id generation on the server? When enabled the value configured for $MaxReservedBrokerIdProp should be reviewed." + val BrokerIdGenerationEnableDoc = s"Enable automatic broker id generation on the server. When enabled the value configured for $MaxReservedBrokerIdProp should be reviewed." val MaxReservedBrokerIdDoc = "Max number that can be used for a broker.id" val BrokerIdDoc = "The broker id for this server. If unset, a unique broker id will be generated." + "To avoid conflicts between zookeeper generated broker id's and user configured broker id's, generated broker ids" + http://git-wip-us.apache.org/repos/asf/kafka/blob/3e89d2bc/core/src/main/scala/kafka/utils/ZkUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 81eb24a..ec72029 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -792,24 +792,16 @@ class ZkUtils(val zkClient: ZkClient, /** * This API produces a sequence number by creating / updating given path in zookeeper * It uses the stat returned by the zookeeper and return the version. Every time - * client updates the path stat.version gets incremented + * client updates the path stat.version gets incremented. Starting value of sequence number is 1. */ def getSequenceId(path: String, acls: java.util.List[ACL] = DefaultAcls): Int = { + def writeToZk: Int = zkClient.writeDataReturnStat(path, "", -1).getVersion try { - val stat = zkClient.writeDataReturnStat(path, "", -1) - stat.getVersion + writeToZk } catch { - case e: ZkNoNodeException => { - createParentPath(BrokerSequenceIdPath, acls) - try { - zkClient.createPersistent(BrokerSequenceIdPath, "", acls) - 0 - } catch { - case e: ZkNodeExistsException => - val stat = zkClient.writeDataReturnStat(BrokerSequenceIdPath, "", -1) - stat.getVersion - } - } + case e1: ZkNoNodeException => + makeSurePersistentPathExists(path) + writeToZk } } http://git-wip-us.apache.org/repos/asf/kafka/blob/3e89d2bc/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala index 8e25366..312edd4 100755 --- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala @@ -187,4 +187,12 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness { } true } + + @Test + def testGetSequenceIdMethod() { + val path = "/test/seqid" + (1 to 10).foreach { seqid => + assertEquals(seqid, zkUtils.getSequenceId(path)) + } + } }
