Repository: kafka
Updated Branches:
  refs/heads/trunk 0d19f5885 -> e5a0d1398


MINOR: MetadataCache brokerId is not set on first run with generated broker id

…broker id

This is because the id passed into the MetadataCache is the value from the 
config before the real broker id is generated.

Author: Grant Henke <[email protected]>

Reviewers: Ismael Juma <[email protected]>

Closes #1632 from granthenke/metadata-id


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e5a0d139
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e5a0d139
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e5a0d139

Branch: refs/heads/trunk
Commit: e5a0d139861b58589233e2bcd3a10e9fb032ddbc
Parents: 0d19f58
Author: Grant Henke <[email protected]>
Authored: Thu Jul 21 00:49:24 2016 +0100
Committer: Ismael Juma <[email protected]>
Committed: Thu Jul 21 00:49:24 2016 +0100

----------------------------------------------------------------------
 core/src/main/scala/kafka/server/KafkaServer.scala              | 4 +++-
 core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala | 2 +-
 2 files changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e5a0d139/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 78c6606..04a07f9 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -131,7 +131,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
SystemTime, threadNamePr
   val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
 
   var kafkaHealthcheck: KafkaHealthcheck = null
-  val metadataCache: MetadataCache = new MetadataCache(config.brokerId)
+  var metadataCache: MetadataCache = null
 
   var zkUtils: ZkUtils = null
   val correlationId: AtomicInteger = new AtomicInteger(0)
@@ -188,6 +188,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
SystemTime, threadNamePr
         config.brokerId =  getBrokerId
         this.logIdent = "[Kafka Server " + config.brokerId + "], "
 
+        metadataCache = new MetadataCache(config.brokerId)
+
         socketServer = new SocketServer(config, metrics, kafkaMetricsTime)
         socketServer.startup()
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5a0d139/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
index a5909fe..55eb6f8 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
@@ -52,7 +52,7 @@ class MetadataRequestTest extends BaseRequestTest {
     assertNotEquals("Controller id should switch to a new broker", 
controllerId, controllerId2)
     TestUtils.waitUntilTrue(() => {
       val metadataResponse2 = sendMetadataRequest(MetadataRequest.allTopics(), 
1)
-      controllerServer2.apis.brokerId == metadataResponse2.controller.id
+      metadataResponse2.controller != null && controllerServer2.apis.brokerId 
== metadataResponse2.controller.id
     }, "Controller id should match the active controller after failover", 5000)
   }
 

Reply via email to