Thanks, Neha. I tried the same test with 0.8.2-beta and am happy to report I've been unable to reproduce the bad behavior. I'll follow up if this changes.
On Sun, Nov 9, 2014 at 9:30 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> We fixed a couple issues related to automatic leader balancing and
> controlled shutdown. Would you mind trying out 0.8.2-beta?
>
> On Fri, Nov 7, 2014 at 11:52 AM, Solon Gordon <so...@knewton.com> wrote:
>
> > We're using 0.8.1.1 with auto.leader.rebalance.enable=true.
> >
> > On Fri, Nov 7, 2014 at 2:35 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Solon,
> > >
> > > Which version of Kafka are you running and are you enabling auto leader
> > > rebalance at the same time?
> > >
> > > Guozhang
> > >
> > > On Fri, Nov 7, 2014 at 8:41 AM, Solon Gordon <so...@knewton.com> wrote:
> > >
> > > > Hi all,
> > > >
> > > > My team has observed that if a broker process is killed in the
> > > > middle of the controlled shutdown procedure, the remaining brokers
> > > > start spewing errors and do not properly rebalance leadership. The
> > > > cluster cannot recover without major manual intervention.
> > > >
> > > > Here is how to reproduce the problem:
> > > > 1. Create a Kafka 0.8.1.1 cluster with three brokers. (Let's call
> > > >    them A, B, and C.) Set controlled.shutdown.enable=true.
> > > > 2. Create a topic with replication_factor = 3 and a large number of
> > > >    partitions (say 100).
> > > > 3. Send a TERM signal to broker A. This initiates controlled
> > > >    shutdown.
> > > > 4. Before controlled shutdown completes, quickly send a KILL signal
> > > >    to broker A.
> > > >
> > > > Result:
> > > > - Brokers B and C start logging ReplicaFetcherThread connection
> > > >   errors every few milliseconds. (See below for an example.)
> > > > - Broker A is still listed as a leader and ISR for any partitions
> > > >   which were not transferred during controlled shutdown. This causes
> > > >   connection errors when clients try to produce to or consume from
> > > >   these partitions.
> > > >
> > > > This scenario is difficult to recover from. The only ways we have
> > > > found are to restart broker A multiple times (if it still exists) or
> > > > to kill both B and C and then start them one by one. Without this
> > > > kind of intervention, the above issues persist indefinitely.
> > > >
> > > > This may sound like a contrived scenario, but it's exactly what we
> > > > have seen when a Kafka EC2 instance gets terminated by AWS. So this
> > > > seems like a real liability.
> > > >
> > > > Are there any existing JIRA tickets which cover this behavior? And
> > > > do you have any recommendations for avoiding it, other than
> > > > forsaking controlled shutdowns entirely?
> > > >
> > > > Thanks,
> > > > Solon
> > > >
> > > > Error example:
> > > > [2014-11-06 17:10:21,459] ERROR [ReplicaFetcherThread-0-1978259225],
> > > > Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 3500;
> > > > ClientId: ReplicaFetcherThread-0-1978259225; ReplicaId: 1359390395;
> > > > MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [my-topic,42] ->
> > > > PartitionFetchInfo(503,10485760),[my-topic,63] ->
> > > > PartitionFetchInfo(386,10485760),[my-topic,99] ->
> > > > PartitionFetchInfo(525,10485760),[my-topic,84] ->
> > > > PartitionFetchInfo(436,10485760),[my-topic,48] ->
> > > > PartitionFetchInfo(484,10485760),[my-topic,75] ->
> > > > PartitionFetchInfo(506,10485760),[my-topic,45] ->
> > > > PartitionFetchInfo(473,10485760),[my-topic,66] ->
> > > > PartitionFetchInfo(532,10485760),[my-topic,30] ->
> > > > PartitionFetchInfo(435,10485760),[my-topic,96] ->
> > > > PartitionFetchInfo(517,10485760),[my-topic,27] ->
> > > > PartitionFetchInfo(470,10485760),[my-topic,36] ->
> > > > PartitionFetchInfo(472,10485760),[my-topic,9] ->
> > > > PartitionFetchInfo(514,10485760),[my-topic,33] ->
> > > > PartitionFetchInfo(582,10485760),[my-topic,69] ->
> > > > PartitionFetchInfo(504,10485760),[my-topic,57] ->
> > > > PartitionFetchInfo(444,10485760),[my-topic,78] ->
> > > > PartitionFetchInfo(559,10485760),[my-topic,12] ->
> > > > PartitionFetchInfo(417,10485760),[my-topic,90] ->
> > > > PartitionFetchInfo(429,10485760),[my-topic,18] ->
> > > > PartitionFetchInfo(497,10485760),[my-topic,0] ->
> > > > PartitionFetchInfo(402,10485760),[my-topic,6] ->
> > > > PartitionFetchInfo(527,10485760),[my-topic,54] ->
> > > > PartitionFetchInfo(524,10485760),[my-topic,15] ->
> > > > PartitionFetchInfo(448,10485760),[console,0] ->
> > > > PartitionFetchInfo(4,10485760) (kafka.server.ReplicaFetcherThread)
> > > > java.net.ConnectException: Connection refused
> > > >   at sun.nio.ch.Net.connect0(Native Method)
> > > >   at sun.nio.ch.Net.connect(Net.java:465)
> > > >   at sun.nio.ch.Net.connect(Net.java:457)
> > > >   at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
> > > >   at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > >   at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> > > >   at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> > > >   at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> > > >   at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > > >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
> > > >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> > > >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> > > >   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
> > > >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> > > >   at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> > > >   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > >   at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
> > > >   at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
> > > >   at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> > > >   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > > [2014-11-06 17:10:21,462] WARN Reconnect due to socket error: null
> > > > (kafka.consumer.SimpleConsumer)
> > > >
> > > > We also see these errors repeatedly in the controller log:
> > > > [2014-11-06 21:37:50,945] ERROR
> > > > [Controller-1359390395-to-broker-1978259225-send-thread], Controller
> > > > 1359390395 epoch 6 failed to send StopReplica request with
> > > > correlation id 118 to broker
> > > > id:1978259225,host:ip-10-164-59-90.ec2.internal,port:9092.
> > > > Reconnecting to broker. (kafka.controller.RequestSendThread)
> > > > java.nio.channels.ClosedChannelException
> > > >   at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
> > > >   at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
> > > >   at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
> > > >   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > > [2014-11-06 21:37:50,947] ERROR
> > > > [Controller-1359390395-to-broker-1978259225-send-thread], Controller
> > > > 1359390395's connection to broker
> > > > id:1978259225,host:ip-10-164-59-90.ec2.internal,port:9092 was
> > > > unsuccessful (kafka.controller.RequestSendThread)
> > > > java.net.ConnectException: Connection refused
> > > >   at sun.nio.ch.Net.connect0(Native Method)
> > > >   at sun.nio.ch.Net.connect(Net.java:465)
> > > >   at sun.nio.ch.Net.connect(Net.java:457)
> > > >   at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
> > > >   at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > >   at kafka.controller.RequestSendThread.connectToBroker(ControllerChannelManager.scala:173)
> > > >   at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:140)
> > > >   at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
> > > >   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > >
> > >
> > > --
> > > -- Guozhang
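
For anyone re-running the reproduction from the original report, steps 3 and 4 (TERM, then KILL before controlled shutdown finishes) can be sketched roughly as below. This is only an illustration under assumptions: it expects a POSIX host and a broker pid that is already known, and the function name term_then_kill and its delay_seconds parameter are hypothetical, not part of Kafka or its scripts.

```python
import os
import signal
import time


def term_then_kill(pid, delay_seconds=1.0):
    """Send SIGTERM to pid, wait delay_seconds, then send SIGKILL.

    Hypothetical helper: TERM stands in for the start of a controlled
    shutdown, and KILL for the abrupt termination that interrupts it.
    Returns True if the KILL signal was sent, or False if no process
    with that pid existed any more by the time KILL was attempted.
    """
    # Step 3 of the report: ask the process to shut down gracefully.
    os.kill(pid, signal.SIGTERM)

    # Leave the shutdown partially complete. The right delay is an
    # assumption and depends on how long controlled shutdown takes.
    time.sleep(delay_seconds)

    # Step 4 of the report: kill the process outright.
    try:
        os.kill(pid, signal.SIGKILL)
    except ProcessLookupError:
        return False
    return True
```

The delay has to be shorter than the time the broker needs to hand off all of its partitions, so with a topic of around 100 partitions a value well under the observed shutdown time is the thing to tune.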