Repository: kafka Updated Branches: refs/heads/0.8.1 655e1a8aa -> 39a560789
KAFKA-1317 KafkaServer 0.8.1 not responding to .shutdown() cleanly, possibly related to TopicDeletionManager or MetricsMeter state; reviewed by Neha Narkhede Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/39a56078 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/39a56078 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/39a56078 Branch: refs/heads/0.8.1 Commit: 39a560789e65e28cc91468933487c0b23e1a1db0 Parents: 655e1a8 Author: Timothy Chen <tnac...@gmail.com> Authored: Wed Mar 26 15:35:55 2014 -0700 Committer: Neha Narkhede <neha.narkh...@gmail.com> Committed: Wed Mar 26 15:36:05 2014 -0700 ---------------------------------------------------------------------- .../kafka/controller/KafkaController.scala | 19 ++++++----------- .../kafka/controller/TopicDeletionManager.scala | 22 +++++++++++++------- .../unit/kafka/server/ServerShutdownTest.scala | 20 ++++++++++++++++++ 3 files changed, 41 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/39a56078/core/src/main/scala/kafka/controller/KafkaController.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index f12ffc2..2867ef1 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -35,9 +35,9 @@ import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient} import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException} import java.util.concurrent.atomic.AtomicInteger import org.apache.log4j.Logger +import java.util.concurrent.locks.ReentrantLock import scala.Some import kafka.common.TopicAndPartition -import java.util.concurrent.locks.ReentrantLock class ControllerContext(val zkClient: ZkClient, val zkSessionTimeout: Int) { @@ -335,15 +335,16 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg */ def onControllerResignation() { inLock(controllerContext.controllerLock) { - if (config.autoLeaderRebalanceEnable) - autoRebalanceScheduler.shutdown() - deleteTopicManager.shutdown() Utils.unregisterMBean(KafkaController.MBeanName) + deleteTopicManager.shutdown() partitionStateMachine.shutdown() replicaStateMachine.shutdown() + if(config.autoLeaderRebalanceEnable) + autoRebalanceScheduler.shutdown() if(controllerContext.controllerChannelManager != null) { controllerContext.controllerChannelManager.shutdown() controllerContext.controllerChannelManager = null + info("Controller shutdown complete") } } } @@ -640,15 +641,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg def shutdown() = { inLock(controllerContext.controllerLock) { isRunning = false - partitionStateMachine.shutdown() - replicaStateMachine.shutdown() - if (config.autoLeaderRebalanceEnable) - autoRebalanceScheduler.shutdown() - if(controllerContext.controllerChannelManager != null) { - controllerContext.controllerChannelManager.shutdown() - controllerContext.controllerChannelManager = null - info("Controller shutdown complete") - } + onControllerResignation() } } http://git-wip-us.apache.org/repos/asf/kafka/blob/39a56078/core/src/main/scala/kafka/controller/TopicDeletionManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala index 8262e10..6f615cf 100644 --- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala +++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala @@ -22,6 +22,7 @@ import kafka.utils.Utils._ import collection.Set import kafka.common.{ErrorMapping, TopicAndPartition} import kafka.api.{StopReplicaResponse, RequestOrResponse} +import java.util.concurrent.locks.ReentrantLock /** * This manages the state machine for topic deletion. @@ -71,9 +72,10 @@ class TopicDeletionManager(controller: KafkaController, val partitionStateMachine = controller.partitionStateMachine val replicaStateMachine = controller.replicaStateMachine var topicsToBeDeleted: mutable.Set[String] = mutable.Set.empty[String] ++ initialTopicsToBeDeleted + val deleteLock = new ReentrantLock() var topicsIneligibleForDeletion: mutable.Set[String] = mutable.Set.empty[String] ++ (initialTopicsIneligibleForDeletion & initialTopicsToBeDeleted) - val deleteTopicsCond = controllerContext.controllerLock.newCondition() + val deleteTopicsCond = deleteLock.newCondition() var deleteTopicStateChanged: Boolean = false var deleteTopicsThread: DeleteTopicsThread = null val isDeleteTopicEnabled = controller.config.deleteTopicEnable @@ -195,11 +197,14 @@ class TopicDeletionManager(controller: KafkaController, * controllerLock should be acquired before invoking this API */ private def awaitTopicDeletionNotification() { - while(!deleteTopicStateChanged) { - info("Waiting for signal to start or continue topic deletion") - deleteTopicsCond.await() + inLock(deleteLock) { + while(!deleteTopicStateChanged) { + info("Waiting for signal to start or continue topic deletion") + + deleteTopicsCond.await() + } + deleteTopicStateChanged = false } - deleteTopicStateChanged = false } /** @@ -207,7 +212,9 @@ class TopicDeletionManager(controller: KafkaController, */ private def resumeTopicDeletionThread() { deleteTopicStateChanged = true - deleteTopicsCond.signal() + inLock(deleteLock) { + deleteTopicsCond.signal() + } } /** @@ -352,8 +359,9 @@ class TopicDeletionManager(controller: KafkaController, class DeleteTopicsThread() extends ShutdownableThread("delete-topics-thread") { val zkClient = controllerContext.zkClient override def doWork() { + awaitTopicDeletionNotification() + inLock(controllerContext.controllerLock) { - awaitTopicDeletionNotification() val topicsQueuedForDeletion = Set.empty[String] ++ topicsToBeDeleted if(topicsQueuedForDeletion.size > 0) info("Handling deletion for topics " + topicsQueuedForDeletion.mkString(",")) http://git-wip-us.apache.org/repos/asf/kafka/blob/39a56078/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index 20fe93e..c7e058f 100644 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -96,5 +96,25 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { producer.close() server.shutdown() Utils.rm(server.config.logDirs) + verifyNonDaemonThreadsStatus + } + + @Test + def testCleanShutdownWithDeleteTopicEnabled() { + val newProps = TestUtils.createBrokerConfig(0, port) + newProps.setProperty("delete.topic.enable", "true") + val newConfig = new KafkaConfig(newProps) + var server = new KafkaServer(newConfig) + server.startup() + server.shutdown() + server.awaitShutdown() + Utils.rm(server.config.logDirs) + verifyNonDaemonThreadsStatus + } + + def verifyNonDaemonThreadsStatus() { + assertEquals(0, Thread.getAllStackTraces.keySet().toArray + .map(_.asInstanceOf[Thread]) + .count(t => !t.isDaemon && t.isAlive && t.getClass.getCanonicalName.toLowerCase.startsWith("kafka"))) } }