Thanks, Neha. I tried the same test with 0.8.2-beta and am happy to report
I've been unable to reproduce the bad behavior. I'll follow up if this
changes.
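
For anyone repeating the test, steps 3 and 4 of the quoted reproduction
(TERM, then KILL before controlled shutdown completes) can be sketched
roughly as below. This is an illustrative sketch only: the function name
interrupt_controlled_shutdown and the delay_seconds parameter are
hypothetical, the broker PID has to be looked up separately, and a suitable
delay depends on how long controlled shutdown takes on a given cluster.

```python
import os
import signal
import time


def interrupt_controlled_shutdown(pid: int, delay_seconds: float = 0.5) -> None:
    """Send SIGTERM to a process, then SIGKILL it after a short delay.

    Hypothetical helper: pid is assumed to be the broker's JVM process ID,
    and delay_seconds is an assumed value that must be shorter than the
    time the graceful shutdown needs to finish.
    """
    os.kill(pid, signal.SIGTERM)  # step 3: start the graceful (controlled) shutdown
    time.sleep(delay_seconds)     # wait briefly so the shutdown is under way
    os.kill(pid, signal.SIGKILL)  # step 4: hard-kill the process mid-shutdown
```

The delay is a parameter rather than a constant because the window between
TERM and KILL is what determines how many partitions are left without a
transferred leader.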

On Sun, Nov 9, 2014 at 9:30 PM, Neha Narkhede <neha.narkh...@gmail.com>
wrote:

> We fixed a couple issues related to automatic leader balancing and
> controlled shutdown. Would you mind trying out 0.8.2-beta?
>
> On Fri, Nov 7, 2014 at 11:52 AM, Solon Gordon <so...@knewton.com> wrote:
>
> > We're using 0.8.1.1 with auto.leader.rebalance.enable=true.
> >
> > On Fri, Nov 7, 2014 at 2:35 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Solon,
> > >
> > > Which version of Kafka are you running and are you enabling auto leader
> > > rebalance at the same time?
> > >
> > > Guozhang
> > >
> > > On Fri, Nov 7, 2014 at 8:41 AM, Solon Gordon <so...@knewton.com> wrote:
> > >
> > > > Hi all,
> > > >
> > > > My team has observed that if a broker process is killed in the middle of
> > > > the controlled shutdown procedure, the remaining brokers start spewing
> > > > errors and do not properly rebalance leadership. The cluster cannot
> > > > recover without major manual intervention.
> > > >
> > > > Here is how to reproduce the problem:
> > > > 1. Create a Kafka 0.8.1.1 cluster with three brokers. (Let's call them A,
> > > > B, and C.) Set controlled.shutdown.enable=true.
> > > > 2. Create a topic with replication_factor = 3 and a large number of
> > > > partitions (say 100).
> > > > 3. Send a TERM signal to broker A. This initiates controlled shutdown.
> > > > 4. Before controlled shutdown completes, quickly send a KILL signal to
> > > > broker A.
> > > >
> > > > Result:
> > > > - Brokers B and C start logging ReplicaFetcherThread connection errors
> > > > every few milliseconds. (See below for an example.)
> > > > - Broker A is still listed as a leader and ISR for any partitions which
> > > > were not transferred during controlled shutdown. This causes connection
> > > > errors when clients try to produce to or consume from these partitions.
> > > >
> > > > This scenario is difficult to recover from. The only ways we have found
> > > > are to restart broker A multiple times (if it still exists) or to kill
> > > > both B and C and then start them one by one. Without this kind of
> > > > intervention, the above issues persist indefinitely.
> > > >
> > > > This may sound like a contrived scenario, but it's exactly what we have
> > > > seen when a Kafka EC2 instance gets terminated by AWS. So this seems
> > > > like a real liability.
> > > >
> > > > Are there any existing JIRA tickets which cover this behavior? And do
> > > > you have any recommendations for avoiding it, other than forsaking
> > > > controlled shutdowns entirely?
> > > >
> > > > Thanks,
> > > > Solon
> > > >
> > > > Error example:
> > > > [2014-11-06 17:10:21,459] ERROR [ReplicaFetcherThread-0-1978259225],
> > > > Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 3500;
> > > > ClientId: ReplicaFetcherThread-0-1978259225; ReplicaId: 1359390395;
> > > > MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [my-topic,42] ->
> > > > PartitionFetchInfo(503,10485760),[my-topic,63] ->
> > > > PartitionFetchInfo(386,10485760),[my-topic,99] ->
> > > > PartitionFetchInfo(525,10485760),[my-topic,84] ->
> > > > PartitionFetchInfo(436,10485760),[my-topic,48] ->
> > > > PartitionFetchInfo(484,10485760),[my-topic,75] ->
> > > > PartitionFetchInfo(506,10485760),[my-topic,45] ->
> > > > PartitionFetchInfo(473,10485760),[my-topic,66] ->
> > > > PartitionFetchInfo(532,10485760),[my-topic,30] ->
> > > > PartitionFetchInfo(435,10485760),[my-topic,96] ->
> > > > PartitionFetchInfo(517,10485760),[my-topic,27] ->
> > > > PartitionFetchInfo(470,10485760),[my-topic,36] ->
> > > > PartitionFetchInfo(472,10485760),[my-topic,9] ->
> > > > PartitionFetchInfo(514,10485760),[my-topic,33] ->
> > > > PartitionFetchInfo(582,10485760),[my-topic,69] ->
> > > > PartitionFetchInfo(504,10485760),[my-topic,57] ->
> > > > PartitionFetchInfo(444,10485760),[my-topic,78] ->
> > > > PartitionFetchInfo(559,10485760),[my-topic,12] ->
> > > > PartitionFetchInfo(417,10485760),[my-topic,90] ->
> > > > PartitionFetchInfo(429,10485760),[my-topic,18] ->
> > > > PartitionFetchInfo(497,10485760),[my-topic,0] ->
> > > > PartitionFetchInfo(402,10485760),[my-topic,6] ->
> > > > PartitionFetchInfo(527,10485760),[my-topic,54] ->
> > > > PartitionFetchInfo(524,10485760),[my-topic,15] ->
> > > > PartitionFetchInfo(448,10485760),[console,0] ->
> > > > PartitionFetchInfo(4,10485760) (kafka.server.ReplicaFetcherThread)
> > > > java.net.ConnectException: Connection refused
> > > >         at sun.nio.ch.Net.connect0(Native Method)
> > > >         at sun.nio.ch.Net.connect(Net.java:465)
> > > >         at sun.nio.ch.Net.connect(Net.java:457)
> > > >         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
> > > >         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > >         at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> > > >         at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> > > >         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> > > >         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > > >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
> > > >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> > > >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> > > >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
> > > >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> > > >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> > > >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > >         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
> > > >         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
> > > >         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> > > >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > > [2014-11-06 17:10:21,462] WARN Reconnect due to socket error: null
> > > > (kafka.consumer.SimpleConsumer)
> > > >
> > > > We also see these errors repeatedly in the controller log:
> > > > [2014-11-06 21:37:50,945] ERROR
> > > > [Controller-1359390395-to-broker-1978259225-send-thread], Controller
> > > > 1359390395 epoch 6 failed to send StopReplica request with correlation id
> > > > 118 to broker id:1978259225,host:ip-10-164-59-90.ec2.internal,port:9092.
> > > > Reconnecting to broker. (kafka.controller.RequestSendThread)
> > > > java.nio.channels.ClosedChannelException
> > > >   at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
> > > >   at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
> > > >   at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
> > > >   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > > [2014-11-06 21:37:50,947] ERROR
> > > > [Controller-1359390395-to-broker-1978259225-send-thread], Controller
> > > > 1359390395's connection to broker
> > > > id:1978259225,host:ip-10-164-59-90.ec2.internal,port:9092 was unsuccessful
> > > > (kafka.controller.RequestSendThread)
> > > > java.net.ConnectException: Connection refused
> > > >   at sun.nio.ch.Net.connect0(Native Method)
> > > >   at sun.nio.ch.Net.connect(Net.java:465)
> > > >   at sun.nio.ch.Net.connect(Net.java:457)
> > > >   at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
> > > >   at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > >   at kafka.controller.RequestSendThread.connectToBroker(ControllerChannelManager.scala:173)
> > > >   at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:140)
> > > >   at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
> > > >   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
