[ https://issues.apache.org/jira/browse/KAFKA-6879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16468075#comment-16468075 ]

ASF GitHub Bot commented on KAFKA-6879:
---------------------------------------

ijuma closed pull request #4977: KAFKA-6879; Invoke session init callbacks 
outside lock to avoid deadlock
URL: https://github.com/apache/kafka/pull/4977
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index bc721e39f96..d3d1a81c41f 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -160,7 +160,10 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
       override def beforeInitializingSession(): Unit = {
         val expireEvent = new Expire
         eventManager.clearAndPut(expireEvent)
-        expireEvent.waitUntilProcessed()
+
+        // Block initialization of the new session until the expiration event is being handled,
+        // which ensures that all pending events have been processed before creating the new session
+        expireEvent.waitUntilProcessingStarted()
       }
     })
     eventManager.put(Startup)
@@ -1518,17 +1521,17 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
 
   // We can't make this a case object due to the countDownLatch field
   class Expire extends ControllerEvent {
-    private val countDownLatch = new CountDownLatch(1)
+    private val processingStarted = new CountDownLatch(1)
     override def state = ControllerState.ControllerChange
 
     override def process(): Unit = {
-      countDownLatch.countDown()
+      processingStarted.countDown()
       activeControllerId = -1
       onControllerResignation()
     }
 
-    def waitUntilProcessed(): Unit = {
-      countDownLatch.await()
+    def waitUntilProcessingStarted(): Unit = {
+      processingStarted.await()
     }
   }
 
diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
index 5407934e987..24eb17770ec 100755
--- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala
+++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
@@ -99,11 +99,15 @@ class KafkaScheduler(val threads: Int,
     }
   }
 
-  def schedule(name: String, fun: ()=>Unit, delay: Long, period: Long, unit: TimeUnit) {
+  def scheduleOnce(name: String, fun: () => Unit): Unit = {
+    schedule(name, fun, delay = 0L, period = -1L, unit = TimeUnit.MILLISECONDS)
+  }
+
+  def schedule(name: String, fun: () => Unit, delay: Long, period: Long, unit: TimeUnit) {
     debug("Scheduling task %s with initial delay %d ms and period %d ms."
         .format(name, TimeUnit.MILLISECONDS.convert(delay, unit), TimeUnit.MILLISECONDS.convert(period, unit)))
     this synchronized {
-      ensureRunning
+      ensureRunning()
       val runnable = CoreUtils.runnable {
         try {
           trace("Beginning execution of scheduled task '%s'.".format(name))
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 5c4cd685565..5cb127c3c7e 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -59,7 +59,7 @@ class ZooKeeperClient(connectString: String,
   private val zNodeChildChangeHandlers = new ConcurrentHashMap[String, ZNodeChildChangeHandler]().asScala
   private val inFlightRequests = new Semaphore(maxInFlightRequests)
   private val stateChangeHandlers = new ConcurrentHashMap[String, StateChangeHandler]().asScala
-  private[zookeeper] val expiryScheduler = new KafkaScheduler(0, "zk-session-expiry-handler")
+  private[zookeeper] val expiryScheduler = new KafkaScheduler(threads = 1, "zk-session-expiry-handler")
 
   private val metricNames = Set[String]()
 
@@ -325,43 +325,65 @@ class ZooKeeperClient(connectString: String,
     zooKeeper
   }
   
-  private def initialize(): Unit = {
-    if (!connectionState.isAlive) {
-      zooKeeper.close()
-      info(s"Initializing a new session to $connectString.")
-      // retry forever until ZooKeeper can be instantiated
-      var connected = false
-      while (!connected) {
-        try {
-          zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
-          connected = true
-        } catch {
-          case e: Exception =>
-            info("Error when recreating ZooKeeper, retrying after a short sleep", e)
-            Thread.sleep(1000)
+  private def reinitialize(): Unit = {
+    // Initialization callbacks are invoked outside of the lock to avoid deadlock potential since their completion
+    // may require additional Zookeeper requests, which will block to acquire the initialization lock
+    stateChangeHandlers.values.foreach(callBeforeInitializingSession _)
+
+    inWriteLock(initializationLock) {
+      if (!connectionState.isAlive) {
+        zooKeeper.close()
+        info(s"Initializing a new session to $connectString.")
+        // retry forever until ZooKeeper can be instantiated
+        var connected = false
+        while (!connected) {
+          try {
+            zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
+            connected = true
+          } catch {
+            case e: Exception =>
+              info("Error when recreating ZooKeeper, retrying after a short sleep", e)
+              Thread.sleep(1000)
+          }
         }
       }
     }
+
+    stateChangeHandlers.values.foreach(callAfterInitializingSession _)
   }
 
   /**
-   * reinitialize method to use in unit tests
+   * Close the zookeeper client to force session reinitialization. This is visible for testing only.
    */
-  private[zookeeper] def reinitialize(): Unit = {
+  private[zookeeper] def forceReinitialize(): Unit = {
     zooKeeper.close()
-    initialize()
+    reinitialize()
+  }
+
+  private def callBeforeInitializingSession(handler: StateChangeHandler): Unit = {
+    try {
+      handler.beforeInitializingSession()
+    } catch {
+      case t: Throwable =>
+        error(s"Uncaught error in handler ${handler.name}", t)
+    }
+  }
+
+  private def callAfterInitializingSession(handler: StateChangeHandler): Unit = {
+    try {
+      handler.afterInitializingSession()
+    } catch {
+      case t: Throwable =>
+        error(s"Uncaught error in handler ${handler.name}", t)
+    }
   }
 
   // Visibility for testing
   private[zookeeper] def scheduleSessionExpiryHandler(): Unit = {
-    expiryScheduler.schedule("zk-session-expired", () => {
-      inWriteLock(initializationLock) {
-        info("Session expired.")
-        stateChangeHandlers.values.foreach(_.beforeInitializingSession())
-        initialize()
-        stateChangeHandlers.values.foreach(_.afterInitializingSession())
-      }
-    }, delay = 0L, period = -1L, unit = TimeUnit.MILLISECONDS)
+    expiryScheduler.scheduleOnce("zk-session-expired", () => {
+      info("Session expired.")
+      reinitialize()
+    })
   }
 
   // package level visibility for testing only
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index 77e11eae716..c4143e2a8e6 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -18,7 +18,7 @@ package kafka.zookeeper
 
 import java.nio.charset.StandardCharsets
 import java.util.UUID
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.util.concurrent.{ArrayBlockingQueue, ConcurrentLinkedQueue, CountDownLatch, Executors, Semaphore, TimeUnit}
 
 import com.yammer.metrics.Metrics
@@ -304,6 +304,82 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
     assertTrue("Failed to receive data change notification", 
znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
   }
 
+  @Test
+  def testBlockOnRequestCompletionFromStateChangeHandler(): Unit = {
+    // This tests the scenario exposed by KAFKA-6879 in which the expiration callback awaits
+    // completion of a request which is handled by another thread
+
+    val latch = new CountDownLatch(1)
+    val stateChangeHandler = new StateChangeHandler {
+      override val name = this.getClass.getName
+      override def beforeInitializingSession(): Unit = {
+        latch.await()
+      }
+    }
+
+    val client = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time,
+      "testMetricGroup", "testMetricType")
+    client.registerStateChangeHandler(stateChangeHandler)
+
+    val requestThread = new Thread() {
+      override def run(): Unit = {
+        try
+          client.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
+            ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+        finally
+          latch.countDown()
+      }
+    }
+
+    val reinitializeThread = new Thread() {
+      override def run(): Unit = {
+        client.forceReinitialize()
+      }
+    }
+
+    reinitializeThread.start()
+
+    // sleep briefly before starting the request thread so that the initialization
+    // thread is blocking on the latch
+    Thread.sleep(100)
+    requestThread.start()
+
+    reinitializeThread.join()
+    requestThread.join()
+  }
+
+  @Test
+  def testExceptionInBeforeInitializingSession(): Unit = {
+    val faultyHandler = new StateChangeHandler {
+      override val name = this.getClass.getName
+      override def beforeInitializingSession(): Unit = {
+        throw new RuntimeException()
+      }
+    }
+
+    val goodHandler = new StateChangeHandler {
+      val calls = new AtomicInteger(0)
+      override val name = this.getClass.getName
+      override def beforeInitializingSession(): Unit = {
+        calls.incrementAndGet()
+      }
+    }
+
+    val client = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time,
+      "testMetricGroup", "testMetricType")
+    client.registerStateChangeHandler(faultyHandler)
+    client.registerStateChangeHandler(goodHandler)
+
+    client.forceReinitialize()
+
+    assertEquals(1, goodHandler.calls.get)
+
+    // Client should be usable even if the callback throws an error
+    val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
+      ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+    assertEquals("Response code for create should be OK", Code.OK, 
createResponse.resultCode)
+  }
+
   @Test
   def testZNodeChildChangeHandlerForChildChange(): Unit = {
     import scala.collection.JavaConverters._
@@ -343,7 +419,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
       "testMetricGroup", "testMetricType")
     try {
       zooKeeperClient.registerStateChangeHandler(stateChangeHandler)
-      zooKeeperClient.reinitialize()
+      zooKeeperClient.forceReinitialize()
 
       assertTrue("Failed to receive auth failed notification", 
stateChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
     } finally zooKeeperClient.close()


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Controller deadlock following session expiration
> ------------------------------------------------
>
>                 Key: KAFKA-6879
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6879
>             Project: Kafka
>          Issue Type: Bug
>          Components: controller
>    Affects Versions: 1.1.0
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>            Priority: Critical
>             Fix For: 2.0.0, 1.1.1
>
>
> We have observed an apparent deadlock situation which occurs following a 
> session expiration. The suspected deadlock occurs between the zookeeper 
> "initializationLock" and the latch inside the Expire event which we use to 
> ensure all events have been handled.
> In the logs, we see the "Session expired" message following acquisition of 
> the initialization lock: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala#L358
> But we never see any logs indicating that the new session is being 
> initialized. In fact, the controller logs are basically empty from that point 
> on. The problem we suspect is that completion of the 
> {{beforeInitializingSession}} callback requires that all events have finished 
> processing in order to count down the latch: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/KafkaController.scala#L1525.
> But an event which was dequeued just prior to the acquisition of the write 
> lock may be unable to complete because it is awaiting acquisition of the 
> initialization lock: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala#L137.
> The impact is that the broker continues in a zombie state. It continues 
> fetching and is periodically added to ISRs, but it never receives any further 
> requests from the controller since it is not registered.
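
For illustration only (this is not Kafka code, and the names below are made up), here is a minimal Scala sketch of the lock/latch ordering described above: one thread holds the write lock while waiting on a latch, but the latch can only be counted down by another thread that must first acquire the read lock, so the count-down never happens.

import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.locks.ReentrantReadWriteLock

// Hypothetical stand-ins for ZooKeeperClient's initializationLock and the Expire latch.
object DeadlockSketch extends App {
  val initializationLock = new ReentrantReadWriteLock()
  val expireProcessed = new CountDownLatch(1)

  // "Event thread": like an event dequeued just before expiration, it needs the
  // read lock (as a ZooKeeper request handler would) before it can finish; only
  // then would the Expire event be processed and the latch counted down.
  val eventThread = new Thread(new Runnable {
    override def run(): Unit = {
      initializationLock.readLock().lock() // blocks while the write lock is held
      try expireProcessed.countDown()
      finally initializationLock.readLock().unlock()
    }
  })

  // "Expiry thread" (here the main thread): holds the write lock while waiting
  // on the latch, mirroring the pre-fix ordering in scheduleSessionExpiryHandler().
  initializationLock.writeLock().lock()
  eventThread.start()
  val processed = expireProcessed.await(2, TimeUnit.SECONDS)
  println(s"Expire processed while write lock held: $processed") // false: the deadlock
  initializationLock.writeLock().unlock()
  eventThread.join()
}

The merged change breaks this cycle by invoking beforeInitializingSession (and hence the wait on the Expire latch) outside the initialization lock, so the in-flight event can complete its ZooKeeper request before the new session is created.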



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
