Repository: kafka
Updated Branches:
  refs/heads/trunk c06542ede -> 5c337d759


KAFKA-3012: Avoid reserved.broker.max.id collisions on upgrade

Provides a configuration to opt out of broker id generation.

Author: Grant Henke <[email protected]>

Reviewers: Gwen Shapira

Closes #762 from granthenke/id-generation


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5c337d75
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5c337d75
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5c337d75

Branch: refs/heads/trunk
Commit: 5c337d759892ae6d46d6901a24e9b97cebd2a4da
Parents: c06542e
Author: Grant Henke <[email protected]>
Authored: Mon Jan 18 18:39:55 2016 -0800
Committer: Gwen Shapira <[email protected]>
Committed: Mon Jan 18 18:39:55 2016 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/server/KafkaConfig.scala  | 11 ++++++++++-
 core/src/main/scala/kafka/server/KafkaServer.scala  |  6 +++---
 .../scala/unit/kafka/server/KafkaConfigTest.scala   |  4 +++-
 .../kafka/server/ServerGenerateBrokerIdTest.scala   | 16 ++++++++++++++++
 docs/upgrade.html                                   |  8 +++++++-
 5 files changed, 39 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5c337d75/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 54b79ab..4911809 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -42,6 +42,7 @@ object Defaults {
   val ZkEnableSecureAcls = false
 
   /** ********* General Configuration ***********/
+  val BrokerIdGenerationEnable = true
   val MaxReservedBrokerId = 1000
   val BrokerId = -1
   val MessageMaxBytes = 1000000 + MessageSet.LogOverhead
@@ -190,6 +191,7 @@ object KafkaConfig {
   val ZkSyncTimeMsProp = "zookeeper.sync.time.ms"
   val ZkEnableSecureAclsProp = "zookeeper.set.acl"
   /** ********* General Configuration ***********/
+  val BrokerIdGenerationEnableProp = "broker.id.generation.enable"
   val MaxReservedBrokerIdProp = "reserved.broker.max.id"
   val BrokerIdProp = "broker.id"
   val MessageMaxBytesProp = "message.max.bytes"
@@ -338,6 +340,7 @@ object KafkaConfig {
   val ZkSyncTimeMsDoc = "How far a ZK follower can be behind a ZK leader"
   val ZkEnableSecureAclsDoc = "Set client to use secure ACLs"
   /** ********* General Configuration ***********/
+  val BrokerIdGenerationEnableDoc = s"Enable automatic broker id generation on 
the server? When enabled the value configured for $MaxReservedBrokerIdProp 
should be reviewed."
   val MaxReservedBrokerIdDoc = "Max number that can be used for a broker.id"
   val BrokerIdDoc = "The broker id for this server. If unset, a unique broker 
id will be generated." +
   "To avoid conflicts between zookeeper generated broker id's and user 
configured broker id's, generated broker ids" +
@@ -522,6 +525,7 @@ object KafkaConfig {
       .define(ZkEnableSecureAclsProp, BOOLEAN, Defaults.ZkEnableSecureAcls, 
HIGH, ZkEnableSecureAclsDoc)
 
       /** ********* General Configuration ***********/
+      .define(BrokerIdGenerationEnableProp, BOOLEAN, 
Defaults.BrokerIdGenerationEnable, MEDIUM, BrokerIdGenerationEnableDoc)
       .define(MaxReservedBrokerIdProp, INT, Defaults.MaxReservedBrokerId, 
atLeast(0), MEDIUM, MaxReservedBrokerIdDoc)
       .define(BrokerIdProp, INT, Defaults.BrokerId, HIGH, BrokerIdDoc)
       .define(MessageMaxBytesProp, INT, Defaults.MessageMaxBytes, atLeast(0), 
HIGH, MessageMaxBytesDoc)
@@ -718,6 +722,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: 
Boolean) extends Abstra
   val zkEnableSecureAcls: Boolean = 
getBoolean(KafkaConfig.ZkEnableSecureAclsProp)
 
   /** ********* General Configuration ***********/
+  val brokerIdGenerationEnable: Boolean = 
getBoolean(KafkaConfig.BrokerIdGenerationEnableProp)
   val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp)
   var brokerId: Int = getInt(KafkaConfig.BrokerIdProp)
   val numNetworkThreads = getInt(KafkaConfig.NumNetworkThreadsProp)
@@ -927,7 +932,11 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: 
Boolean) extends Abstra
   validateValues()
 
   private def validateValues() {
-    require(brokerId >= -1 && brokerId <= maxReservedBrokerId, "broker.id must 
be equal or greater than -1 and not greater than reserved.broker.max.id")
+    if(brokerIdGenerationEnable) {
+      require(brokerId >= -1 && brokerId <= maxReservedBrokerId, "broker.id 
must be equal or greater than -1 and not greater than reserved.broker.max.id")
+    } else {
+      require(brokerId >= 0, "broker.id must be equal or greater than 0")
+    }
     require(logRollTimeMillis >= 1, "log.roll.ms must be equal or greater than 
1")
     require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be equal or 
greater than 0")
     require(logRetentionTimeMillis >= 1 || logRetentionTimeMillis == -1, 
"log.retention.ms must be unlimited (-1) or, equal or greater than 1")

http://git-wip-us.apache.org/repos/asf/kafka/blob/5c337d75/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index aaa6ea9..454633e 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -609,9 +609,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
SystemTime, threadNamePr
   }
 
   /**
-    * Generates new brokerId or reads from meta.properties based on following 
conditions
+    * Generates new brokerId if enabled or reads from meta.properties based on 
following conditions
     * <ol>
-    * <li> config has no broker.id provided , generates a broker.id based on 
Zookeeper's sequence
+    * <li> config has no broker.id provided and broker id generation is 
enabled, generates a broker.id based on Zookeeper's sequence
     * <li> stored broker.id in meta.properties doesn't match in all the 
log.dirs throws InconsistentBrokerIdException
     * <li> config has broker.id and meta.properties contains broker.id if they 
don't match throws InconsistentBrokerIdException
     * <li> config has broker.id and there is no meta.properties file, creates 
new meta.properties and stores broker.id
@@ -637,7 +637,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
SystemTime, threadNamePr
       throw new InconsistentBrokerIdException("Failed to match brokerId across 
logDirs")
     else if(brokerId >= 0 && brokerIdSet.size == 1 && brokerIdSet.last != 
brokerId)
       throw new InconsistentBrokerIdException("Configured brokerId %s doesn't 
match stored brokerId %s in meta.properties".format(brokerId, brokerIdSet.last))
-    else if(brokerIdSet.size == 0 && brokerId < 0)  // generate a new brokerId 
from Zookeeper
+    else if(brokerIdSet.size == 0 && brokerId < 0 && 
config.brokerIdGenerationEnable)  // generate a new brokerId from Zookeeper
       brokerId = generateBrokerId
     else if(brokerIdSet.size == 1) // pick broker.id from meta.properties
       brokerId = brokerIdSet.last

http://git-wip-us.apache.org/repos/asf/kafka/blob/5c337d75/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 8a5038f..9ddc2c1 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -408,7 +408,7 @@ class KafkaConfigTest {
         case KafkaConfig.RequestTimeoutMsProp => 
assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
 
         case KafkaConfig.AuthorizerClassNameProp => //ignore string
-          
+
         case KafkaConfig.PortProp => 
assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.HostNameProp => // ignore string
         case KafkaConfig.AdvertisedHostNameProp => //ignore string
@@ -526,6 +526,7 @@ class KafkaConfigTest {
     defaults.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
     // For ZkConnectionTimeoutMs
     defaults.put(KafkaConfig.ZkSessionTimeoutMsProp, "1234")
+    defaults.put(KafkaConfig.BrokerIdGenerationEnableProp, "false")
     defaults.put(KafkaConfig.MaxReservedBrokerIdProp, "1")
     defaults.put(KafkaConfig.BrokerIdProp, "1")
     defaults.put(KafkaConfig.HostNameProp, "127.0.0.1")
@@ -542,6 +543,7 @@ class KafkaConfigTest {
     val config = KafkaConfig.fromProps(defaults)
     Assert.assertEquals("127.0.0.1:2181", config.zkConnect)
     Assert.assertEquals(1234, config.zkConnectionTimeoutMs)
+    Assert.assertEquals(false, config.brokerIdGenerationEnable)
     Assert.assertEquals(1, config.maxReservedBrokerId)
     Assert.assertEquals(1, config.brokerId)
     Assert.assertEquals("127.0.0.1", config.hostName)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5c337d75/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala 
b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
index 9afb2ca..60ec561 100755
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
@@ -82,6 +82,22 @@ class ServerGenerateBrokerIdTest extends 
ZooKeeperTestHarness {
   }
 
   @Test
+  def testDisableGeneratedBrokerId() {
+    val props3 = TestUtils.createBrokerConfig(3, zkConnect)
+    props3.put(KafkaConfig.BrokerIdGenerationEnableProp, "false")
+    // Set reserve broker ids to cause collision and ensure disabling broker 
id generation ignores the setting
+    props3.put(KafkaConfig.MaxReservedBrokerIdProp, "0")
+    val config3 = KafkaConfig.fromProps(props3)
+    val server3 = new KafkaServer(config3)
+    server3.startup()
+    assertEquals(server3.config.brokerId,3)
+    server3.shutdown()
+    assertTrue(verifyBrokerMetadata(server3.config.logDirs,3))
+    CoreUtils.rm(server3.config.logDirs)
+    TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
+  }
+
+  @Test
   def testMultipleLogDirsMetaProps() {
     // add multiple logDirs and check if the generate brokerId is stored in 
all of them
     val logDirs = props1.getProperty("log.dir")+ "," + 
TestUtils.tempDir().getAbsolutePath +

http://git-wip-us.apache.org/repos/asf/kafka/blob/5c337d75/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index eccd626..c723957 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -39,7 +39,6 @@
     <li> Broker IDs above 1000 are now reserved by default to automatically 
assigned broker IDs. If your cluster has existing broker IDs above that 
threshold make sure to increase the reserved.broker.max.id broker configuration 
property accordingly. </li>
     <li> Configuration parameter replica.lag.max.messages was removed. 
Partition leaders will no longer consider the number of lagging messages when 
deciding which replicas are in sync. </li>
     <li> Configuration parameter replica.lag.time.max.ms now refers not just 
to the time passed since last fetch request from replica, but also to time 
since the replica last caught up. Replicas that are still fetching messages 
from leaders but did not catch up to the latest messages in 
replica.lag.time.max.ms will be considered out of sync. </li>
-    <li> Configuration parameter log.cleaner.enable is now true by default. 
This means topics with a cleanup.policy=compact will now be compacted by 
default, and 128 MB of heap will be allocated to the cleaner process via 
log.cleaner.dedupe.buffer.size. You may want to review 
log.cleaner.dedupe.buffer.size and the other log.cleaner configuration values 
based on your usage of compacted topics. </li>
     <li> MirrorMaker no longer supports multiple target clusters. As a result 
it will only accept a single --consumer.config parameter. To mirror multiple 
source clusters, you will need at least one MirrorMaker instance per source 
cluster, each with its own consumer configuration. </li>
     <li> Tools packaged under <em>org.apache.kafka.clients.tools.*</em> have 
been moved to <em>org.apache.kafka.tools.*</em>. All included scripts will 
still function as usual, only custom code directly importing these classes will 
be affected. </li>
     <li> The default Kafka JVM performance options 
(KAFKA_JVM_PERFORMANCE_OPTS) have been changed in kafka-run-class.sh. </li>
@@ -49,6 +48,13 @@
     <li> By default all command line tools will print all logging messages to 
stderr instead of stdout. </li>
 </ul>
 
+<h5><a id="upgrade_901_notable" href="#upgrade_901_notable">Notable changes in 
0.9.0.1</a></h5>
+
+<ul>
+    <li> The new broker id generation feature can be disabled by setting 
broker.id.generation.enable to false. </li>
+    <li> Configuration parameter log.cleaner.enable is now true by default. 
This means topics with a cleanup.policy=compact will now be compacted by 
default, and 128 MB of heap will be allocated to the cleaner process via 
log.cleaner.dedupe.buffer.size. You may want to review 
log.cleaner.dedupe.buffer.size and the other log.cleaner configuration values 
based on your usage of compacted topics. </li>
+</ul>
+
 <h5>Deprecations in 0.9.0.0</h5>
 
 <ul>

Reply via email to