Hi all,

My team has observed that if a broker process is killed in the middle of
the controlled shutdown procedure, the remaining brokers start spewing
errors and do not properly rebalance leadership. The cluster cannot recover
without major manual intervention.

Here is how to reproduce the problem:
1. Create a Kafka 0.8.1.1 cluster with three brokers. (Let's call them A,
B, and C.) Set controlled.shutdown.enable=true.
2. Create a topic with a replication factor of 3 and a large number of
partitions (say 100).
3. Send a TERM signal to broker A. This initiates controlled shutdown.
4. Before controlled shutdown completes, quickly send a KILL signal to
broker A.
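If it helps anyone reproduce this, steps 3 and 4 can be scripted roughly as follows. This is a minimal sketch, not the exact tooling we used. It assumes you already know broker A's PID and that the broker handles TERM by starting controlled shutdown, as described in step 3. The function name and the default delay are hypothetical. The delay has to be shorter than the time controlled shutdown needs for all of A's partitions.

```python
import os
import signal
import time


def interrupt_controlled_shutdown(pid: int, delay_s: float = 0.5) -> bool:
    """Send TERM to a broker, then KILL it before controlled shutdown finishes.

    Returns True if KILL was sent, False if the process was already gone.
    """
    # Step 3: TERM starts controlled shutdown (controlled.shutdown.enable=true).
    os.kill(pid, signal.SIGTERM)
    # Hypothetical delay: must be shorter than the whole controlled shutdown.
    time.sleep(delay_s)
    try:
        # Step 4: KILL cannot be caught, so shutdown stops wherever it is.
        os.kill(pid, signal.SIGKILL)
        return True
    except ProcessLookupError:
        # The broker exited before the delay elapsed; nothing was interrupted.
        return False
```

With 100 partitions, a delay well under a second was usually enough for us to land in the middle of the procedure, but the right value will depend on the cluster.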

Result:
- Brokers B and C start logging ReplicaFetcherThread connection errors
every few milliseconds. (See below for an example.)
- Broker A is still listed as the leader and as an ISR member for any
partitions whose leadership was not transferred during controlled shutdown.
This causes connection errors when clients try to produce to or consume
from these partitions.
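To list the partitions that are stuck this way, you can compare each partition's recorded leader and ISR against the set of live brokers. A minimal sketch, assuming the 0.8.x ZooKeeper layout in which each partition's state is stored as JSON under `/brokers/topics/<topic>/partitions/<n>/state` and live brokers register under `/brokers/ids`. The ZooKeeper client is left out, and the function name and input shapes are hypothetical.

```python
import json
from typing import Dict, Iterable, List, Tuple


def find_stale_partitions(
    states: Dict[Tuple[str, int], str], live_brokers: Iterable[int]
) -> List[Tuple[str, int, int, List[int]]]:
    """Return (topic, partition, leader, dead ISR members) for each partition
    whose leader or ISR references a broker that is not registered as live."""
    live = set(live_brokers)
    stale = []
    for (topic, partition), raw in sorted(states.items()):
        state = json.loads(raw)  # e.g. {"leader": 1, "isr": [1, 2], ...}
        leader = state["leader"]
        dead_isr = [b for b in state["isr"] if b not in live]
        # A leader of -1 (no leader) is also not live, so it is reported too.
        if leader not in live or dead_isr:
            stale.append((topic, partition, leader, dead_isr))
    return stale
```

In our case every partition that kept broker A as leader or ISR member would show up here until the manual restarts described below.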

This scenario is difficult to recover from. The only ways we have found are
to restart broker A multiple times (if it still exists) or to kill both B
and C and then start them one by one. Without this kind of intervention,
the above issues persist indefinitely.

This may sound like a contrived scenario, but it's exactly what we have
seen when a Kafka EC2 instance gets terminated by AWS. So this seems like a
real liability.

Are there any existing JIRA tickets which cover this behavior? And do you
have any recommendations for avoiding it, other than forsaking controlled
shutdowns entirely?

Thanks,
Solon

Error example:
[2014-11-06 17:10:21,459] ERROR [ReplicaFetcherThread-0-1978259225], Error
in fetch Name: FetchRequest; Version: 0; CorrelationId: 3500; ClientId:
ReplicaFetcherThread-0-1978259225; ReplicaId: 1359390395; MaxWait: 500 ms;
MinBytes: 1 bytes; RequestInfo: [my-topic,42] ->
PartitionFetchInfo(503,10485760),[my-topic,63] ->
PartitionFetchInfo(386,10485760),[my-topic,99] ->
PartitionFetchInfo(525,10485760),[my-topic,84] ->
PartitionFetchInfo(436,10485760),[my-topic,48] ->
PartitionFetchInfo(484,10485760),[my-topic,75] ->
PartitionFetchInfo(506,10485760),[my-topic,45] ->
PartitionFetchInfo(473,10485760),[my-topic,66] ->
PartitionFetchInfo(532,10485760),[my-topic,30] ->
PartitionFetchInfo(435,10485760),[my-topic,96] ->
PartitionFetchInfo(517,10485760),[my-topic,27] ->
PartitionFetchInfo(470,10485760),[my-topic,36] ->
PartitionFetchInfo(472,10485760),[my-topic,9] ->
PartitionFetchInfo(514,10485760),[my-topic,33] ->
PartitionFetchInfo(582,10485760),[my-topic,69] ->
PartitionFetchInfo(504,10485760),[my-topic,57] ->
PartitionFetchInfo(444,10485760),[my-topic,78] ->
PartitionFetchInfo(559,10485760),[my-topic,12] ->
PartitionFetchInfo(417,10485760),[my-topic,90] ->
PartitionFetchInfo(429,10485760),[my-topic,18] ->
PartitionFetchInfo(497,10485760),[my-topic,0] ->
PartitionFetchInfo(402,10485760),[my-topic,6] ->
PartitionFetchInfo(527,10485760),[my-topic,54] ->
PartitionFetchInfo(524,10485760),[my-topic,15] ->
PartitionFetchInfo(448,10485760),[console,0] ->
PartitionFetchInfo(4,10485760) (kafka.server.ReplicaFetcherThread)
java.net.ConnectException: Connection refused
        at sun.nio.ch.Net.connect0(Native Method)
        at sun.nio.ch.Net.connect(Net.java:465)
        at sun.nio.ch.Net.connect(Net.java:457)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
        at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2014-11-06 17:10:21,462] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)
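The fetch request in that error lists every partition the follower is still trying to replicate from broker A. To pull that list out of the logs, a small parser along these lines should work. It is a minimal sketch that assumes the `[topic,partition] -> PartitionFetchInfo(offset,fetchSize)` format shown above; the function name is hypothetical.

```python
import re
from typing import List, Tuple

# Matches entries such as "[my-topic,42] -> PartitionFetchInfo(503,10485760)",
# allowing the line break that log wrapping may insert after "->".
_FETCH_ENTRY = re.compile(
    r"\[([^,\]\s]+),(\d+)\] ->\s*PartitionFetchInfo\((\d+),(\d+)\)"
)


def fetched_partitions(log_text: str) -> List[Tuple[str, int, int]]:
    """Return (topic, partition, fetch offset) for each entry of a logged FetchRequest."""
    return [
        (topic, int(partition), int(offset))
        for topic, partition, offset, _fetch_size in _FETCH_ENTRY.findall(log_text)
    ]
```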

We also see these errors repeatedly in the controller log:
[2014-11-06 21:37:50,945] ERROR
[Controller-1359390395-to-broker-1978259225-send-thread], Controller
1359390395 epoch 6 failed to send StopReplica request with correlation id
118 to broker id:1978259225,host:ip-10-164-59-90.ec2.internal,port:9092.
Reconnecting to broker. (kafka.controller.RequestSendThread)
java.nio.channels.ClosedChannelException
  at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
  at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
  at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2014-11-06 21:37:50,947] ERROR
[Controller-1359390395-to-broker-1978259225-send-thread], Controller
1359390395's connection to broker
id:1978259225,host:ip-10-164-59-90.ec2.internal,port:9092 was unsuccessful
(kafka.controller.RequestSendThread)
java.net.ConnectException: Connection refused
  at sun.nio.ch.Net.connect0(Native Method)
  at sun.nio.ch.Net.connect(Net.java:465)
  at sun.nio.ch.Net.connect(Net.java:457)
  at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
  at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
  at kafka.controller.RequestSendThread.connectToBroker(ControllerChannelManager.scala:173)
  at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:140)
  at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
