Repository: kafka Updated Branches: refs/heads/trunk 0d19f5885 -> e5a0d1398
MINOR: MetadataCache brokerId is not set on first run with generated broker id …broker id This is because the id passed into the MetadataCache is the value from the config before the real broker id is generated. Author: Grant Henke <[email protected]> Reviewers: Ismael Juma <[email protected]> Closes #1632 from granthenke/metadata-id Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e5a0d139 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e5a0d139 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e5a0d139 Branch: refs/heads/trunk Commit: e5a0d139861b58589233e2bcd3a10e9fb032ddbc Parents: 0d19f58 Author: Grant Henke <[email protected]> Authored: Thu Jul 21 00:49:24 2016 +0100 Committer: Ismael Juma <[email protected]> Committed: Thu Jul 21 00:49:24 2016 +0100 ---------------------------------------------------------------------- core/src/main/scala/kafka/server/KafkaServer.scala | 4 +++- core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/e5a0d139/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 78c6606..04a07f9 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -131,7 +131,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr val kafkaScheduler = new KafkaScheduler(config.backgroundThreads) var kafkaHealthcheck: KafkaHealthcheck = null - val metadataCache: MetadataCache = new MetadataCache(config.brokerId) + var metadataCache: MetadataCache = null var zkUtils: ZkUtils = null val correlationId: 
AtomicInteger = new AtomicInteger(0) @@ -188,6 +188,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr config.brokerId = getBrokerId this.logIdent = "[Kafka Server " + config.brokerId + "], " + metadataCache = new MetadataCache(config.brokerId) + socketServer = new SocketServer(config, metrics, kafkaMetricsTime) socketServer.startup() http://git-wip-us.apache.org/repos/asf/kafka/blob/e5a0d139/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala index a5909fe..55eb6f8 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala @@ -52,7 +52,7 @@ class MetadataRequestTest extends BaseRequestTest { assertNotEquals("Controller id should switch to a new broker", controllerId, controllerId2) TestUtils.waitUntilTrue(() => { val metadataResponse2 = sendMetadataRequest(MetadataRequest.allTopics(), 1) - controllerServer2.apis.brokerId == metadataResponse2.controller.id + metadataResponse2.controller != null && controllerServer2.apis.brokerId == metadataResponse2.controller.id }, "Controller id should match the active controller after failover", 5000) }
