Ben Corlett updated KAFKA-6042:
-------------------------------
    Attachment: heapusage.png

> Kafka Request Handler deadlocks and brings down the cluster.
> -------------------------------------------------------------
>
>                 Key: KAFKA-6042
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6042
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.11.0.0, 0.11.0.1, 1.0.0
>         Environment: kafka version: 0.11.0.1
>                      client versions: 0.8.2.1-0.10.2.1
>                      platform: aws (eu-west-1a)
>                      nodes: 36 x r4.xlarge
>                      disk storage: 2.5 tb per node (~73% usage per node)
>                      topics: 250
>                      number of partitions: 48k (approx)
>                      os: ubuntu 14.04
>                      jvm: Java(TM) SE Runtime Environment (build 1.8.0_131-b11), Java HotSpot(TM) 64-Bit Server VM (build 25.131-b11, mixed mode)
>            Reporter: Ben Corlett
>            Assignee: Rajini Sivaram
>            Priority: Blocker
>             Fix For: 1.0.0, 0.11.0.2
>
>         Attachments: heapusage.png, thread_dump.txt.gz
>
>
> We have been experiencing a deadlock that happens on one consistent server within our cluster, currently multiple times a week. It first started happening when we upgraded to 0.11.0.0; sadly, 0.11.0.1 failed to resolve the issue.
> Sequence of events:
> At a seemingly random time broker 125 goes into a deadlock. As soon as it is deadlocked it shrinks the ISR to just itself for every partition it is the leader for:
> [2017-10-10 00:06:10,061] INFO Partition [XXXXXXXXXX,24] on broker 125: Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,073] INFO Partition [XXXXXXXXXX,974] on broker 125: Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,079] INFO Partition [XXXXXXXXXX,64] on broker 125: Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,081] INFO Partition [XXXXXXXXXX,21] on broker 125: Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,084] INFO Partition [XXXXXXXXXX,12] on broker 125: Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,085] INFO Partition [XXXXXXXXXX,61] on broker 125: Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,086] INFO Partition [XXXXXXXXXX,53] on broker 125: Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,088] INFO Partition [XXXXXXXXXX,27] on broker 125: Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,090] INFO Partition [XXXXXXXXXX,182] on broker 125: Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,091] INFO Partition [XXXXXXXXXX,16] on broker 125: Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> ....
> The other nodes then fail to connect to node 125:
> [2017-10-10 00:08:42,318] WARN [ReplicaFetcherThread-0-125]: Error in fetch to broker 125, request (type=FetchRequest, replicaId=101, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={XXXXXXXXXX-94=(offset=0, logStartOffset=0, maxBytes=1048576), XXXXXXXXXX-22=(offset=0, logStartOffset=0, maxBytes=1048576), XXXXXXXXXX-58=(offset=0, logStartOffset=0, maxBytes=1048576), XXXXXXXXXX-11=(offset=78932482, logStartOffset=50881481, maxBytes=1048576), XXXXXXXXXX-55=(offset=0, logStartOffset=0, maxBytes=1048576), XXXXXXXXXX-19=(offset=0, logStartOffset=0, maxBytes=1048576), XXXXXXXXXX-91=(offset=0, logStartOffset=0, maxBytes=1048576), XXXXXXXXXX-5=(offset=903857106, logStartOffset=0, maxBytes=1048576), XXXXXXXXXX-80=(offset=0, logStartOffset=0, maxBytes=1048576), XXXXXXXXXX-88=(offset=0, logStartOffset=0, maxBytes=1048576), XXXXXXXXXX-34=(offset=308, logStartOffset=308, maxBytes=1048576), XXXXXXXXXX-7=(offset=369990, logStartOffset=369990, maxBytes=1048576), XXXXXXXXXX-0=(offset=57965795, logStartOffset=0, maxBytes=1048576)}) (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 125 was disconnected before the response was read
>         at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:93)
>         at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:93)
>         at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:207)
>         at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:151)
>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> Because node 125 removed every other replica from the ISR as it locked up, no failover is possible for any of its partitions without an unclean leader election. This breaks every partition this node was the leader for, and since we spread all topics across all servers, it essentially brings down the entire cluster.
> Recovery:
> Unfortunately, with broker 125 in its deadlocked state a clean shutdown does not work; a kill -9 is necessary. After an unclean shutdown the indexes must be rebuilt, so start-up time is around 10 minutes. Once node 125 finally starts, the cluster recovers.
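> Since a clean shutdown no longer works once the broker is wedged, one way to catch this state early is to probe the broker JVM for monitor deadlocks over JMX. A minimal sketch (the host/port is a placeholder, and it assumes the broker was started with remote JMX enabled; this is a workaround idea, not anything shipped with Kafka):
> {code:scala}
> import java.lang.management.{ManagementFactory, ThreadMXBean}
> import javax.management.remote.{JMXConnectorFactory, JMXServiceURL}
>
> object DeadlockProbe {
>   def main(args: Array[String]): Unit = {
>     // "broker125:9999" is hypothetical; assumes the broker JVM runs with
>     // -Dcom.sun.management.jmxremote.port=9999 (auth/ssl omitted for brevity).
>     val url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://broker125:9999/jmxrmi")
>     val connector = JMXConnectorFactory.connect(url)
>     try {
>       val threads = ManagementFactory.newPlatformMXBeanProxy(
>         connector.getMBeanServerConnection,
>         ManagementFactory.THREAD_MXBEAN_NAME,
>         classOf[ThreadMXBean])
>       // findDeadlockedThreads returns null when no threads are deadlocked
>       // on monitors or java.util.concurrent locks.
>       Option(threads.findDeadlockedThreads()) match {
>         case Some(ids) =>
>           // Print the stacks plus held/requested monitors of the wedged threads.
>           threads.getThreadInfo(ids, true, true).foreach(println)
>           sys.exit(1) // non-zero exit so a cron/alerting wrapper can page someone
>         case None =>
>           println("no JVM-level deadlock detected")
>       }
>     } finally connector.close()
>   }
> }
> {code}
> Run from cron, a non-zero exit is the cue to kill -9 and restart the broker before its partitions start failing fetches.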
> A thread dump on 125 shows:
> Found one Java-level deadlock:
> =============================
> "executor-Produce":
>   waiting to lock monitor 0x00007f01e417ac48 (object 0x000000068d10a1a0, a kafka.coordinator.group.GroupMetadata),
>   which is held by "kafka-request-handler-2"
> "kafka-request-handler-2":
>   waiting to lock monitor 0x00007f0208f5e198 (object 0x000000068d2a45f8, a kafka.coordinator.group.GroupMetadata),
>   which is held by "kafka-request-handler-1"
> "kafka-request-handler-1":
>   waiting to lock monitor 0x00007f01e417ac48 (object 0x000000068d10a1a0, a kafka.coordinator.group.GroupMetadata),
>   which is held by "kafka-request-handler-2"
> Java stack information for the threads listed above:
> ===================================================
> "executor-Produce":
>         at kafka.coordinator.group.GroupMetadataManager.putCacheCallback$2(GroupMetadataManager.scala:312)
>         - waiting to lock <0x000000068d10a1a0> (a kafka.coordinator.group.GroupMetadata)
>         at kafka.coordinator.group.GroupMetadataManager.$anonfun$storeOffsets$10(GroupMetadataManager.scala:381)
>         at kafka.coordinator.group.GroupMetadataManager.$anonfun$storeOffsets$10$adapted(GroupMetadataManager.scala:381)
>         at kafka.coordinator.group.GroupMetadataManager$$Lambda$990/350831638.apply(Unknown Source)
>         at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:131)
>         at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:66)
>         at kafka.server.DelayedOperation.run(DelayedOperation.scala:112)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:748)
> "kafka-request-handler-2":
>         at kafka.server.DelayedProduce.safeTryComplete(DelayedProduce.scala:75)
>         - waiting to lock <0x000000068d2a45f8> (a kafka.coordinator.group.GroupMetadata)
>         at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:338)
>         at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:244)
>         at kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:250)
>         at kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:418)
>         at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:500)
>         at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$2(ReplicaManager.scala:546)
>         at kafka.server.ReplicaManager$$Lambda$887/1936159787.apply(Unknown Source)
>         at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234)
>         at scala.collection.TraversableLike$$Lambda$14/1859039536.apply(Unknown Source)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:120)
>         at scala.collection.TraversableLike.map(TraversableLike.scala:234)
>         at scala.collection.TraversableLike.map$(TraversableLike.scala:227)
>         at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>         at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:532)
>         at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:374)
>         at kafka.coordinator.group.GroupMetadataManager.appendForGroup(GroupMetadataManager.scala:246)
>         at kafka.coordinator.group.GroupMetadataManager.storeOffsets(GroupMetadataManager.scala:381)
>         at kafka.coordinator.group.GroupCoordinator.doCommitOffsets(GroupCoordinator.scala:465)
>         - locked <0x000000068d10a1a0> (a kafka.coordinator.group.GroupMetadata)
>         at kafka.coordinator.group.GroupCoordinator.handleCommitOffsets(GroupCoordinator.scala:429)
>         at kafka.server.KafkaApis.handleOffsetCommitRequest(KafkaApis.scala:361)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:105)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66)
>         at java.lang.Thread.run(Thread.java:748)
> "kafka-request-handler-1":
>         at kafka.server.DelayedProduce.safeTryComplete(DelayedProduce.scala:75)
>         - waiting to lock <0x000000068d10a1a0> (a kafka.coordinator.group.GroupMetadata)
>         at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:338)
>         at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:244)
>         at kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:250)
>         at kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:418)
>         at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:500)
>         at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$2(ReplicaManager.scala:546)
>         at kafka.server.ReplicaManager$$Lambda$887/1936159787.apply(Unknown Source)
>         at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234)
>         at scala.collection.TraversableLike$$Lambda$14/1859039536.apply(Unknown Source)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:120)
>         at scala.collection.TraversableLike.map(TraversableLike.scala:234)
>         at scala.collection.TraversableLike.map$(TraversableLike.scala:227)
>         at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>         at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:532)
>         at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:374)
>         at kafka.coordinator.group.GroupMetadataManager.appendForGroup(GroupMetadataManager.scala:246)
>         at kafka.coordinator.group.GroupMetadataManager.storeOffsets(GroupMetadataManager.scala:381)
>         at kafka.coordinator.group.GroupCoordinator.doCommitOffsets(GroupCoordinator.scala:465)
>         - locked <0x000000068d2a45f8> (a kafka.coordinator.group.GroupMetadata)
>         at kafka.coordinator.group.GroupCoordinator.handleCommitOffsets(GroupCoordinator.scala:429)
>         at kafka.server.KafkaApis.handleOffsetCommitRequest(KafkaApis.scala:361)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:105)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66)
>         at java.lang.Thread.run(Thread.java:748)
> Full thread dump attached.
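> The dump is a textbook lock-ordering inversion: handler-1 holds one group's monitor (taken in doCommitOffsets) while the delayed-produce completion path tries to take a second group's, and handler-2 holds the second while trying to take the first. A stripped-down sketch of that shape (toy objects and thread names only, not the actual Kafka code) wedges the same way:
> {code:scala}
> object LockOrderInversion extends App {
>   // Stand-ins for the two GroupMetadata instances whose monitors appear in the dump.
>   val groupA = new Object
>   val groupB = new Object
>
>   // Mirrors the shape of the stacks above: doCommitOffsets holds its own group's
>   // monitor, then DelayedProduce.safeTryComplete tries to lock a *different*
>   // group further down the same call stack.
>   def commitThenCompleteDelayed(held: Object, other: Object): Unit =
>     held.synchronized {      // cf. GroupCoordinator.doCommitOffsets: lock own group
>       Thread.sleep(100)      // widen the race window so the inversion always hits
>       other.synchronized {   // cf. DelayedProduce.safeTryComplete: lock the other group
>         println(s"${Thread.currentThread.getName} completed")
>       }
>     }
>
>   val h1 = new Thread(() => commitThenCompleteDelayed(groupA, groupB), "kafka-request-handler-1")
>   val h2 = new Thread(() => commitThenCompleteDelayed(groupB, groupA), "kafka-request-handler-2")
>   h1.start(); h2.start()
>   h1.join(); h2.join()       // never returns: each thread blocks on the other's monitor
> }
> {code}
> Breaking such a cycle takes either a global lock order or a non-blocking tryLock in the completion path; per the Fix For field above, the actual fix lands in 0.11.0.2 and 1.0.0.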