Jason Gustafson created KAFKA-13254:
---------------------------------------

             Summary: Deadlock when expanding ISR
                 Key: KAFKA-13254
                 URL: https://issues.apache.org/jira/browse/KAFKA-13254
             Project: Kafka
          Issue Type: Improvement
            Reporter: Jason Gustafson
            Assignee: Jason Gustafson


Found this while debugging downgrade system test failures. The patch for 
https://issues.apache.org/jira/browse/KAFKA-13091 introduced a deadlock. Here 
are the jstack details:
{code}
"data-plane-kafka-request-handler-4":                                           
                                                                                
                                                   
  waiting for ownable synchronizer 0x00000000fcc00020, (a 
java.util.concurrent.locks.ReentrantLock$NonfairSync),                          
                                                                         
  which is held by "data-plane-kafka-request-handler-5"                         
                                                                                
                                                   

"data-plane-kafka-request-handler-5":
  waiting for ownable synchronizer 0x00000000c9161b20, (a 
java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync),
  which is held by "data-plane-kafka-request-handler-4"

"data-plane-kafka-request-handler-4":
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000fcc00020> (a 
java.util.concurrent.locks.ReentrantLock$NonfairSync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
        at 
java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
        at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
        at 
kafka.server.DelayedOperation.safeTryComplete(DelayedOperation.scala:121)
        at 
kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:362)
        at 
kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:264)
        at 
kafka.cluster.DelayedOperations.checkAndCompleteAll(Partition.scala:59)
        at 
kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:907)
        at kafka.cluster.Partition.handleAlterIsrResponse(Partition.scala:1421)
        at 
kafka.cluster.Partition.$anonfun$sendAlterIsrRequest$1(Partition.scala:1340)
        at 
kafka.cluster.Partition.$anonfun$sendAlterIsrRequest$1$adapted(Partition.scala:1340)
        at kafka.cluster.Partition$$Lambda$1496/2055478409.apply(Unknown Source)
        at kafka.server.ZkIsrManager.submit(ZkIsrManager.scala:74)
        at kafka.cluster.Partition.sendAlterIsrRequest(Partition.scala:1345)
        at kafka.cluster.Partition.expandIsr(Partition.scala:1312)
        at 
kafka.cluster.Partition.$anonfun$maybeExpandIsr$2(Partition.scala:755)
        at kafka.cluster.Partition.maybeExpandIsr(Partition.scala:754)
        at kafka.cluster.Partition.updateFollowerFetchState(Partition.scala:672)
        at 
kafka.server.ReplicaManager.$anonfun$updateFollowerFetchState$1(ReplicaManager.scala:1806)
        at kafka.server.ReplicaManager$$Lambda$1075/1996432270.apply(Unknown 
Source)
        at 
scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:99)
        at 
scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:86)
        at scala.collection.mutable.ArrayBuffer.map(ArrayBuffer.scala:42)
        at 
kafka.server.ReplicaManager.updateFollowerFetchState(ReplicaManager.scala:1790)
        at kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:1025)
        at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:1029)
        at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:970)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:173)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75)
        at java.lang.Thread.run(Thread.java:748)

"data-plane-kafka-request-handler-5":
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000c9161b20> (a 
java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:967)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1283)
        at 
java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:727)
        at kafka.cluster.Partition.fetchOffsetSnapshot(Partition.scala:1183)
        at 
kafka.server.DelayedFetch.$anonfun$tryComplete$1(DelayedFetch.scala:96)
        at 
kafka.server.DelayedFetch.$anonfun$tryComplete$1$adapted(DelayedFetch.scala:89)
        at kafka.server.DelayedFetch$$Lambda$1115/1987378797.apply(Unknown 
Source)
        at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
        at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:919)
        at kafka.server.DelayedFetch.tryComplete(DelayedFetch.scala:89)
        at 
kafka.server.DelayedOperation.safeTryComplete(DelayedOperation.scala:121)
        at 
kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:362)
        at 
kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:264)
        at 
kafka.server.ReplicaManager.$anonfun$appendRecords$6(ReplicaManager.scala:622)
        at kafka.server.ReplicaManager$$Lambda$1150/40125541.apply(Unknown 
Source)
        at scala.collection.mutable.HashMap$Node.foreach(HashMap.scala:627)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:489)
        at 
kafka.server.ReplicaManager.$anonfun$appendRecords$5(ReplicaManager.scala:611)
        at 
kafka.server.ReplicaManager$$Lambda$1134/1761219075.apply$mcV$sp(Unknown Source)
        at kafka.server.ActionQueue.tryCompleteActions(ActionQueue.scala:49)
        at 
kafka.server.ReplicaManager.tryCompleteActions(ReplicaManager.scala:569)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:245)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75)
        at java.lang.Thread.run(Thread.java:748)
{code}


In short: one thread holds the LeaderAndIsr write lock (acquired in maybeExpandIsr) 
and is trying to acquire a delayed operation's lock in order to complete it, while 
the other thread holds that delayed operation's lock and is trying to acquire the 
LeaderAndIsr read lock. Each thread is blocked on the lock the other holds, so 
neither can make progress.
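
The lock-ordering inversion can be reproduced outside the broker. Below is a minimal, 
self-contained Scala sketch of the same pattern; the names isrLock and operationLock 
are purely illustrative stand-ins for the Partition read-write lock and the 
DelayedOperation lock, not Kafka code:
{code}
import java.util.concurrent.CountDownLatch
import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock}

// Hypothetical standalone sketch of the deadlock: two threads acquire the same
// pair of locks in opposite order, mirroring handler-4 and handler-5 above.
object DeadlockSketch extends App {
  val isrLock = new ReentrantReadWriteLock()   // stands in for the Partition ISR lock
  val operationLock = new ReentrantLock()      // stands in for the DelayedOperation lock
  val bothHeld = new CountDownLatch(2)

  // Mirrors handler-4: holds the write lock, then tries the operation lock.
  val expandIsrThread = new Thread(() => {
    isrLock.writeLock().lock()
    try {
      bothHeld.countDown(); bothHeld.await()   // wait until the other thread holds its lock
      operationLock.lock()                     // blocks forever: the other thread holds it
      try println("completed delayed op") finally operationLock.unlock()
    } finally isrLock.writeLock().unlock()
  }, "expand-isr")

  // Mirrors handler-5: holds the operation lock, then tries the read lock.
  val tryCompleteThread = new Thread(() => {
    operationLock.lock()
    try {
      bothHeld.countDown(); bothHeld.await()
      isrLock.readLock().lock()                // blocks forever: the write lock is held
      try println("read offset snapshot") finally isrLock.readLock().unlock()
    } finally operationLock.unlock()
  }, "try-complete")

  expandIsrThread.start()
  tryCompleteThread.start()
  // Both threads are now parked on each other's lock, as in the jstack output.
}
{code}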

Note that this does not affect 3.0 or any other released version.


