[ https://issues.apache.org/jira/browse/KAFKA-6042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16224617#comment-16224617 ]
Ben Corlett commented on KAFKA-6042:
------------------------------------

We had another issue on broker 125, though not the same one this time. All other nodes in the cluster are on 0.11.0.1; broker 125 is running a build from https://github.com/apache/kafka/commits/2a321941387c7739f2fbbbe592d017b703223ada

It ran out of heap space. We are currently running a heap of 5GB. This is the first time we've seen an out-of-heap issue with Kafka, and I don't know whether it is related to this ticket. It might be that 1.0 requires more heap space, or that running mixed versions uses more heap. This affected the entire cluster, and message rates didn't go back to normal until broker 125 was restarted. I can increase the heap size.

{code}
[2017-10-28 16:19:31,061] ERROR [KafkaApi-125] Error when handling request {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,topics=[{topic=XXXXXXXXX,partitions=[{partition=40,fetch_offset=153707886,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
java.lang.OutOfMemoryError: Java heap space
        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
        at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:101)
        at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:253)
        at kafka.server.KafkaApis.$anonfun$handleFetchRequest$4(KafkaApis.scala:520)
        at kafka.server.KafkaApis.$anonfun$handleFetchRequest$4$adapted(KafkaApis.scala:518)
        at kafka.server.KafkaApis$$Lambda$837/843104331.apply(Unknown Source)
        at scala.Option.map(Option.scala:146)
        at kafka.server.KafkaApis.$anonfun$handleFetchRequest$3(KafkaApis.scala:518)
        at kafka.server.KafkaApis.$anonfun$handleFetchRequest$3$adapted(KafkaApis.scala:508)
        at kafka.server.KafkaApis$$Lambda$836/1538921035.apply(Unknown Source)
        at scala.Option.flatMap(Option.scala:171)
        at kafka.server.KafkaApis.convertedPartitionData$1(KafkaApis.scala:508)
        at kafka.server.KafkaApis.$anonfun$handleFetchRequest$12(KafkaApis.scala:556)
        at kafka.server.KafkaApis$$Lambda$833/1032345356.apply(Unknown Source)
        at scala.collection.Iterator.foreach(Iterator.scala:929)
        at scala.collection.Iterator.foreach$(Iterator.scala:929)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
        at scala.collection.IterableLike.foreach(IterableLike.scala:71)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at kafka.server.KafkaApis.createResponse$2(KafkaApis.scala:555)
        at kafka.server.KafkaApis.$anonfun$handleFetchRequest$14(KafkaApis.scala:569)
        at kafka.server.KafkaApis.$anonfun$handleFetchRequest$14$adapted(KafkaApis.scala:569)
        at kafka.server.KafkaApis$$Lambda$844/44004770.apply(Unknown Source)
        at kafka.server.KafkaApis.$anonfun$sendResponseMaybeThrottle$1(KafkaApis.scala:2034)
        at kafka.server.KafkaApis$$Lambda$439/940799008.apply$mcVI$sp(Unknown Source)
        at kafka.server.ClientRequestQuotaManager.maybeRecordAndThrottle(ClientRequestQuotaManager.scala:52)
        at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2034)
        at kafka.server.KafkaApis.fetchResponseCallback$1(KafkaApis.scala:569)
        at kafka.server.KafkaApis.$anonfun$handleFetchRequest$15(KafkaApis.scala:588)
        at kafka.server.KafkaApis$$Lambda$843/1757998472.apply$mcVI$sp(Unknown Source)
{code}

{code}
2017-10-28T16:19:31.207+0100: 439215.093: [GC pause (G1 Evacuation Pause) (young), 0.0025596 secs]
   [Parallel Time: 1.3 ms, GC Workers: 4]
      [GC Worker Start (ms): Min: 439215093.0, Avg: 439215093.0, Max: 439215093.0, Diff: 0.0]
      [Ext Root Scanning (ms): Min: 0.6, Avg: 0.6, Max: 0.7, Diff: 0.1, Sum: 2.5]
      [Update RS (ms): Min: 0.0, Avg: 0.4, Max: 0.5, Diff: 0.5, Sum: 1.6]
         [Processed Buffers: Min: 1, Avg: 1.2, Max: 2, Diff: 1, Sum: 5]
      [Scan RS (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.0]
      [Code Root Scanning (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.0]
      [Object Copy (ms): Min: 0.1, Avg: 0.1, Max: 0.1, Diff: 0.0, Sum: 0.3]
      [Termination (ms): Min: 0.0, Avg: 0.1, Max: 0.4, Diff: 0.4, Sum: 0.4]
         [Termination Attempts: Min: 1, Avg: 1.0, Max: 1, Diff: 0, Sum: 4]
      [GC Worker Other (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.0]
      [GC Worker Total (ms): Min: 1.2, Avg: 1.2, Max: 1.2, Diff: 0.0, Sum: 4.9]
      [GC Worker End (ms): Min: 439215094.2, Avg: 439215094.2, Max: 439215094.2, Diff: 0.0]
   [Code Root Fixup: 0.1 ms]
   [Code Root Purge: 0.0 ms]
   [Clear CT: 0.1 ms]
   [Other: 1.0 ms]
      [Choose CSet: 0.0 ms]
      [Ref Proc: 0.4 ms]
      [Ref Enq: 0.0 ms]
      [Redirty Cards: 0.1 ms]
      [Humongous Register: 0.4 ms]
      [Humongous Reclaim: 0.0 ms]
      [Free CSet: 0.0 ms]
   [Eden: 0.0B(256.0M)->0.0B(256.0M) Survivors: 0.0B->0.0B Heap: 4702.9M(5120.0M)->4702.9M(5120.0M)]
 [Times: user=0.00 sys=0.00, real=0.00 secs]
2017-10-28T16:19:31.210+0100: 439215.096: [Full GC (Allocation Failure) 4702M->4362M(5120M), 0.3454285 secs]
   [Eden: 0.0B(256.0M)->0.0B(256.0M) Survivors: 0.0B->0.0B Heap: 4702.9M(5120.0M)->4362.7M(5120.0M)], [Metaspace: 38956K->38955K(1087488K)]
 [Times: user=0.38 sys=0.13, real=0.35 secs]
2017-10-28T16:19:31.556+0100: 439215.441: [GC concurrent-mark-abort]
2017-10-28T16:19:31.714+0100: 439215.600: [GC pause (GCLocker Initiated GC) (young) (initial-mark), 0.0131186 secs]
   [Parallel Time: 9.7 ms, GC Workers: 4]
      [GC Worker Start (ms): Min: 439215600.8, Avg: 439215600.8, Max: 439215600.8, Diff: 0.0]
      [Ext Root Scanning (ms): Min: 1.1, Avg: 1.2, Max: 1.2, Diff: 0.1, Sum: 4.8]
      [Update RS (ms): Min: 2.4, Avg: 2.6, Max: 2.6, Diff: 0.2, Sum: 10.2]
         [Processed Buffers: Min: 10, Avg: 18.2, Max: 35, Diff: 25, Sum: 73]
      [Scan RS (ms): Min: 0.6, Avg: 0.7, Max: 0.8, Diff: 0.2, Sum: 2.8]
      [Code Root Scanning (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.0]
      [Object Copy (ms): Min: 5.1, Avg: 5.1, Max: 5.1, Diff: 0.0, Sum: 20.3]
      [Termination (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.0]
         [Termination Attempts: Min: 1, Avg: 1.0, Max: 1, Diff: 0, Sum: 4]
      [GC Worker Other (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0, Sum: 0.1]
      [GC Worker Total (ms): Min: 9.5, Avg: 9.5, Max: 9.5, Diff: 0.0, Sum: 38.1]
      [GC Worker End (ms): Min: 439215610.3, Avg: 439215610.3, Max: 439215610.3, Diff: 0.0]
   [Code Root Fixup: 0.1 ms]
   [Code Root Purge: 0.1 ms]
   [Clear CT: 0.1 ms]
   [Other: 3.0 ms]
      [Choose CSet: 0.0 ms]
      [Ref Proc: 1.3 ms]
      [Ref Enq: 0.0 ms]
      [Redirty Cards: 0.1 ms]
      [Humongous Register: 0.8 ms]
      [Humongous Reclaim: 0.1 ms]
      [Free CSet: 0.1 ms]
   [Eden: 64.0M(256.0M)->0.0B(244.0M) Survivors: 0.0B->12.0M Heap: 4425.2M(5120.0M)->4373.6M(5120.0M)]
 [Times: user=0.03 sys=0.01, real=0.01 secs]
{code}
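For context on why this path is heap-hungry: the OutOfMemoryError above is thrown from AbstractRecords.downConvert, i.e. while the broker converts stored batches down to the older message format that our pre-0.11 consumers request. The snippet below is not Kafka code, just a toy model of the allocation pattern visible in the stack trace: each in-flight fetch that needs down-conversion materialises a converted copy in a fresh heap ByteBuffer and holds it until the response is written out, so a burst of such fetches can pin gigabytes at once. The buffer count and the per-fetch size (taken from max_bytes=52428800 in the logged request) are purely illustrative.

{code}
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Toy model only -- not Kafka code. Mimics ByteBuffer.allocate(...) inside
// AbstractRecords.downConvert: one heap buffer per fetch that has to be
// down-converted, all alive at the same time while responses are in flight.
public class DownConvertHeapSketch {
    public static void main(String[] args) {
        int inFlightFetches = 200;        // hypothetical burst of old-format consumer fetches
        int bytesPerFetch = 52_428_800;   // illustrative upper bound: max_bytes from the logged request

        List<ByteBuffer> conversionBuffers = new ArrayList<>();
        for (int i = 0; i < inFlightFetches; i++) {
            // With a 5GB heap (-Xmx5g, as on broker 125) this loop fails partway through
            // with the same "java.lang.OutOfMemoryError: Java heap space" as the broker log.
            conversionBuffers.add(ByteBuffer.allocate(bytesPerFetch));
            System.out.printf("pinned ~%d MB of heap%n", (long) (i + 1) * bytesPerFetch / (1024 * 1024));
        }
    }
}
{code}

If we do bump the heap, KAFKA_HEAP_OPTS on the broker (e.g. -Xms8g/-Xmx8g) is the obvious lever, but that only buys headroom; pinning log.message.format.version to the old format, so the broker doesn't need to down-convert for 0.8/0.10 consumers, would attack the allocation itself. Treat both as suggestions rather than anything I've verified on this cluster.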
> Kafka Request Handler deadlocks and brings down the cluster.
> ------------------------------------------------------------
>
> Key: KAFKA-6042
> URL: https://issues.apache.org/jira/browse/KAFKA-6042
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.11.0.0, 0.11.0.1, 1.0.0
> Environment: kafka version: 0.11.0.1
>              client versions: 0.8.2.1-0.10.2.1
>              platform: aws (eu-west-1a)
>              nodes: 36 x r4.xlarge
>              disk storage: 2.5 tb per node (~73% usage per node)
>              topics: 250
>              number of partitions: 48k (approx)
>              os: ubuntu 14.04
>              jvm: Java(TM) SE Runtime Environment (build 1.8.0_131-b11), Java HotSpot(TM) 64-Bit Server VM (build 25.131-b11, mixed mode)
> Reporter: Ben Corlett
> Assignee: Rajini Sivaram
> Priority: Blocker
> Fix For: 1.0.0, 0.11.0.2
> Attachments: thread_dump.txt.gz
>
> We have been experiencing a deadlock that happens on a consistent server within our cluster. This currently happens multiple times a week. It first started happening when we upgraded to 0.11.0.0; sadly 0.11.0.1 failed to resolve the issue.
>
> Sequence of events:
>
> At a seemingly random time broker 125 goes into a deadlock. As soon as it is deadlocked it removes all the ISRs for any partition it is the leader for.
>
> [2017-10-10 00:06:10,061] INFO Partition [XXXXXXXXXX,24] on broker 125: Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,073] INFO Partition [XXXXXXXXXX,974] on broker 125: Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,079] INFO Partition [XXXXXXXXXX,64] on broker 125: Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,081] INFO Partition [XXXXXXXXXX,21] on broker 125: Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,084] INFO Partition [XXXXXXXXXX,12] on broker 125: Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,085] INFO Partition [XXXXXXXXXX,61] on broker 125: Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,086] INFO Partition [XXXXXXXXXX,53] on broker 125: Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,088] INFO Partition [XXXXXXXXXX,27] on broker 125: Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,090] INFO Partition [XXXXXXXXXX,182] on broker 125: Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,091] INFO Partition [XXXXXXXXXX,16] on broker 125: Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> ....
> The other nodes fail to connect to node 125:
>
> [2017-10-10 00:08:42,318] WARN [ReplicaFetcherThread-0-125]: Error in fetch to broker 125, request (type=FetchRequest, replicaId=101, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={XXXXXXXXXX-94=(offset=0, logStartOffset=0, maxBytes=1048576), XXXXXXXXXX-22=(offset=0, logStartOffset=0, maxBytes=1048576), XXXXXXXXXX-58=(offset=0, logStartOffset=0, maxBytes=1048576), XXXXXXXXXX-11=(offset=78932482, logStartOffset=50881481, maxBytes=1048576), XXXXXXXXXX-55=(offset=0, logStartOffset=0, maxBytes=1048576), XXXXXXXXXX-19=(offset=0, logStartOffset=0, maxBytes=1048576), XXXXXXXXXX-91=(offset=0, logStartOffset=0, maxBytes=1048576), XXXXXXXXXX-5=(offset=903857106, logStartOffset=0, maxBytes=1048576), XXXXXXXXXX-80=(offset=0, logStartOffset=0, maxBytes=1048576), XXXXXXXXXX-88=(offset=0, logStartOffset=0, maxBytes=1048576), XXXXXXXXXX-34=(offset=308, logStartOffset=308, maxBytes=1048576), XXXXXXXXXX-7=(offset=369990, logStartOffset=369990, maxBytes=1048576), XXXXXXXXXX-0=(offset=57965795, logStartOffset=0, maxBytes=1048576)}) (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 125 was disconnected before the response was read
>         at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:93)
>         at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:93)
>         at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:207)
>         at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:151)
>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
>
> Because node 125 removed all the ISRs as it was locking up, failover for any of its partitions is not possible without an unclean leader election. This breaks every partition this node was the leader for, and as we spread all topics across all servers it essentially brings down the entire cluster.
>
> Recovery:
>
> Unfortunately, with broker 125 in its deadlocked state a clean shutdown does not work; a kill -9 is necessary. After an unclean shutdown the indexes must be rebuilt and start-up time is around 10 minutes. Once node 125 finally starts, the cluster recovers.
> A thread dump on 125 shows:
>
> Found one Java-level deadlock:
> =============================
> "executor-Produce":
>   waiting to lock monitor 0x00007f01e417ac48 (object 0x000000068d10a1a0, a kafka.coordinator.group.GroupMetadata),
>   which is held by "kafka-request-handler-2"
> "kafka-request-handler-2":
>   waiting to lock monitor 0x00007f0208f5e198 (object 0x000000068d2a45f8, a kafka.coordinator.group.GroupMetadata),
>   which is held by "kafka-request-handler-1"
> "kafka-request-handler-1":
>   waiting to lock monitor 0x00007f01e417ac48 (object 0x000000068d10a1a0, a kafka.coordinator.group.GroupMetadata),
>   which is held by "kafka-request-handler-2"
>
> Java stack information for the threads listed above:
> ===================================================
> "executor-Produce":
>         at kafka.coordinator.group.GroupMetadataManager.putCacheCallback$2(GroupMetadataManager.scala:312)
>         - waiting to lock <0x000000068d10a1a0> (a kafka.coordinator.group.GroupMetadata)
>         at kafka.coordinator.group.GroupMetadataManager.$anonfun$storeOffsets$10(GroupMetadataManager.scala:381)
>         at kafka.coordinator.group.GroupMetadataManager.$anonfun$storeOffsets$10$adapted(GroupMetadataManager.scala:381)
>         at kafka.coordinator.group.GroupMetadataManager$$Lambda$990/350831638.apply(Unknown Source)
>         at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:131)
>         at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:66)
>         at kafka.server.DelayedOperation.run(DelayedOperation.scala:112)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:748)
> "kafka-request-handler-2":
>         at kafka.server.DelayedProduce.safeTryComplete(DelayedProduce.scala:75)
>         - waiting to lock <0x000000068d2a45f8> (a kafka.coordinator.group.GroupMetadata)
>         at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:338)
>         at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:244)
>         at kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:250)
>         at kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:418)
>         at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:500)
>         at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$2(ReplicaManager.scala:546)
>         at kafka.server.ReplicaManager$$Lambda$887/1936159787.apply(Unknown Source)
>         at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234)
>         at scala.collection.TraversableLike$$Lambda$14/1859039536.apply(Unknown Source)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:120)
>         at scala.collection.TraversableLike.map(TraversableLike.scala:234)
>         at scala.collection.TraversableLike.map$(TraversableLike.scala:227)
>         at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>         at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:532)
>         at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:374)
>         at kafka.coordinator.group.GroupMetadataManager.appendForGroup(GroupMetadataManager.scala:246)
>         at kafka.coordinator.group.GroupMetadataManager.storeOffsets(GroupMetadataManager.scala:381)
>         at kafka.coordinator.group.GroupCoordinator.doCommitOffsets(GroupCoordinator.scala:465)
>         - locked <0x000000068d10a1a0> (a kafka.coordinator.group.GroupMetadata)
>         at kafka.coordinator.group.GroupCoordinator.handleCommitOffsets(GroupCoordinator.scala:429)
>         at kafka.server.KafkaApis.handleOffsetCommitRequest(KafkaApis.scala:361)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:105)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66)
>         at java.lang.Thread.run(Thread.java:748)
> "kafka-request-handler-1":
>         at kafka.server.DelayedProduce.safeTryComplete(DelayedProduce.scala:75)
>         - waiting to lock <0x000000068d10a1a0> (a kafka.coordinator.group.GroupMetadata)
>         at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:338)
>         at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:244)
>         at kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:250)
>         at kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:418)
>         at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:500)
>         at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$2(ReplicaManager.scala:546)
>         at kafka.server.ReplicaManager$$Lambda$887/1936159787.apply(Unknown Source)
>         at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234)
>         at scala.collection.TraversableLike$$Lambda$14/1859039536.apply(Unknown Source)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:120)
>         at scala.collection.TraversableLike.map(TraversableLike.scala:234)
>         at scala.collection.TraversableLike.map$(TraversableLike.scala:227)
>         at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>         at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:532)
>         at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:374)
>         at kafka.coordinator.group.GroupMetadataManager.appendForGroup(GroupMetadataManager.scala:246)
>         at kafka.coordinator.group.GroupMetadataManager.storeOffsets(GroupMetadataManager.scala:381)
>         at kafka.coordinator.group.GroupCoordinator.doCommitOffsets(GroupCoordinator.scala:465)
>         - locked <0x000000068d2a45f8> (a kafka.coordinator.group.GroupMetadata)
>         at kafka.coordinator.group.GroupCoordinator.handleCommitOffsets(GroupCoordinator.scala:429)
>         at kafka.server.KafkaApis.handleOffsetCommitRequest(KafkaApis.scala:361)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:105)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66)
>         at java.lang.Thread.run(Thread.java:748)
>
> Full thread dump attached.
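The thread dump quoted above is a classic lock-ordering inversion: each request handler holds the monitor of one group's GroupMetadata (taken in doCommitOffsets) and then, while completing delayed produce operations, blocks on the monitor of the other group. The standalone sketch below is not Kafka code; it just reproduces that cross-locking pattern with two plain monitors so the cycle reported above is easier to see (thread names, lock names and the mapping to the two GroupMetadata addresses are illustrative only).

{code}
import java.util.concurrent.CountDownLatch;

// Illustrative only -- not Kafka code. Two threads acquire the same pair of
// monitors in opposite order, mirroring kafka-request-handler-1/-2 in the dump.
public class GroupMetadataDeadlockSketch {
    static final Object groupA = new Object();   // stands in for GroupMetadata <0x...a1a0>
    static final Object groupB = new Object();   // stands in for GroupMetadata <0x...45f8>
    static final CountDownLatch bothLocked = new CountDownLatch(2);

    public static void main(String[] args) throws InterruptedException {
        Thread handler1 = new Thread(() -> {
            synchronized (groupB) {              // "doCommitOffsets" locks its own group first
                bothLocked.countDown();
                awaitOtherThread();
                synchronized (groupA) {          // then tries to complete a delayed produce for the other group
                    System.out.println("handler1 never gets here");
                }
            }
        }, "kafka-request-handler-1");

        Thread handler2 = new Thread(() -> {
            synchronized (groupA) {
                bothLocked.countDown();
                awaitOtherThread();
                synchronized (groupB) {
                    System.out.println("handler2 never gets here");
                }
            }
        }, "kafka-request-handler-2");

        handler1.start();
        handler2.start();
        handler1.join();                         // hangs forever once both monitors are held
    }

    private static void awaitOtherThread() {
        try {
            bothLocked.await();                  // ensure both monitors are held before crossing over
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
{code}

Running this and taking a jstack of the process produces the same kind of "Found one Java-level deadlock" report as the dump above, which is why only a kill -9 gets the broker out of this state.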