Repository: kafka
Updated Branches:
  refs/heads/trunk 62dc1afb6 -> be0f3502d
Setting broker state as running after publishing to ZK junrao Currently, the broker state is set to running before it registers itself in ZooKeeper. This is too early in the broker lifecycle. If clients use the broker state as an indicator that the broker is ready to accept requests, they will get errors. This change is to delay setting the broker state to running until it's registered in ZK. Author: Roger Hoover <[email protected]> Reviewers: Jun Rao <[email protected]> Closes #1426 from theduderog/broker-running-after-zk Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/be0f3502 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/be0f3502 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/be0f3502 Branch: refs/heads/trunk Commit: be0f3502da1f703b27d7a3fae5a01325dff44957 Parents: 62dc1af Author: Roger Hoover <[email protected]> Authored: Thu May 26 09:50:49 2016 -0700 Committer: Jun Rao <[email protected]> Committed: Thu May 26 09:50:49 2016 -0700 ---------------------------------------------------------------------- .../main/scala/kafka/server/KafkaServer.scala | 2 +- .../unit/kafka/server/ServerStartupTest.scala | 28 +++++++++++++++++++- 2 files changed, 28 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/be0f3502/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index de3054a..f95d9ef 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -214,7 +214,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr apis = new KafkaApis(socketServer.requestChannel, replicaManager, groupCoordinator, 
kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer) requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads) - brokerState.newState(RunningAsBroker) Mx4jLoader.maybeLoad() @@ -249,6 +248,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr /* register broker metrics */ registerStats() + brokerState.newState(RunningAsBroker) shutdownLatch = new CountDownLatch(1) startupComplete.set(true) isStartingUp.set(false) http://git-wip-us.apache.org/repos/asf/kafka/blob/be0f3502/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala index 9b49365..b5560c3 100755 --- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala @@ -20,8 +20,8 @@ package kafka.server import kafka.utils.ZkUtils import kafka.utils.CoreUtils import kafka.utils.TestUtils - import kafka.zk.ZooKeeperTestHarness +import org.easymock.EasyMock import org.junit.Assert._ import org.junit.Test @@ -82,4 +82,30 @@ class ServerStartupTest extends ZooKeeperTestHarness { server.shutdown() CoreUtils.delete(server.config.logDirs) } + + @Test + def testBrokerStateRunningAfterZK { + val brokerId = 0 + val mockBrokerState = EasyMock.niceMock(classOf[kafka.server.BrokerState]) + + class BrokerStateInterceptor() extends BrokerState { + override def newState(newState: BrokerStates): Unit = { + val brokers = zkUtils.getAllBrokersInCluster() + assertEquals(1, brokers.size) + assertEquals(brokerId, brokers.head.id) + } + } + + class MockKafkaServer(override val config: KafkaConfig, override val brokerState: BrokerState = mockBrokerState) extends KafkaServer(config) {} + + val props = 
TestUtils.createBrokerConfig(brokerId, zkConnect) + val server = new MockKafkaServer(KafkaConfig.fromProps(props)) + + EasyMock.expect(mockBrokerState.newState(RunningAsBroker)).andDelegateTo(new BrokerStateInterceptor).once() + EasyMock.replay(mockBrokerState) + + server.startup() + server.shutdown() + CoreUtils.delete(server.config.logDirs) + } }
