Repository: kafka
Updated Branches:
  refs/heads/0.8.1 655e1a8aa -> 39a560789


KAFKA-1317 KafkaServer 0.8.1 not responding to .shutdown() cleanly, possibly 
related to TopicDeletionManager or MetricsMeter state; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/39a56078
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/39a56078
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/39a56078

Branch: refs/heads/0.8.1
Commit: 39a560789e65e28cc91468933487c0b23e1a1db0
Parents: 655e1a8
Author: Timothy Chen <tnac...@gmail.com>
Authored: Wed Mar 26 15:35:55 2014 -0700
Committer: Neha Narkhede <neha.narkh...@gmail.com>
Committed: Wed Mar 26 15:36:05 2014 -0700

----------------------------------------------------------------------
 .../kafka/controller/KafkaController.scala      | 19 ++++++-----------
 .../kafka/controller/TopicDeletionManager.scala | 22 +++++++++++++-------
 .../unit/kafka/server/ServerShutdownTest.scala  | 20 ++++++++++++++++++
 3 files changed, 41 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/39a56078/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index f12ffc2..2867ef1 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -35,9 +35,9 @@ import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
 import java.util.concurrent.atomic.AtomicInteger
 import org.apache.log4j.Logger
+import java.util.concurrent.locks.ReentrantLock
 import scala.Some
 import kafka.common.TopicAndPartition
-import java.util.concurrent.locks.ReentrantLock
 
 class ControllerContext(val zkClient: ZkClient,
                         val zkSessionTimeout: Int) {
@@ -335,15 +335,16 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
    */
   def onControllerResignation() {
     inLock(controllerContext.controllerLock) {
-      if (config.autoLeaderRebalanceEnable)
-        autoRebalanceScheduler.shutdown()
-      deleteTopicManager.shutdown()
       Utils.unregisterMBean(KafkaController.MBeanName)
+      deleteTopicManager.shutdown()
       partitionStateMachine.shutdown()
       replicaStateMachine.shutdown()
+      if(config.autoLeaderRebalanceEnable)
+        autoRebalanceScheduler.shutdown()
       if(controllerContext.controllerChannelManager != null) {
         controllerContext.controllerChannelManager.shutdown()
         controllerContext.controllerChannelManager = null
+        info("Controller shutdown complete")
       }
     }
   }
@@ -640,15 +641,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   def shutdown() = {
     inLock(controllerContext.controllerLock) {
       isRunning = false
-      partitionStateMachine.shutdown()
-      replicaStateMachine.shutdown()
-      if (config.autoLeaderRebalanceEnable)
-        autoRebalanceScheduler.shutdown()
-      if(controllerContext.controllerChannelManager != null) {
-        controllerContext.controllerChannelManager.shutdown()
-        controllerContext.controllerChannelManager = null
-        info("Controller shutdown complete")
-      }
+      onControllerResignation()
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/39a56078/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index 8262e10..6f615cf 100644
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -22,6 +22,7 @@ import kafka.utils.Utils._
 import collection.Set
 import kafka.common.{ErrorMapping, TopicAndPartition}
 import kafka.api.{StopReplicaResponse, RequestOrResponse}
+import java.util.concurrent.locks.ReentrantLock
 
 /**
  * This manages the state machine for topic deletion.
@@ -71,9 +72,10 @@ class TopicDeletionManager(controller: KafkaController,
   val partitionStateMachine = controller.partitionStateMachine
   val replicaStateMachine = controller.replicaStateMachine
   var topicsToBeDeleted: mutable.Set[String] = mutable.Set.empty[String] ++ initialTopicsToBeDeleted
+  val deleteLock = new ReentrantLock()
   var topicsIneligibleForDeletion: mutable.Set[String] = mutable.Set.empty[String] ++
     (initialTopicsIneligibleForDeletion & initialTopicsToBeDeleted)
-  val deleteTopicsCond = controllerContext.controllerLock.newCondition()
+  val deleteTopicsCond = deleteLock.newCondition()
   var deleteTopicStateChanged: Boolean = false
   var deleteTopicsThread: DeleteTopicsThread = null
   val isDeleteTopicEnabled = controller.config.deleteTopicEnable
@@ -195,11 +197,14 @@ class TopicDeletionManager(controller: KafkaController,
    * controllerLock should be acquired before invoking this API
    */
   private def awaitTopicDeletionNotification() {
-    while(!deleteTopicStateChanged) {
-      info("Waiting for signal to start or continue topic deletion")
-      deleteTopicsCond.await()
+    inLock(deleteLock) {
+      while(!deleteTopicStateChanged) {
+        info("Waiting for signal to start or continue topic deletion")
+
+        deleteTopicsCond.await()
+      }
+      deleteTopicStateChanged = false
     }
-    deleteTopicStateChanged = false
   }
 
   /**
@@ -207,7 +212,9 @@ class TopicDeletionManager(controller: KafkaController,
    */
   private def resumeTopicDeletionThread() {
     deleteTopicStateChanged = true
-    deleteTopicsCond.signal()
+    inLock(deleteLock) {
+      deleteTopicsCond.signal()
+    }
   }
 
   /**
@@ -352,8 +359,9 @@ class TopicDeletionManager(controller: KafkaController,
   class DeleteTopicsThread() extends ShutdownableThread("delete-topics-thread") {
     val zkClient = controllerContext.zkClient
     override def doWork() {
+      awaitTopicDeletionNotification()
+
       inLock(controllerContext.controllerLock) {
-        awaitTopicDeletionNotification()
         val topicsQueuedForDeletion = Set.empty[String] ++ topicsToBeDeleted
         if(topicsQueuedForDeletion.size > 0)
           info("Handling deletion for topics " + topicsQueuedForDeletion.mkString(","))

http://git-wip-us.apache.org/repos/asf/kafka/blob/39a56078/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 20fe93e..c7e058f 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -96,5 +96,25 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
     producer.close()
     server.shutdown()
     Utils.rm(server.config.logDirs)
+    verifyNonDaemonThreadsStatus
+  }
+
+  @Test
+  def testCleanShutdownWithDeleteTopicEnabled() {
+    val newProps = TestUtils.createBrokerConfig(0, port)
+    newProps.setProperty("delete.topic.enable", "true")
+    val newConfig = new KafkaConfig(newProps)
+    var server = new KafkaServer(newConfig)
+    server.startup()
+    server.shutdown()
+    server.awaitShutdown()
+    Utils.rm(server.config.logDirs)
+    verifyNonDaemonThreadsStatus
+  }
+
+  def verifyNonDaemonThreadsStatus() {
+    assertEquals(0, Thread.getAllStackTraces.keySet().toArray
+      .map(_.asInstanceOf[Thread])
+      .count(t => !t.isDaemon && t.isAlive && t.getClass.getCanonicalName.toLowerCase.startsWith("kafka")))
   }
 }

Reply via email to