I think we forgot to commit the new BrokerState file.
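From the call sites in the diff below (newState(RunningAsController) in the controller, the newState(newState: Byte) overload used by setServerState, and the Gauge[Int] reading currentState), the missing core/src/main/scala/kafka/server/BrokerState.scala presumably looks something like the sketch below. The state names are the ones referenced in this commit; the byte codes assigned to each state are my guess:

    package kafka.server

    import kafka.utils.Logging

    // Each state carries a numeric code so it can be surfaced through the new
    // "BrokerState" gauge and extended with custom values via setServerState.
    // (The byte values here are placeholders; only the names appear in the diff.)
    sealed trait BrokerStates { def state: Byte }
    case object NotRunning extends BrokerStates { val state: Byte = 0 }
    case object Starting extends BrokerStates { val state: Byte = 1 }
    case object RecoveringFromUncleanShutdown extends BrokerStates { val state: Byte = 2 }
    case object RunningAsBroker extends BrokerStates { val state: Byte = 3 }
    case object RunningAsController extends BrokerStates { val state: Byte = 4 }
    case object PendingControlledShutdown extends BrokerStates { val state: Byte = 6 }
    case object BrokerShuttingDown extends BrokerStates { val state: Byte = 7 }

    case class BrokerState() extends Logging {
      @volatile var currentState: Byte = NotRunning.state

      // Overload for the predefined states above.
      def newState(newState: BrokerStates) {
        this.newState(newState.state)
      }

      // Byte overload so a custom KafkaServerStartable can emit states of its own.
      def newState(newState: Byte) {
        currentState = newState
        info("Broker state changed to %d".format(newState))
      }
    }

Without that file, trunk won't compile, since every file touched by this commit references those case objects.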
On Tue, May 6, 2014 at 4:36 PM, <jun...@apache.org> wrote:

> Repository: kafka
> Updated Branches:
>   refs/heads/trunk 44c39c4ea -> 9b6bf4078
>
>
> kafka-1384; Log Broker state; patched by Timothy Chen; reviewed by Joel Koshy and Jun Rao
>
>
> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
> Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9b6bf407
> Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9b6bf407
> Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9b6bf407
>
> Branch: refs/heads/trunk
> Commit: 9b6bf407874ef0fda12d8b2cc7f8331ce4aebeea
> Parents: 44c39c4
> Author: Timothy Chen <tnac...@gmail.com>
> Authored: Tue May 6 16:36:09 2014 -0700
> Committer: Jun Rao <jun...@gmail.com>
> Committed: Tue May 6 16:36:09 2014 -0700
>
> ----------------------------------------------------------------------
>  .../kafka/controller/KafkaController.scala      |  7 ++-
>  core/src/main/scala/kafka/log/LogManager.scala  |  6 +-
>  .../main/scala/kafka/server/KafkaServer.scala   | 26 ++++++--
>  .../kafka/server/KafkaServerStartable.scala     |  8 +++
>  .../scala/unit/kafka/log/LogManagerTest.scala   | 64 +++++++++++++++++---
>  .../server/HighwatermarkPersistenceTest.scala   |  1 +
>  .../unit/kafka/server/ReplicaManagerTest.scala  |  1 +
>  7 files changed, 97 insertions(+), 16 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/main/scala/kafka/controller/KafkaController.scala
> ----------------------------------------------------------------------
> diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
> index 401bf1e..2fa1341 100644
> --- a/core/src/main/scala/kafka/controller/KafkaController.scala
> +++ b/core/src/main/scala/kafka/controller/KafkaController.scala
> @@ -28,7 +28,6 @@ import kafka.cluster.Broker
>  import kafka.common._
>  import kafka.log.LogConfig
>  import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
> -import kafka.server.{ZookeeperLeaderElector, KafkaConfig}
>  import kafka.utils.ZkUtils._
>  import kafka.utils._
>  import kafka.utils.Utils._
> @@ -37,6 +36,8 @@ import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
>  import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
>  import java.util.concurrent.atomic.AtomicInteger
>  import java.util.concurrent.locks.ReentrantLock
> +import scala.None
> +import kafka.server._
>  import scala.Some
>  import kafka.common.TopicAndPartition
>
> @@ -154,7 +155,7 @@ object KafkaController extends Logging {
>      }
>    }
>
> -class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup with KafkaControllerMBean {
> +class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerState: BrokerState) extends Logging with KafkaMetricsGroup with KafkaControllerMBean {
>    this.logIdent = "[Controller " + config.brokerId + "]: "
>    private var isRunning = true
>    private val stateChangeLogger = KafkaController.stateChangeLogger
> @@ -316,6 +317,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
>      controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
>      Utils.registerMBean(this, KafkaController.MBeanName)
>      info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
> +    brokerState.newState(RunningAsController)
>      maybeTriggerPartitionReassignment()
>      maybeTriggerPreferredReplicaElection()
>      /* send partition leadership info to all live brokers */
> @@ -351,6 +353,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
>        controllerContext.controllerChannelManager.shutdown()
>        controllerContext.controllerChannelManager = null
>      }
> +    brokerState.newState(RunningAsBroker)
>    }
>  }
>
>
>
> http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/main/scala/kafka/log/LogManager.scala
> ----------------------------------------------------------------------
> diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
> index ab72cff..1946c94 100644
> --- a/core/src/main/scala/kafka/log/LogManager.scala
> +++ b/core/src/main/scala/kafka/log/LogManager.scala
> @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
>  import kafka.utils._
>  import scala.collection._
>  import kafka.common.{TopicAndPartition, KafkaException}
> -import kafka.server.OffsetCheckpoint
> +import kafka.server.{RecoveringFromUncleanShutdown, BrokerState, OffsetCheckpoint}
>
>  /**
>   * The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning.
> @@ -43,6 +43,7 @@ class LogManager(val logDirs: Array[File],
>                   val flushCheckpointMs: Long,
>                   val retentionCheckMs: Long,
>                   scheduler: Scheduler,
> +                 val brokerState: BrokerState,
>                   private val time: Time) extends Logging {
>    val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
>    val LockFile = ".lock"
> @@ -109,6 +110,9 @@ class LogManager(val logDirs: Array[File],
>      val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
>      if(cleanShutDownFile.exists())
>        info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
> +    else
> +      brokerState.newState(RecoveringFromUncleanShutdown)
> +
>      for(dir <- subDirs) {
>        if(dir.isDirectory) {
>          info("Loading log '" + dir.getName + "'")
>
>
> http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/main/scala/kafka/server/KafkaServer.scala
> ----------------------------------------------------------------------
> diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
> index c208f83..c22e51e 100644
> --- a/core/src/main/scala/kafka/server/KafkaServer.scala
> +++ b/core/src/main/scala/kafka/server/KafkaServer.scala
> @@ -31,16 +31,19 @@ import kafka.cluster.Broker
>  import kafka.api.{ControlledShutdownResponse, ControlledShutdownRequest}
>  import kafka.common.ErrorMapping
>  import kafka.network.{Receive, BlockingChannel, SocketServer}
> +import kafka.metrics.KafkaMetricsGroup
> +import com.yammer.metrics.core.Gauge
>
>  /**
>   * Represents the lifecycle of a single Kafka broker. Handles all functionality required
>   * to start up and shutdown a single Kafka node.
>   */
> -class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging {
> +class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
>    this.logIdent = "[Kafka Server " + config.brokerId + "], "
>    private var isShuttingDown = new AtomicBoolean(false)
>    private var shutdownLatch = new CountDownLatch(1)
>    private var startupComplete = new AtomicBoolean(false)
> +  val brokerState: BrokerState = new BrokerState
>    val correlationId: AtomicInteger = new AtomicInteger(0)
>    var socketServer: SocketServer = null
>    var requestHandlerPool: KafkaRequestHandlerPool = null
> @@ -54,12 +57,20 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
>    val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
>    var zkClient: ZkClient = null
>
> +  newGauge(
> +    "BrokerState",
> +    new Gauge[Int] {
> +      def value = brokerState.currentState
> +    }
> +  )
> +
>    /**
>     * Start up API for bringing up a single instance of the Kafka server.
>     * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
>     */
>    def startup() {
>      info("starting")
> +    brokerState.newState(Starting)
>      isShuttingDown = new AtomicBoolean(false)
>      shutdownLatch = new CountDownLatch(1)
>
> @@ -70,7 +81,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
>      zkClient = initZk()
>
>      /* start log manager */
> -    logManager = createLogManager(zkClient)
> +    logManager = createLogManager(zkClient, brokerState)
>      logManager.startup()
>
>      socketServer = new SocketServer(config.brokerId,
> @@ -88,11 +99,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
>      /* start offset manager */
>      offsetManager = createOffsetManager()
>
> -    kafkaController = new KafkaController(config, zkClient)
> +    kafkaController = new KafkaController(config, zkClient, brokerState)
>
>      /* start processing requests */
>      apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, zkClient, config.brokerId, config, kafkaController)
>      requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
> +    brokerState.newState(RunningAsBroker)
>
>      Mx4jLoader.maybeLoad()
>
> @@ -143,6 +155,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
>      var prevController : Broker = null
>      var shutdownSuceeded : Boolean = false
>      try {
> +      brokerState.newState(PendingControlledShutdown)
>        while (!shutdownSuceeded && remainingRetries > 0) {
>          remainingRetries = remainingRetries - 1
>
> @@ -177,7 +190,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
>          // send the controlled shutdown request
>          val request = new ControlledShutdownRequest(correlationId.getAndIncrement, config.brokerId)
>          channel.send(request)
> +
>          response = channel.receive()
> +
>          val shutdownResponse = ControlledShutdownResponse.readFrom(response.buffer)
>          if (shutdownResponse.errorCode == ErrorMapping.NoError && shutdownResponse.partitionsRemaining != null && shutdownResponse.partitionsRemaining.size == 0) {
> @@ -223,6 +238,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
>      val canShutdown = isShuttingDown.compareAndSet(false, true)
>      if (canShutdown) {
>        Utils.swallow(controlledShutdown())
> +      brokerState.newState(BrokerShuttingDown)
>        if(kafkaHealthcheck != null)
>          Utils.swallow(kafkaHealthcheck.shutdown())
>        if(socketServer != null)
> @@ -243,6 +259,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
>        if(zkClient != null)
>          Utils.swallow(zkClient.close())
>
> +      brokerState.newState(NotRunning)
>        shutdownLatch.countDown()
>        startupComplete.set(false)
>        info("shut down completed")
> @@ -256,7 +273,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
>
>    def getLogManager(): LogManager = logManager
>
> -  private def createLogManager(zkClient: ZkClient): LogManager = {
> +  private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = {
>      val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes,
>                                       segmentMs = 60L * 60L * 1000L * config.logRollHours,
>                                       flushInterval = config.logFlushIntervalMessages,
> @@ -289,6 +306,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
>                     flushCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
>                     retentionCheckMs = config.logCleanupIntervalMs,
>                     scheduler = kafkaScheduler,
> +                   brokerState = brokerState,
>                     time = time)
>    }
>
>
>
> http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/main/scala/kafka/server/KafkaServerStartable.scala
> ----------------------------------------------------------------------
> diff --git a/core/src/main/scala/kafka/server/KafkaServerStartable.scala b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
> index acda52b..cef3b84 100644
> --- a/core/src/main/scala/kafka/server/KafkaServerStartable.scala
> +++ b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
> @@ -52,6 +52,14 @@ class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
>      }
>    }
>
> +  /**
> +   * Allow setting broker state from the startable.
> +   * This is needed when a custom kafka server startable wants to emit new states that it introduces.
> +   */
> +  def setServerState(newState: Byte) {
> +    server.brokerState.newState(newState)
> +  }
> +
>    def awaitShutdown() =
>      server.awaitShutdown
>
>
>
> http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
> ----------------------------------------------------------------------
> diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
> index be1a1ee..d03d4c4 100644
> --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
> +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
> @@ -21,10 +21,9 @@ import java.io._
>  import junit.framework.Assert._
>  import org.junit.Test
>  import org.scalatest.junit.JUnit3Suite
> -import kafka.server.KafkaConfig
> +import kafka.server.{BrokerState, OffsetCheckpoint}
>  import kafka.common._
>  import kafka.utils._
> -import kafka.server.OffsetCheckpoint
>
>  class LogManagerTest extends JUnit3Suite {
>
> @@ -49,7 +48,8 @@ class LogManagerTest extends JUnit3Suite {
>                                 flushCheckpointMs = 100000L,
>                                 retentionCheckMs = 1000L,
>                                 scheduler = time.scheduler,
> -                               time = time)
> +                               time = time,
> +                               brokerState = new BrokerState())
>      logManager.startup
>      logDir = logManager.logDirs(0)
>    }
> @@ -125,7 +125,18 @@ class LogManagerTest extends JUnit3Suite {
>      logManager.shutdown()
>
>      val config = logConfig.copy(segmentSize = 10 * (setSize - 1), retentionSize = 5L * 10L * setSize + 10L)
> -    logManager = new LogManager(Array(logDir), Map(), config, cleanerConfig, 1000L, 100000L, 1000L, time.scheduler, time)
> +    logManager = new LogManager(
> +      logDirs = Array(logDir),
> +      topicConfigs = Map(),
> +      defaultConfig = config,
> +      cleanerConfig = cleanerConfig,
> +      flushCheckMs = 1000L,
> +      flushCheckpointMs = 100000L,
> +      retentionCheckMs = 1000L,
> +      scheduler = time.scheduler,
> +      brokerState = new BrokerState(),
> +      time = time
> +    )
>      logManager.startup
>
>      // create a log
> @@ -165,7 +176,18 @@ class LogManagerTest extends JUnit3Suite {
>    def testTimeBasedFlush() {
>      logManager.shutdown()
>      val config = logConfig.copy(flushMs = 1000)
> -    logManager = new LogManager(Array(logDir), Map(), config, cleanerConfig, 1000L, 10000L, 1000L, time.scheduler, time)
> +    logManager = new LogManager(
> +      logDirs = Array(logDir),
> +      topicConfigs = Map(),
> +      defaultConfig = config,
> +      cleanerConfig = cleanerConfig,
> +      flushCheckMs = 1000L,
> +      flushCheckpointMs = 10000L,
> +      retentionCheckMs = 1000L,
> +      scheduler = time.scheduler,
> +      brokerState = new BrokerState(),
> +      time = time
> +    )
>      logManager.startup
>      val log = logManager.createLog(TopicAndPartition(name, 0), config)
>      val lastFlush = log.lastFlushTime
> @@ -187,7 +209,18 @@ class LogManagerTest extends JUnit3Suite {
>                 TestUtils.tempDir(),
>                 TestUtils.tempDir())
>      logManager.shutdown()
> -    logManager = new LogManager(dirs, Map(), logConfig, cleanerConfig, 1000L, 10000L, 1000L, time.scheduler, time)
> +    logManager = new LogManager(
> +      logDirs = dirs,
> +      topicConfigs = Map(),
> +      defaultConfig = logConfig,
> +      cleanerConfig = cleanerConfig,
> +      flushCheckMs = 1000L,
> +      flushCheckpointMs = 10000L,
> +      retentionCheckMs = 1000L,
> +      scheduler = time.scheduler,
> +      brokerState = new BrokerState(),
> +      time = time
> +    )
>
>      // verify that logs are always assigned to the least loaded partition
>      for(partition <- 0 until 20) {
> @@ -204,7 +237,18 @@ class LogManagerTest extends JUnit3Suite {
>    @Test
>    def testTwoLogManagersUsingSameDirFails() {
>      try {
> -      new LogManager(Array(logDir), Map(), logConfig, cleanerConfig, 1000L, 10000L, 1000L, time.scheduler, time)
> +      new LogManager(
> +        logDirs = Array(logDir),
> +        topicConfigs = Map(),
> +        defaultConfig = logConfig,
> +        cleanerConfig = cleanerConfig,
> +        flushCheckMs = 1000L,
> +        flushCheckpointMs = 10000L,
> +        retentionCheckMs = 1000L,
> +        scheduler = time.scheduler,
> +        brokerState = new BrokerState(),
> +        time = time
> +      )
>        fail("Should not be able to create a second log manager instance with the same data directory")
>      } catch {
>        case e: KafkaException => // this is good
> @@ -234,7 +278,8 @@ class LogManagerTest extends JUnit3Suite {
>                                 flushCheckpointMs = 100000L,
>                                 retentionCheckMs = 1000L,
>                                 scheduler = time.scheduler,
> -                               time = time)
> +                               time = time,
> +                               brokerState = new BrokerState())
>      logManager.startup
>      verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)), logManager)
>    }
> @@ -256,7 +301,8 @@ class LogManagerTest extends JUnit3Suite {
>                                 flushCheckpointMs = 100000L,
>                                 retentionCheckMs = 1000L,
>                                 scheduler = time.scheduler,
> -                               time = time)
> +                               time = time,
> +                               brokerState = new BrokerState())
>      logManager.startup
>      verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)), logManager)
>    }
>
>
> http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
> ----------------------------------------------------------------------
> diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
> index a78f7cf..558a5d6 100644
> --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
> +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
> @@ -40,6 +40,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
>                                                     flushCheckpointMs = 10000L,
>                                                     retentionCheckMs = 30000,
>                                                     scheduler = new KafkaScheduler(1),
> +                                                   brokerState = new BrokerState(),
>                                                     time = new MockTime))
>
>    @After
>
>
> http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
> ----------------------------------------------------------------------
> diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
> index 41ebc7a..518d416 100644
> --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
> +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
> @@ -69,6 +69,7 @@ class ReplicaManagerTest extends JUnit3Suite {
>                                 flushCheckpointMs = 100000L,
>                                 retentionCheckMs = 1000L,
>                                 scheduler = time.scheduler,
> +                               brokerState = new BrokerState(),
>                                 time = time)
>    }
>
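One more note while we are here: per the doc comment on the new setServerState hook, a custom startable can publish its own state codes through the same gauge. A hypothetical usage sketch (the subclass name and the value 100 are made up for illustration, not part of this patch):

    class MyKafkaServerStartable(config: KafkaConfig) extends KafkaServerStartable(config) {
      // Hypothetical custom state code; pick something not used by the built-in states,
      // otherwise the BrokerState gauge reading becomes ambiguous.
      val WarmingCache: Byte = 100

      override def startup() {
        super.startup()
        setServerState(WarmingCache)  // surfaces through the BrokerState gauge
      }
    }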