Repository: kafka
Updated Branches:
  refs/heads/trunk fe0335ea1 -> 3e89d2bc5


KAFKA-3525; getSequenceId should return 1 for first data node creation

The ZkUtils.getSequenceId() method is used to generate broker id sequence
numbers. During startup, each broker updates the data at the /brokers/seqid
ZooKeeper path, and the method returns stat.getVersion as the next sequence id.

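stat.getVersion returns 1 after the first data update, but getSequenceId()
previously returned 0 when it had to create the data node itself. It should
return 1 in that case as well, so that the sequence always starts at 1.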

Author: Manikumar reddy O <[email protected]>

Reviewers: Flavio Junqueira <[email protected]>, Ismael Juma <[email protected]>

Closes #1224 from omkreddy/KAFKA-3525


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3e89d2bc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3e89d2bc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3e89d2bc

Branch: refs/heads/trunk
Commit: 3e89d2bc59b1bf6c73b262a36019df01af7a958c
Parents: fe0335e
Author: Manikumar reddy O <[email protected]>
Authored: Wed May 11 01:52:43 2016 +0100
Committer: Ismael Juma <[email protected]>
Committed: Wed May 11 01:52:43 2016 +0100

----------------------------------------------------------------------
 .../main/scala/kafka/server/KafkaConfig.scala   |  2 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala   | 20 ++++++--------------
 .../server/ServerGenerateBrokerIdTest.scala     |  8 ++++++++
 3 files changed, 15 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3e89d2bc/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 596cc58..dff2b66 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -356,7 +356,7 @@ object KafkaConfig {
   val ZkSyncTimeMsDoc = "How far a ZK follower can be behind a ZK leader"
   val ZkEnableSecureAclsDoc = "Set client to use secure ACLs"
   /** ********* General Configuration ***********/
-  val BrokerIdGenerationEnableDoc = s"Enable automatic broker id generation on the server? When enabled the value configured for $MaxReservedBrokerIdProp should be reviewed."
+  val BrokerIdGenerationEnableDoc = s"Enable automatic broker id generation on the server. When enabled the value configured for $MaxReservedBrokerIdProp should be reviewed."
   val MaxReservedBrokerIdDoc = "Max number that can be used for a broker.id"
   val BrokerIdDoc = "The broker id for this server. If unset, a unique broker id will be generated." +
   "To avoid conflicts between zookeeper generated broker id's and user configured broker id's, generated broker ids" +

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e89d2bc/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 81eb24a..ec72029 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -792,24 +792,16 @@ class ZkUtils(val zkClient: ZkClient,
   /**
     * This API produces a sequence number by creating / updating given path in zookeeper
     * It uses the stat returned by the zookeeper and return the version. Every time
-    * client updates the path stat.version gets incremented
+    * client updates the path stat.version gets incremented. Starting value of sequence number is 1.
     */
   def getSequenceId(path: String, acls: java.util.List[ACL] = DefaultAcls): Int = {
+    def writeToZk: Int = zkClient.writeDataReturnStat(path, "", -1).getVersion
     try {
-      val stat = zkClient.writeDataReturnStat(path, "", -1)
-      stat.getVersion
+      writeToZk
     } catch {
-      case e: ZkNoNodeException => {
-        createParentPath(BrokerSequenceIdPath, acls)
-        try {
-          zkClient.createPersistent(BrokerSequenceIdPath, "", acls)
-          0
-        } catch {
-          case e: ZkNodeExistsException =>
-            val stat = zkClient.writeDataReturnStat(BrokerSequenceIdPath, "", -1)
-            stat.getVersion
-        }
-      }
+      case e1: ZkNoNodeException =>
+        makeSurePersistentPathExists(path)
+        writeToZk
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e89d2bc/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
index 8e25366..312edd4 100755
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
@@ -187,4 +187,12 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
     }
     true
   }
+
+  @Test
+  def testGetSequenceIdMethod() {
+    val path = "/test/seqid"
+    (1 to 10).foreach { seqid =>
+      assertEquals(seqid, zkUtils.getSequenceId(path))
+    }
+  }
 }

Reply via email to