Hi,

I encountered a strange issue in our kafka cluster, where randomly a single
broker entered a state where it seemed to think it was the only broker in
the cluster (it shrank all of its ISRs to just existing on itself). Some
details about the kafka cluster:

- running in an EC2 VPC on AWS
- 3 nodes (d2.xlarge)
- Kafka version : 0.10.1.0

More information about the incident:

Around 19:57 yesterday, one of the nodes somehow lost its connection to the
cluster and started reporting messages like this for what seemed to be all
of its hosted topic partitions:

[2016-11-28 19:57:05,426] INFO Partition [arches_stage,0] on broker 1002:
> Shrinking ISR for partition [arches_stage,0] from 1003,1002,1001 to 1002
> (kafka.cluster.Partition)
> [2016-11-28 19:57:05,466] INFO Partition [connect-offsets,13] on broker
> 1002: Shrinking ISR for partition [connect-offsets,13] from 1003,1002,1001
> to 1002 (kafka.cluster.Partition)
> [2016-11-28 19:57:05,489] INFO Partition [lasagna_prod_memstore,2] on
> broker 1002: Shrinking ISR for partition [lasagna_prod_memstore,2] from
> 1003,1002,1001 to 1002 (kafka.cluster.Partition)
> ...
>

It then added the ISRs from the other machines back in:

[2016-11-28 19:57:18,013] INFO Partition [arches_stage,0] on broker 1002:
> Expanding ISR for partition [arches_stage,0] from 1002 to 1002,1003
> (kafka.cluster.Partition)
> [2016-11-28 19:57:18,015] INFO Partition [connect-offsets,13] on broker
> 1002: Expanding ISR for partition [connect-offsets,13] from 1002 to
> 1002,1003 (kafka.cluster.Partition)
> [2016-11-28 19:57:18,018] INFO Partition [lasagna_prod_memstore,2] on
> broker 1002: Expanding ISR for partition [lasagna_prod_memstore,2] from
> 1002 to 1002,1003 (kafka.cluster.Partition)
> ...
> [2016-11-28 19:57:18,222] INFO Partition [arches_stage,0] on broker 1002:
> Expanding ISR for partition [arches_stage,0] from 1002,1003 to
> 1002,1003,1001 (kafka.cluster.Partition)
> [2016-11-28 19:57:18,224] INFO Partition [connect-offsets,13] on broker
> 1002: Expanding ISR for partition [connect-offsets,13] from 1002,1003 to
> 1002,1003,1001 (kafka.cluster.Partition)
> [2016-11-28 19:57:18,227] INFO Partition [lasagna_prod_memstore,2] on
> broker 1002: Expanding ISR for partition [lasagna_prod_memstore,2] from
> 1002,1003 to 1002,1003,1001 (kafka.cluster.Partition)


and eventually removed them again before going on its merry way:

[2016-11-28 19:58:05,408] INFO Partition [arches_stage,0] on broker 1002:
> Shrinking ISR for partition [arches_stage,0] from 1002,1003,1001 to 1002
> (kafka.cluster.Partition)
> [2016-11-28 19:58:05,415] INFO Partition [connect-offsets,13] on broker
> 1002: Shrinking ISR for partition [connect-offsets,13] from 1002,1003,1001
> to 1002 (kafka.cluster.Partition)
> [2016-11-28 19:58:05,416] INFO Partition [lasagna_prod_memstore,2] on
> broker 1002: Shrinking ISR for partition [lasagna_prod_memstore,2] from
> 1002,1003,1001 to 1002 (kafka.cluster.Partition)


Node 1002 continued running from that point on normally (outside of the
fact that all of it's partitions were under replicated). Also there were no
WARN/ERROR before/after this.


The other two nodes were not so happy however, with both failing to connect
to via the ReplicaFetcherThread to the node in question. The reported this
around the same time as that error:

[2016-11-28 19:57:16,087] WARN [ReplicaFetcherThread-0-1002], Error in
> fetch kafka.server.ReplicaFetcherThread$FetchRequest@6eb44718
> (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 1002 was disconnected before the
> response was read
>         at
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:115)
>         at
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:112)
>         at scala.Option.foreach(Option.scala:257)
>         at
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:112)
>         at
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:108)
>         at
> kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:137)
>         at
> kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
>         at
> kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:108)
>         at
> kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:253)
>         at
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:238)
>         at
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
>         at
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
>         at
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)


and then got stuck trying this every 30 seconds until I restarted node 1002:

[2016-11-28 20:02:04,513] WARN [ReplicaFetcherThread-0-1002], Error in
> fetch kafka.server.ReplicaFetcherThread$FetchRequest@1cd61a02
> (kafka.server.ReplicaFetcherThread)
> java.net.SocketTimeoutException: Failed to connect within 30000 ms
>         at
> kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:249)
>         at
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:238)
>         at
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
>         at
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
>         at
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)


I restarted the node when I noticed this, however because the replicas were
out of sync, we ended up having an unclean leader election and ultimately
losing data for the partitions on that machine. Some potentially
interesting things about the cluster state at that time:

- I *was* able to telnet to port 9092 on the machine that went rogue from
each of the other two kafka brokers (even while they were reporting failed
to connect)
- The number of open file descriptors on that machine started increased
linearly for the entire 1.5 hours the cluster was in this state, eventually
ending up at ~4x the usual open file descriptors. The number of open file
descriptors went back to normal after the restart.
- The heap size on the node in question started fluctuating very rapidly.
The usual behavior is the heap size slowly grows over a period of ~10 hours
and then I assume a large GC occurs and it starts this again. The node that
had this issue had the period of that behavior drop to ~5 mins.
- The heap size spiked to a size way higher than normal
- While the node was in this state the System/Process CPU dropped to ~1/8th
of its usual level.

I have the full logs and more metrics collected for all 3 nodes for that
time period and would be happy to share them, but I wasn't sure if the user
list supported attachments.

Any help would be greatly appreciated.

Thanks,


<http://dataminr.com/>


*Tom DeVoe*
Sofware Engineer, Data

6 East 32nd Street, 2nd Floor
New York, NY 10016



Dataminr is a Twitter Official Partner.
Dataminr in the news: The Economist
<http://www.economist.com/news/business/21705369-alternative-data-firms-are-shedding-new-light-corporate-performance-watchers>
 | International Business Times
<http://www.ibtimes.co.uk/dataminr-solves-twitters-needle-haystack-problem-hedge-funds-banks-1576692>
 | Equities.com
<https://www.equities.com/news/from-novelty-to-utility-how-dataminr-and-the-alternative-data-industry-is-becoming-mainstream>
 | SIA
<http://newsmanager.commpartners.com/sianews2/issues/2016-08-19/11.html>

Reply via email to