Hi,

I encountered a strange issue in our Kafka cluster, where a single broker randomly entered a state in which it seemed to think it was the only broker in the cluster (it shrank the ISR of every partition it hosts down to just itself). Some details about the Kafka cluster:
- running in an EC2 VPC on AWS
- 3 nodes (d2.xlarge)
- Kafka version: 0.10.1.0

More information about the incident:

Around 19:57 yesterday, one of the nodes somehow lost its connection to the cluster and started reporting messages like this for what seemed to be all of its hosted topic partitions:

  [2016-11-28 19:57:05,426] INFO Partition [arches_stage,0] on broker 1002: Shrinking ISR for partition [arches_stage,0] from 1003,1002,1001 to 1002 (kafka.cluster.Partition)
  [2016-11-28 19:57:05,466] INFO Partition [connect-offsets,13] on broker 1002: Shrinking ISR for partition [connect-offsets,13] from 1003,1002,1001 to 1002 (kafka.cluster.Partition)
  [2016-11-28 19:57:05,489] INFO Partition [lasagna_prod_memstore,2] on broker 1002: Shrinking ISR for partition [lasagna_prod_memstore,2] from 1003,1002,1001 to 1002 (kafka.cluster.Partition)
  ...

It then added the other brokers back into the ISRs:

  [2016-11-28 19:57:18,013] INFO Partition [arches_stage,0] on broker 1002: Expanding ISR for partition [arches_stage,0] from 1002 to 1002,1003 (kafka.cluster.Partition)
  [2016-11-28 19:57:18,015] INFO Partition [connect-offsets,13] on broker 1002: Expanding ISR for partition [connect-offsets,13] from 1002 to 1002,1003 (kafka.cluster.Partition)
  [2016-11-28 19:57:18,018] INFO Partition [lasagna_prod_memstore,2] on broker 1002: Expanding ISR for partition [lasagna_prod_memstore,2] from 1002 to 1002,1003 (kafka.cluster.Partition)
  ...
  [2016-11-28 19:57:18,222] INFO Partition [arches_stage,0] on broker 1002: Expanding ISR for partition [arches_stage,0] from 1002,1003 to 1002,1003,1001 (kafka.cluster.Partition)
  [2016-11-28 19:57:18,224] INFO Partition [connect-offsets,13] on broker 1002: Expanding ISR for partition [connect-offsets,13] from 1002,1003 to 1002,1003,1001 (kafka.cluster.Partition)
  [2016-11-28 19:57:18,227] INFO Partition [lasagna_prod_memstore,2] on broker 1002: Expanding ISR for partition [lasagna_prod_memstore,2] from 1002,1003 to 1002,1003,1001 (kafka.cluster.Partition)

and eventually removed them again before going on its merry way:

  [2016-11-28 19:58:05,408] INFO Partition [arches_stage,0] on broker 1002: Shrinking ISR for partition [arches_stage,0] from 1002,1003,1001 to 1002 (kafka.cluster.Partition)
  [2016-11-28 19:58:05,415] INFO Partition [connect-offsets,13] on broker 1002: Shrinking ISR for partition [connect-offsets,13] from 1002,1003,1001 to 1002 (kafka.cluster.Partition)
  [2016-11-28 19:58:05,416] INFO Partition [lasagna_prod_memstore,2] on broker 1002: Shrinking ISR for partition [lasagna_prod_memstore,2] from 1002,1003,1001 to 1002 (kafka.cluster.Partition)

Node 1002 continued running normally from that point on, aside from the fact that all of its partitions were under-replicated; there were also no WARN or ERROR messages on it before or after this.
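For reference, this is roughly how the under-replicated partitions can be listed with the CLI tooling that ships with the broker (the ZooKeeper address below is just a placeholder for our actual connect string):

  # list every partition whose ISR is currently smaller than its full replica set
  bin/kafka-topics.sh --zookeeper zk1.example.com:2181 --describe --under-replicated-partitions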
The other two nodes were not so happy, however, with both failing to connect to the node in question via the ReplicaFetcherThread. They reported this around the same time as the ISR shrink:

  [2016-11-28 19:57:16,087] WARN [ReplicaFetcherThread-0-1002], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@6eb44718 (kafka.server.ReplicaFetcherThread)
  java.io.IOException: Connection to 1002 was disconnected before the response was read
      at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:115)
      at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:112)
      at scala.Option.foreach(Option.scala:257)
      at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:112)
      at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:108)
      at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:137)
      at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
      at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:108)
      at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:253)
      at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:238)
      at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
      at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

and then got stuck retrying this every 30 seconds until I restarted node 1002:

  [2016-11-28 20:02:04,513] WARN [ReplicaFetcherThread-0-1002], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@1cd61a02 (kafka.server.ReplicaFetcherThread)
  java.net.SocketTimeoutException: Failed to connect within 30000 ms
      at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:249)
      at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:238)
      at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
      at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

I restarted node 1002 when I noticed this; however, because the replicas were out of sync, we ended up with an unclean leader election and ultimately lost data for the partitions on that machine.
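On the data loss specifically: one mitigation I'm considering, independent of whatever the root cause turns out to be, is disabling unclean leader elections on the brokers, since as far as I know 0.10.1.0 still allows them by default. Roughly, in server.properties on each broker:

  # Never elect an out-of-sync replica as leader; trades availability for durability.
  unclean.leader.election.enable=false

My understanding is that this would have left those partitions offline until 1002 came back rather than losing the data, so I'd welcome opinions on whether that trade-off makes sense here.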
Some potentially interesting things about the cluster state at that time:

- I *was* able to telnet to port 9092 on the machine that went rogue from each of the other two Kafka brokers, even while they were reporting that they could not connect.
- The number of open file descriptors on that machine started increasing linearly for the entire ~1.5 hours the cluster was in this state, eventually ending up at roughly 4x the usual count, and went back to normal after the restart.
- The heap size on the node in question started fluctuating very rapidly. The usual behavior is that the heap slowly grows over a period of ~10 hours, then (I assume) a large GC occurs and the cycle starts over; on this node the period of that cycle dropped to ~5 minutes.
- The heap size also spiked way higher than normal.
- While the node was in this state, the System/Process CPU dropped to roughly 1/8th of its usual level.

I have the full logs and more metrics collected for all 3 nodes for that time period and would be happy to share them, but I wasn't sure whether the user list supports attachments.

Any help would be greatly appreciated.

Thanks,

Tom DeVoe
Software Engineer, Data
Dataminr
6 East 32nd Street, 2nd Floor, New York, NY 10016