Ben Corlett created KAFKA-6042:
----------------------------------

             Summary: Kafka Request Handler deadlocks and brings down the 
cluster.
                 Key: KAFKA-6042
                 URL: https://issues.apache.org/jira/browse/KAFKA-6042
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 0.11.0.0, 0.11.0.1
         Environment: kafka version: 0.11.0.1
client versions: 0.8.2.1-0.10.2.1
platform: aws (eu-west-1a)
nodes: 36 x r4.xlarge
disk storage: 2.5 tb per node (~73% usage per node)
topics: 250
number of partitions: 48k (approx)
os: ubuntu 14.04
jvm: Java(TM) SE Runtime Environment (build 1.8.0_131-b11), Java HotSpot(TM) 
64-Bit Server VM (build 25.131-b11, mixed mode)
            Reporter: Ben Corlett
            Priority: Critical
         Attachments: thread_dump.txt.gz

We have been experiencing a deadlock that recurs on the same broker in our 
cluster. It currently happens several times a week. It first appeared when we 
upgraded to 0.11.0.0, and upgrading to 0.11.0.1 did not resolve the issue.

Sequence of events:

At a seemingly random time, broker 125 deadlocks. As soon as it is deadlocked, 
it removes every other replica from the ISR of each partition it is the leader 
for:

[2017-10-10 00:06:10,061] INFO Partition [XXXXXXXXXX,24] on broker 125: 
Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
[2017-10-10 00:06:10,073] INFO Partition [XXXXXXXXXX,974] on broker 125: 
Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
[2017-10-10 00:06:10,079] INFO Partition [XXXXXXXXXX,64] on broker 125: 
Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
[2017-10-10 00:06:10,081] INFO Partition [XXXXXXXXXX,21] on broker 125: 
Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
[2017-10-10 00:06:10,084] INFO Partition [XXXXXXXXXX,12] on broker 125: 
Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
[2017-10-10 00:06:10,085] INFO Partition [XXXXXXXXXX,61] on broker 125: 
Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
[2017-10-10 00:06:10,086] INFO Partition [XXXXXXXXXX,53] on broker 125: 
Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
[2017-10-10 00:06:10,088] INFO Partition [XXXXXXXXXX,27] on broker 125: 
Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
[2017-10-10 00:06:10,090] INFO Partition [XXXXXXXXXX,182] on broker 125: 
Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
[2017-10-10 00:06:10,091] INFO Partition [XXXXXXXXXX,16] on broker 125: 
Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
....
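
The report does not say why a deadlocked leader shrinks the ISR. One plausible 
reading is that the stuck request handlers stop serving follower fetches, so 
the followers look lagged to the leader's ISR check. The following is a minimal 
sketch of such a time-based check, not Kafka's actual code: the class 
IsrShrinkSketch, the method shrinkIsr and the 10 second threshold (standing in 
for the broker setting replica.lag.time.max.ms) are all assumptions made for 
illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of a time-based ISR check; not taken from the Kafka source.
final class IsrShrinkSketch {

    /**
     * Returns the ISR after dropping every follower whose last caught-up
     * timestamp is older than maxLagMs. The leader always stays in the ISR.
     */
    static Set<Integer> shrinkIsr(int leaderId,
                                  Map<Integer, Long> followerLastCaughtUpMs,
                                  long nowMs,
                                  long maxLagMs) {
        Set<Integer> isr = new TreeSet<>();
        isr.add(leaderId);
        for (Map.Entry<Integer, Long> follower : followerLastCaughtUpMs.entrySet()) {
            if (nowMs - follower.getValue() <= maxLagMs) {
                isr.add(follower.getKey());
            }
        }
        return isr;
    }

    public static void main(String[] args) {
        Map<Integer, Long> followers = new LinkedHashMap<>();
        followers.put(117, 0L); // follower 117 last caught up at t = 0 ms

        // While fetches are served, the follower is within the threshold.
        System.out.println(shrinkIsr(125, followers, 5_000L, 10_000L));  // [117, 125]
        // Once fetches stop being answered, the follower ages out.
        System.out.println(shrinkIsr(125, followers, 15_000L, 10_000L)); // [125]
    }
}
```

The second call mirrors the "Shrinking ISR from 117,125 to 125" lines above: 
only the leader itself is left in the set.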


The other nodes then fail to connect to node 125:

[2017-10-10 00:08:42,318] WARN [ReplicaFetcherThread-0-125]: Error in fetch to 
broker 125, request (type=FetchRequest, replicaId=101, maxWait=500, minBytes=1, 
maxBytes=10485760, fetchData={XXXXXXXXXX-94=(offset=0, logStartOffset=0, 
maxBytes=1048576), XXXXXXXXXX-22=(offset=0, logStartOffset=0, 
maxBytes=1048576), XXXXXXXXXX-58=(offset=0, logStartOffset=0, 
maxBytes=1048576), XXXXXXXXXX-11=(offset=78932482, logStartOffset=50881481, 
maxBytes=1048576), XXXXXXXXXX-55=(offset=0, logStartOffset=0, 
maxBytes=1048576), XXXXXXXXXX-19=(offset=0, logStartOffset=0, 
maxBytes=1048576), XXXXXXXXXX-91=(offset=0, logStartOffset=0, 
maxBytes=1048576), XXXXXXXXXX-5=(offset=903857106, logStartOffset=0, 
maxBytes=1048576), XXXXXXXXXX-80=(offset=0, logStartOffset=0, 
maxBytes=1048576), XXXXXXXXXX-88=(offset=0, logStartOffset=0, 
maxBytes=1048576), XXXXXXXXXX-34=(offset=308, logStartOffset=308, 
maxBytes=1048576), XXXXXXXXXX-7=(offset=369990, logStartOffset=369990, 
maxBytes=1048576), XXXXXXXXXX-0=(offset=57965795, logStartOffset=0, 
maxBytes=1048576)}) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 125 was disconnected before the response was 
read
        at 
org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:93)
        at 
kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:93)
        at 
kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:207)
        at 
kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
        at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:151)
        at 
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)

Because node 125 removed all other replicas from the ISR as it locked up, no 
partition can fail over without an unclean leader election. This breaks every 
partition that this node was leader for. As we spread all topics across all 
servers, this essentially brings down the entire cluster.
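
The failover problem described above can be illustrated with a small model. 
This is a simplified sketch, not the Kafka controller's implementation: the 
class LeaderElectionSketch and the method electLeader are hypothetical, and the 
rule assumed is that with unclean leader election disabled only a live member 
of the ISR may become leader.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical model of leader election; names and rules are assumptions.
final class LeaderElectionSketch {

    /**
     * Picks the first live replica that is in the ISR. If there is none and
     * unclean election is allowed, falls back to any live replica.
     */
    static Optional<Integer> electLeader(List<Integer> replicas,
                                         Set<Integer> isr,
                                         Set<Integer> liveBrokers,
                                         boolean uncleanAllowed) {
        for (Integer replica : replicas) {
            if (isr.contains(replica) && liveBrokers.contains(replica)) {
                return Optional.of(replica);
            }
        }
        if (uncleanAllowed) {
            for (Integer replica : replicas) {
                if (liveBrokers.contains(replica)) {
                    return Optional.of(replica);
                }
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<Integer> replicas = Arrays.asList(117, 125);
        Set<Integer> isr = Collections.singleton(125);  // ISR shrunk to the stuck leader
        Set<Integer> live = Collections.singleton(117); // 125 is unreachable

        System.out.println(electLeader(replicas, isr, live, false)); // Optional.empty
        System.out.println(electLeader(replicas, isr, live, true));  // Optional[117]
    }
}
```

With the ISR reduced to broker 125 and that broker unreachable, the clean 
election finds no candidate, which matches the outage seen here.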


Recovery:

Unfortunately, with broker 125 in its deadlocked state a clean shutdown does 
not work; a kill -9 is necessary. After an unclean shutdown the indexes must be 
rebuilt, and startup takes around 10 minutes. Once node 125 finally starts, the 
cluster recovers.


A thread dump on 125 shows:

Found one Java-level deadlock:
=============================
"executor-Produce":
  waiting to lock monitor 0x00007f01e417ac48 (object 0x000000068d10a1a0, a 
kafka.coordinator.group.GroupMetadata),
  which is held by "kafka-request-handler-2"
"kafka-request-handler-2":
  waiting to lock monitor 0x00007f0208f5e198 (object 0x000000068d2a45f8, a 
kafka.coordinator.group.GroupMetadata),
  which is held by "kafka-request-handler-1"
"kafka-request-handler-1":
  waiting to lock monitor 0x00007f01e417ac48 (object 0x000000068d10a1a0, a 
kafka.coordinator.group.GroupMetadata),
  which is held by "kafka-request-handler-2"

Java stack information for the threads listed above:
===================================================
"executor-Produce":
        at 
kafka.coordinator.group.GroupMetadataManager.putCacheCallback$2(GroupMetadataManager.scala:312)
        - waiting to lock <0x000000068d10a1a0> (a 
kafka.coordinator.group.GroupMetadata)
        at 
kafka.coordinator.group.GroupMetadataManager.$anonfun$storeOffsets$10(GroupMetadataManager.scala:381)
        at 
kafka.coordinator.group.GroupMetadataManager.$anonfun$storeOffsets$10$adapted(GroupMetadataManager.scala:381)
        at 
kafka.coordinator.group.GroupMetadataManager$$Lambda$990/350831638.apply(Unknown
 Source)
        at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:131)
        at 
kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:66)
        at kafka.server.DelayedOperation.run(DelayedOperation.scala:112)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
"kafka-request-handler-2":
        at kafka.server.DelayedProduce.safeTryComplete(DelayedProduce.scala:75)
        - waiting to lock <0x000000068d2a45f8> (a 
kafka.coordinator.group.GroupMetadata)
        at 
kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:338)
        at 
kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:244)
        at 
kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:250)
        at 
kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:418)
        at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:500)
        at 
kafka.server.ReplicaManager.$anonfun$appendToLocalLog$2(ReplicaManager.scala:546)
        at kafka.server.ReplicaManager$$Lambda$887/1936159787.apply(Unknown 
Source)
        at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$Lambda$14/1859039536.apply(Unknown 
Source)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:120)
        at scala.collection.TraversableLike.map(TraversableLike.scala:234)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:227)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at 
kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:532)
        at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:374)
        at 
kafka.coordinator.group.GroupMetadataManager.appendForGroup(GroupMetadataManager.scala:246)
        at 
kafka.coordinator.group.GroupMetadataManager.storeOffsets(GroupMetadataManager.scala:381)
        at 
kafka.coordinator.group.GroupCoordinator.doCommitOffsets(GroupCoordinator.scala:465)
        - locked <0x000000068d10a1a0> (a kafka.coordinator.group.GroupMetadata)
        at 
kafka.coordinator.group.GroupCoordinator.handleCommitOffsets(GroupCoordinator.scala:429)
        at kafka.server.KafkaApis.handleOffsetCommitRequest(KafkaApis.scala:361)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:105)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66)
        at java.lang.Thread.run(Thread.java:748)
"kafka-request-handler-1":
        at kafka.server.DelayedProduce.safeTryComplete(DelayedProduce.scala:75)
        - waiting to lock <0x000000068d10a1a0> (a 
kafka.coordinator.group.GroupMetadata)
        at 
kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:338)
        at 
kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:244)
        at 
kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:250)
        at 
kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:418)
        at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:500)
        at 
kafka.server.ReplicaManager.$anonfun$appendToLocalLog$2(ReplicaManager.scala:546)
        at kafka.server.ReplicaManager$$Lambda$887/1936159787.apply(Unknown 
Source)
        at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$Lambda$14/1859039536.apply(Unknown 
Source)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:120)
        at scala.collection.TraversableLike.map(TraversableLike.scala:234)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:227)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at 
kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:532)
        at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:374)
        at 
kafka.coordinator.group.GroupMetadataManager.appendForGroup(GroupMetadataManager.scala:246)
        at 
kafka.coordinator.group.GroupMetadataManager.storeOffsets(GroupMetadataManager.scala:381)
        at 
kafka.coordinator.group.GroupCoordinator.doCommitOffsets(GroupCoordinator.scala:465)
        - locked <0x000000068d2a45f8> (a kafka.coordinator.group.GroupMetadata)
        at 
kafka.coordinator.group.GroupCoordinator.handleCommitOffsets(GroupCoordinator.scala:429)
        at kafka.server.KafkaApis.handleOffsetCommitRequest(KafkaApis.scala:361)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:105)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66)
        at java.lang.Thread.run(Thread.java:748)


Full thread dump attached.
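
The cycle in the dump is the classic two-lock pattern: kafka-request-handler-2 
holds the GroupMetadata monitor 0x000000068d10a1a0 and waits for 
0x000000068d2a45f8, while kafka-request-handler-1 holds 0x000000068d2a45f8 and 
waits for 0x000000068d10a1a0. executor-Produce is not part of the cycle; it is 
simply blocked behind it. The following stand-alone sketch reproduces that 
shape with two plain monitors and asks the JVM for deadlocked threads through 
ThreadMXBean, the programmatic counterpart of the "Found one Java-level 
deadlock" section. It is an illustration only and contains no Kafka code: the 
class GroupLockCycleSketch, the thread names and the objects groupA and groupB 
are made up.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

// Hypothetical reproduction of a two-monitor deadlock; no Kafka classes involved.
final class GroupLockCycleSketch {

    /**
     * Starts two daemon threads that each take their own monitor and then
     * block on the other one. Returns how many threads the JVM reports as
     * deadlocked, or 0 if none are found within about five seconds.
     */
    static int reproduceAndDetect() {
        final Object groupA = new Object();
        final Object groupB = new Object();
        final CountDownLatch bothHoldFirstLock = new CountDownLatch(2);

        handler("sketch-handler-1", groupA, groupB, bothHoldFirstLock).start();
        handler("sketch-handler-2", groupB, groupA, bothHoldFirstLock).start();

        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        try {
            for (int attempt = 0; attempt < 100; attempt++) {
                long[] deadlocked = threads.findDeadlockedThreads();
                if (deadlocked != null) {
                    return deadlocked.length;
                }
                Thread.sleep(50);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return 0;
    }

    private static Thread handler(String name, Object own, Object other,
                                  CountDownLatch bothHoldFirstLock) {
        Thread thread = new Thread(() -> {
            synchronized (own) {
                bothHoldFirstLock.countDown();
                try {
                    // Wait until the other thread also holds its first monitor.
                    bothHoldFirstLock.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                synchronized (other) {
                    // Never reached: the other thread holds this monitor.
                }
            }
        }, name);
        thread.setDaemon(true); // lets the JVM exit despite the stuck threads
        return thread;
    }

    public static void main(String[] args) {
        System.out.println("deadlocked threads: " + reproduceAndDetect());
    }
}
```

The latch only makes the interleaving deterministic. On the broker the same 
interleaving depends on timing, which would be consistent with the deadlock 
appearing at seemingly random times.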



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
