Repository: kafka
Updated Branches:
  refs/heads/trunk 62dc1afb6 -> be0f3502d


Setting broker state as running after publishing to ZK

junrao

Currently, the broker state is set to running before the broker registers 
itself in ZooKeeper, which is too early in the broker lifecycle.  Clients that 
use the broker state as an indicator that the broker is ready to accept 
requests will get errors.  This change delays setting the broker state to 
running until the broker has registered itself in ZK.

Author: Roger Hoover <[email protected]>

Reviewers: Jun Rao <[email protected]>

Closes #1426 from theduderog/broker-running-after-zk


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/be0f3502
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/be0f3502
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/be0f3502

Branch: refs/heads/trunk
Commit: be0f3502da1f703b27d7a3fae5a01325dff44957
Parents: 62dc1af
Author: Roger Hoover <[email protected]>
Authored: Thu May 26 09:50:49 2016 -0700
Committer: Jun Rao <[email protected]>
Committed: Thu May 26 09:50:49 2016 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/server/KafkaServer.scala   |  2 +-
 .../unit/kafka/server/ServerStartupTest.scala   | 28 +++++++++++++++++++-
 2 files changed, 28 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/be0f3502/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index de3054a..f95d9ef 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -214,7 +214,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
SystemTime, threadNamePr
         apis = new KafkaApis(socketServer.requestChannel, replicaManager, 
groupCoordinator,
           kafkaController, zkUtils, config.brokerId, config, metadataCache, 
metrics, authorizer)
         requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, 
socketServer.requestChannel, apis, config.numIoThreads)
-        brokerState.newState(RunningAsBroker)
 
         Mx4jLoader.maybeLoad()
 
@@ -249,6 +248,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
SystemTime, threadNamePr
         /* register broker metrics */
         registerStats()
 
+        brokerState.newState(RunningAsBroker)
         shutdownLatch = new CountDownLatch(1)
         startupComplete.set(true)
         isStartingUp.set(false)

http://git-wip-us.apache.org/repos/asf/kafka/blob/be0f3502/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala 
b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index 9b49365..b5560c3 100755
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -20,8 +20,8 @@ package kafka.server
 import kafka.utils.ZkUtils
 import kafka.utils.CoreUtils
 import kafka.utils.TestUtils
-
 import kafka.zk.ZooKeeperTestHarness
+import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.Test
 
@@ -82,4 +82,30 @@ class ServerStartupTest extends ZooKeeperTestHarness {
     server.shutdown()
     CoreUtils.delete(server.config.logDirs)
   }
+
+  @Test
+  def testBrokerStateRunningAfterZK {
+    val brokerId = 0
+    val mockBrokerState = EasyMock.niceMock(classOf[kafka.server.BrokerState])
+
+    class BrokerStateInterceptor() extends BrokerState {
+      override def newState(newState: BrokerStates): Unit = {
+        val brokers = zkUtils.getAllBrokersInCluster()
+        assertEquals(1, brokers.size)
+        assertEquals(brokerId, brokers.head.id)
+      }
+    }
+
+    class MockKafkaServer(override val config: KafkaConfig, override val 
brokerState: BrokerState = mockBrokerState) extends KafkaServer(config) {}
+
+    val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
+    val server = new MockKafkaServer(KafkaConfig.fromProps(props))
+
+    
EasyMock.expect(mockBrokerState.newState(RunningAsBroker)).andDelegateTo(new 
BrokerStateInterceptor).once()
+    EasyMock.replay(mockBrokerState)
+
+    server.startup()
+    server.shutdown()
+    CoreUtils.delete(server.config.logDirs)
+  }
 }

Reply via email to