Hi all,

My team has observed that if a broker process is killed in the middle of the controlled shutdown procedure, the remaining brokers start spewing errors and do not properly rebalance leadership. The cluster cannot recover without major manual intervention.

Here is how to reproduce the problem:

1. Create a Kafka 0.8.1.1 cluster with three brokers. (Let's call them A, B, and C.) Set controlled.shutdown.enable=true.
2. Create a topic with replication_factor = 3 and a large number of partitions (say 100).
3. Send a TERM signal to broker A. This initiates controlled shutdown.
4. Before controlled shutdown completes, quickly send a KILL signal to broker A.

Result:
- Brokers B and C start logging ReplicaFetcherThread connection errors every few milliseconds. (See below for an example.)
- Broker A is still listed as a leader and ISR for any partitions which were not transferred during controlled shutdown. This causes connection errors when clients try to produce to or consume from these partitions.

This scenario is difficult to recover from. The only ways we have found are to restart broker A multiple times (if it still exists) or to kill both B and C and then start them one by one. Without this kind of intervention, the above issues persist indefinitely.

This may sound like a contrived scenario, but it's exactly what we have seen when a Kafka EC2 instance gets terminated by AWS. So this seems like a real liability.

Are there any existing JIRA tickets which cover this behavior? And do you have any recommendations for avoiding it, other than forsaking controlled shutdowns entirely?
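For anyone who wants to repeat steps 3 and 4 with consistent timing, here is a rough Python sketch of the TERM-then-KILL sequence. The function name and the delay parameter are made up for illustration; the right delay depends on how long controlled shutdown takes on your cluster, and finding broker A's PID is left to you.

```python
import os
import signal
import time


def term_then_kill(pid, delay_seconds=1.0):
    """Send SIGTERM to a process, wait briefly, then send SIGKILL.

    Hypothetical helper: on a Kafka broker with
    controlled.shutdown.enable=true, the SIGTERM should start controlled
    shutdown (step 3) and the SIGKILL interrupts it (step 4), provided
    delay_seconds is shorter than the shutdown itself.
    """
    os.kill(pid, signal.SIGTERM)   # step 3: request a graceful shutdown
    time.sleep(delay_seconds)      # give the shutdown time to start, not finish
    os.kill(pid, signal.SIGKILL)   # step 4: hard kill mid-shutdown
```

Calling term_then_kill(broker_a_pid, 0.5) on the host running broker A would be one way to drive the reproduction. With a large partition count the window between the two signals is easier to hit.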
Thanks,
Solon

Error example:

[2014-11-06 17:10:21,459] ERROR [ReplicaFetcherThread-0-1978259225], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 3500; ClientId: ReplicaFetcherThread-0-1978259225; ReplicaId: 1359390395; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [my-topic,42] -> PartitionFetchInfo(503,10485760),[my-topic,63] -> PartitionFetchInfo(386,10485760),[my-topic,99] -> PartitionFetchInfo(525,10485760),[my-topic,84] -> PartitionFetchInfo(436,10485760),[my-topic,48] -> PartitionFetchInfo(484,10485760),[my-topic,75] -> PartitionFetchInfo(506,10485760),[my-topic,45] -> PartitionFetchInfo(473,10485760),[my-topic,66] -> PartitionFetchInfo(532,10485760),[my-topic,30] -> PartitionFetchInfo(435,10485760),[my-topic,96] -> PartitionFetchInfo(517,10485760),[my-topic,27] -> PartitionFetchInfo(470,10485760),[my-topic,36] -> PartitionFetchInfo(472,10485760),[my-topic,9] -> PartitionFetchInfo(514,10485760),[my-topic,33] -> PartitionFetchInfo(582,10485760),[my-topic,69] -> PartitionFetchInfo(504,10485760),[my-topic,57] -> PartitionFetchInfo(444,10485760),[my-topic,78] -> PartitionFetchInfo(559,10485760),[my-topic,12] -> PartitionFetchInfo(417,10485760),[my-topic,90] -> PartitionFetchInfo(429,10485760),[my-topic,18] -> PartitionFetchInfo(497,10485760),[my-topic,0] -> PartitionFetchInfo(402,10485760),[my-topic,6] -> PartitionFetchInfo(527,10485760),[my-topic,54] -> PartitionFetchInfo(524,10485760),[my-topic,15] -> PartitionFetchInfo(448,10485760),[console,0] -> PartitionFetchInfo(4,10485760) (kafka.server.ReplicaFetcherThread)
java.net.ConnectException: Connection refused
    at sun.nio.ch.Net.connect0(Native Method)
    at sun.nio.ch.Net.connect(Net.java:465)
    at sun.nio.ch.Net.connect(Net.java:457)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
    at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
    at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
    at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2014-11-06 17:10:21,462] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)

We also see these errors repeatedly in the controller log:

[2014-11-06 21:37:50,945] ERROR [Controller-1359390395-to-broker-1978259225-send-thread], Controller 1359390395 epoch 6 failed to send StopReplica request with correlation id 118 to broker id:1978259225,host:ip-10-164-59-90.ec2.internal,port:9092. Reconnecting to broker. (kafka.controller.RequestSendThread)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
    at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2014-11-06 21:37:50,947] ERROR [Controller-1359390395-to-broker-1978259225-send-thread], Controller 1359390395's connection to broker id:1978259225,host:ip-10-164-59-90.ec2.internal,port:9092 was unsuccessful (kafka.controller.RequestSendThread)
java.net.ConnectException: Connection refused
    at sun.nio.ch.Net.connect0(Native Method)
    at sun.nio.ch.Net.connect(Net.java:465)
    at sun.nio.ch.Net.connect(Net.java:457)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
    at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
    at kafka.controller.RequestSendThread.connectToBroker(ControllerChannelManager.scala:173)
    at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:140)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
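
P.S. To check which partitions still list the dead broker as leader or ISR member (the second symptom in the results above), a small parser along these lines may help. It is only a sketch: it assumes you have captured the text output of kafka-topics.sh --describe, and that each partition line looks like "Topic: my-topic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3". The function name is hypothetical, and the regex would need adjusting if your output is laid out differently.

```python
import re

# Assumed layout of one partition line in `kafka-topics.sh --describe` output.
PARTITION_LINE = re.compile(
    r"Topic:\s*(\S+)\s+Partition:\s*(\d+)\s+Leader:\s*(-?\d+)"
    r"\s+Replicas:\s*(\S+)\s+Isr:\s*(\S*)"
)


def partitions_referencing(describe_output, broker_id):
    """Return (topic, partition, is_leader, in_isr) for every partition
    whose leader or ISR still includes broker_id."""
    stale = []
    for line in describe_output.splitlines():
        match = PARTITION_LINE.search(line)
        if not match:
            continue  # skip the per-topic summary line and anything unrecognized
        topic, partition, leader, _replicas, isr = match.groups()
        isr_ids = [int(x) for x in isr.split(",") if x]
        is_leader = int(leader) == broker_id
        in_isr = broker_id in isr_ids
        if is_leader or in_isr:
            stale.append((topic, int(partition), is_leader, in_isr))
    return stale
```

Running it with broker A's id after the kill should return the partitions that were not transferred before the process died; an empty list would suggest leadership was moved cleanly.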