[ https://issues.apache.org/jira/browse/KAFKA-6517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357199#comment-16357199 ]

ASF GitHub Bot commented on KAFKA-6517:
---------------------------------------

rajinisivaram opened a new pull request #4551: KAFKA-6517: Avoid deadlock in 
ZooKeeperClient during session expiry
URL: https://github.com/apache/kafka/pull/4551
 
 
   `ZooKeeperClient` acquires `initializationLock#writeLock` to establish a new 
connection while processing a session expiry WatchEvent. 
`ZooKeeperClient#handleRequests` acquires `initializationLock#readLock`, 
allowing multiple batches of requests to be processed concurrently while 
preventing reconnections during request processing. At the moment, 
`handleRequests` holds the readLock for the whole method, including while it 
waits for responses and inflight requests to complete. But responses cannot 
be delivered while the event thread is blocked on the writeLock to process the 
session expiry event, which results in a deadlock. During broker shutdown, the 
shutdown thread is also blocked: it needs the readLock to perform 
`ZooKeeperClient#unregisterStateChangeHandler`, but if a session expiry has 
already occurred, it is queued behind the event thread that is waiting for the 
writeLock.
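The queueing behaviour described above can be reproduced with a plain `java.util.concurrent.locks.ReentrantReadWriteLock`. This is a minimal sketch, not Kafka code: the class name, the thread names and the `CountDownLatch` standing in for a pending ZooKeeper response are hypothetical. One thread holds the read lock while waiting for a response that only the "event thread" can deliver, the event thread queues for the write lock, and a third (shutdown) thread then fails to get the read lock, because in the default nonfair mode a new reader does not barge past a writer that is first in the queue. A timed `tryLock` is used so the sketch reports the blockage instead of hanging.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class QueuedWriterSketch {
    // Returns true if a late reader (the "shutdown" thread) cannot get the read lock
    // while another reader holds it and a writer (the "event thread") is queued.
    static boolean lateReaderBlocked() {
        try {
            return run();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Spin until the thread is parked (in a lock or latch wait).
    private static void awaitParked(Thread t) throws InterruptedException {
        while (t.getState() != Thread.State.WAITING) {
            Thread.sleep(1);
        }
    }

    private static boolean run() throws InterruptedException {
        ReentrantReadWriteLock initializationLock = new ReentrantReadWriteLock(); // nonfair by default
        CountDownLatch response = new CountDownLatch(1); // hypothetical: only the event thread delivers it

        // "Request thread": holds the read lock while waiting for a response.
        Thread requestThread = new Thread(() -> {
            initializationLock.readLock().lock();
            try {
                response.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // interrupted by the sketch to break the cycle
            } finally {
                initializationLock.readLock().unlock();
            }
        }, "request-thread");
        requestThread.start();
        awaitParked(requestThread); // now holding the read lock and waiting on the latch

        // "Event thread": needs the write lock to handle session expiry before it
        // can deliver the response.
        Thread eventThread = new Thread(() -> {
            initializationLock.writeLock().lock();
            try {
                response.countDown();
            } finally {
                initializationLock.writeLock().unlock();
            }
        }, "event-thread");
        eventThread.start();
        awaitParked(eventThread); // now queued for the write lock behind the reader

        // "Shutdown thread" (here: the calling thread): queues behind the waiting writer.
        boolean acquired = initializationLock.readLock().tryLock(200, TimeUnit.MILLISECONDS);
        if (acquired) {
            initializationLock.readLock().unlock();
        }

        requestThread.interrupt(); // stand-in for the response that never arrives
        requestThread.join();
        eventThread.join();
        return !acquired;
    }
}
```

Interrupting the request thread at the end only exists so the sketch terminates; without it, the request and event threads would stay parked indefinitely.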
   
   This PR fixes the issue by limiting locking in 
`ZooKeeperClient#handleRequests` to just the non-blocking send. The readLock 
is released before waiting for responses, so session expiry handling doesn't 
get blocked.
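The shape of the fix can be sketched in plain Java rather than the actual Scala change in the PR. This is a hedged sketch under stated assumptions: `sendAsync`, the `expireOnNextSend` fault-injection flag and the single-threaded executor standing in for ZooKeeper's event thread are hypothetical. The read lock is held only while requests are handed off, and it is released before `await()`, so a session expiry task queued ahead of the responses can take the write lock, after which the response callbacks run.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;

class ScopedLockSketch {
    private final ReentrantReadWriteLock initializationLock = new ReentrantReadWriteLock();
    // Hypothetical stand-in for ZooKeeper's single event thread, which runs both
    // watcher events (such as session expiry) and async response callbacks, in order.
    private final ExecutorService eventThread = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "event-thread");
        t.setDaemon(true);
        return t;
    });
    final AtomicInteger reconnects = new AtomicInteger();
    private boolean expireOnNextSend = true; // hypothetical fault injection, touched only by the caller

    // Session expiry handling needs the write lock.
    private void processSessionExpiry() {
        initializationLock.writeLock().lock();
        try {
            reconnects.incrementAndGet(); // a real client would re-create the connection here
        } finally {
            initializationLock.writeLock().unlock();
        }
    }

    // Hypothetical non-blocking send: the response callback is queued on the event thread.
    private void sendAsync(String request, Consumer<String> callback) {
        if (expireOnNextSend) {
            expireOnNextSend = false;
            eventThread.execute(this::processSessionExpiry); // expiry is queued ahead of the response
        }
        eventThread.execute(() -> callback.accept("response:" + request));
    }

    List<String> handleRequests(List<String> requests) {
        CountDownLatch inFlight = new CountDownLatch(requests.size());
        List<String> responses = Collections.synchronizedList(new ArrayList<>());
        initializationLock.readLock().lock();
        try {
            for (String request : requests) {
                sendAsync(request, response -> {
                    responses.add(response);
                    inFlight.countDown();
                });
            }
        } finally {
            // Release before waiting: holding the read lock across await() is what deadlocked.
            initializationLock.readLock().unlock();
        }
        try {
            inFlight.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for responses", e);
        }
        return new ArrayList<>(responses);
    }

    void close() {
        eventThread.shutdown();
    }
}
```

Moving `inFlight.await()` inside the locked `try` block (before the `unlock()`) reproduces the original deadlock: the event thread blocks on the write lock, and the callbacks queued behind it never run.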
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.


> ZooKeeperClient holds a lock while waiting for responses, blocking shutdown
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-6517
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6517
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 1.1.0
>            Reporter: Rajini Sivaram
>            Assignee: Rajini Sivaram
>            Priority: Blocker
>             Fix For: 1.1.0
>
>
> Stack traces from a local test run that deadlocked because shutdown 
> couldn't acquire the lock:
>  # kafka-scheduler-7: acquired the read lock in 
> kafka.zookeeper.ZooKeeperClient.handleRequests and is waiting for responses
>  # Test worker-EventThread: waiting for the write lock to process SessionExpired 
> in kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$.process
>  # ForkJoinPool-1-worker-11: processing KafkaServer.shutdown, queued behind 
> thread 2 while waiting to acquire the read lock for 
> kafka.zookeeper.ZooKeeperClient.unregisterStateChangeHandler
> Stack traces of the relevant threads:
> {quote}
> "kafka-scheduler-7" daemon prio=5 tid=0x00007fade918d800 nid=0xd317 waiting on condition [0x000070000b371000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00000007e4c6e698> (a java.util.concurrent.CountDownLatch$Sync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
>         at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)
>         at kafka.zookeeper.ZooKeeperClient$$anonfun$handleRequests$1.apply(ZooKeeperClient.scala:146)
>         at kafka.zookeeper.ZooKeeperClient$$anonfun$handleRequests$1.apply(ZooKeeperClient.scala:126)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>         at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:256)
>         at kafka.zookeeper.ZooKeeperClient.handleRequests(ZooKeeperClient.scala:125)
>         at kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1432)
>         at kafka.zk.KafkaZkClient.kafka$zk$KafkaZkClient$$retryRequestUntilConnected(KafkaZkClient.scala:1425)
>         at kafka.zk.KafkaZkClient.conditionalUpdatePath(KafkaZkClient.scala:583)
>         at kafka.utils.ReplicationUtils$.updateLeaderAndIsr(ReplicationUtils.scala:33)
>         at kafka.cluster.Partition.kafka$cluster$Partition$$updateIsr(Partition.scala:665)
>         at kafka.cluster.Partition$$anonfun$4.apply$mcZ$sp(Partition.scala:509)
>         at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:500)
>         at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:500)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>         at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:258)
>         at kafka.cluster.Partition.maybeShrinkIsr(Partition.scala:499)
>         at kafka.server.ReplicaManager$$anonfun$kafka$server$ReplicaManager$$maybeShrinkIsr$2.apply(ReplicaManager.scala:1335)
>         at kafka.server.ReplicaManager$$anonfun$kafka$server$ReplicaManager$$maybeShrinkIsr$2.apply(ReplicaManager.scala:1335)
> ......
> "Test worker-EventThread" daemon prio=5 tid=0x00007fade90cf800 nid=0xef13 waiting on condition [0x000070000a23f000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x0000000781847620> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:867)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1197)
>         at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:945)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248)
>         at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:258)
>         at kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$.process(ZooKeeperClient.scala:355)
>         at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:531)
>         at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:506)
>  
> "ForkJoinPool-1-worker-11" daemon prio=5 tid=0x00007fade9a83000 nid=0x17907 waiting on condition [0x0000700011eaf000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x0000000781847620> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:964)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1282)
>         at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:731)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248)
>         at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:256)
>         at kafka.zookeeper.ZooKeeperClient.unregisterStateChangeHandler(ZooKeeperClient.scala:295)
>         at kafka.zk.KafkaZkClient.unregisterStateChangeHandler(KafkaZkClient.scala:1217)
>         at kafka.common.ZkNodeChangeNotificationListener.close(ZkNodeChangeNotificationListener.scala:68)
>         at kafka.server.DynamicConfigManager.shutdown(DynamicConfigManager.scala:181)
>         at kafka.server.KafkaServer$$anonfun$shutdown$2.apply$mcV$sp(KafkaServer.scala:552)
>         at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:85)
>         at kafka.server.KafkaServer.shutdown(KafkaServer.scala:552)
> {quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
