[ https://issues.apache.org/jira/browse/KAFKA-4358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Nelson Elhage updated KAFKA-4358:
---------------------------------
    Status: Open (was: Patch Available)

> Following a hung broker, newly elected leader is unnecessarily slow assuming leadership because of ReplicaFetcherThread
> -----------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4358
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4358
>             Project: Kafka
>          Issue Type: Bug
>          Components: replication
>    Affects Versions: 0.10.0.1
>            Reporter: Nelson Elhage
>            Priority: Minor
>
> When a broker handles a `LeaderAndIsr` request, the replica manager blocks
> waiting for idle replication fetcher threads to die before responding to the
> message and being able to service new produce requests.
> If requests to a broker start blackholing (e.g. due to network failure, or
> due to the broker hanging), shutting down the `ReplicaFetcherThread` can take
> a long time (around 30s in my testing), blocking recovery of any partitions
> previously led by that broker.
> This is a very similar issue to KAFKA-612.
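The blocking behaviour described above can be illustrated with a toy model. This is a minimal sketch and not Kafka's code: the `Fetcher` class, `measure_shutdown` helper, and the 0.5s timeout are all hypothetical stand-ins, with `time.sleep` playing the role of a network read that only returns when the request timeout expires. For contrast, the same thread is also run with a wait that wakes as soon as shutdown is requested.

```python
import threading
import time


class Fetcher(threading.Thread):
    """Toy stand-in for a fetcher thread talking to an unresponsive peer."""

    def __init__(self, request_timeout_s, interruptible):
        super().__init__(daemon=True)
        self.request_timeout_s = request_timeout_s
        self.interruptible = interruptible
        self.stop_requested = threading.Event()

    def run(self):
        while not self.stop_requested.is_set():
            if self.interruptible:
                # Returns early as soon as shutdown is requested.
                self.stop_requested.wait(self.request_timeout_s)
            else:
                # Simulates a blocking read that only returns on timeout.
                time.sleep(self.request_timeout_s)

    def shutdown(self):
        """Request a stop, wait for the thread to exit, return seconds waited."""
        start = time.monotonic()
        self.stop_requested.set()
        self.join()
        return time.monotonic() - start


def measure_shutdown(request_timeout_s, interruptible):
    """Start a fetcher, let it enter its simulated fetch, then time shutdown()."""
    fetcher = Fetcher(request_timeout_s, interruptible)
    fetcher.start()
    time.sleep(0.05)
    return fetcher.shutdown()


if __name__ == "__main__":
    print("blocking:      %.2fs" % measure_shutdown(0.5, interruptible=False))
    print("interruptible: %.2fs" % measure_shutdown(0.5, interruptible=True))
```

In the blocking variant, the caller of `shutdown()` is held for roughly the remainder of the simulated request timeout, which is the shape of the delay reported here. Whether the real thread can be woken this way is exactly the open question of this issue.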
> Instructions to reproduce/demonstrate:
> Stand up three brokers and create a replicated topic:
> {code}
> bin/zookeeper-server-start.sh config/zookeeper.properties &
> bin/kafka-server-start.sh config/server1.properties &
> bin/kafka-server-start.sh config/server2.properties &
> bin/kafka-server-start.sh config/server3.properties &
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic replicated.topic
> {code}
> Identify the leader, and (for simplicity in interpreting the event) make sure
> it's not the same as the cluster controller:
> {code}
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic replicated.topic
> {code}
> Start a stream of produce events (with a shortened timeout so we get faster
> visibility into when the cluster recovers):
> {code}
> echo request.timeout.ms=1000 >> config/producer.properties
> bin/kafka-verifiable-producer.sh --throughput 2 --topic replicated.topic --broker-list localhost:9092 --producer.config $(pwd)/config/producer.properties
> {code}
> Now SIGSTOP the leader (3, in my example):
> {code}
> kill -STOP $(pgrep -f server3.properties)
> {code}
> The producer will log errors for about 30 seconds, and then recover.
> However, if we read the logs, we'll see (excerpting key log lines from
> `state-change.log`):
> {code}
> [2016-10-28 20:36:03,128] TRACE Controller 2 epoch 8 sending become-leader LeaderAndIsr request (Leader:2,ISR:2,1,LeaderEpoch:22,ControllerEpoch:8) to broker 2 for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:36:03,131] TRACE Broker 2 handling LeaderAndIsr request correlationId 2 from controller 2 epoch 8 starting the become-leader transition for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:17,741] TRACE Controller 1 epoch 11 changed partition [replicated.topic,0] from OnlinePartition to OnlinePartition with leader 3 (state.change.logger)
> [2016-10-28 20:48:33,012] TRACE Controller 1 epoch 11 changed partition [replicated.topic,0] state from OnlinePartition to OfflinePartition (state.change.logger)
> [2016-10-28 20:48:33,012] TRACE Controller 1 epoch 11 started leader election for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,016] TRACE Controller 1 epoch 11 elected leader 2 for Offline partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,017] TRACE Controller 1 epoch 11 changed partition [replicated.topic,0] from OfflinePartition to OnlinePartition with leader 2 (state.change.logger)
> [2016-10-28 20:48:33,017] TRACE Controller 1 epoch 11 sending become-leader LeaderAndIsr request (Leader:2,ISR:1,2,LeaderEpoch:30,ControllerEpoch:11) to broker 2 for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,023] TRACE Broker 2 received LeaderAndIsr request PartitionState(controllerEpoch=11, leader=2, leaderEpoch=30, isr=[1, 2], zkVersion=46, replicas=[1, 2, 3]) correlation id 18 from controller 1 epoch 11 for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,024] TRACE Broker 2 handling LeaderAndIsr request correlationId 18 from controller 1 epoch 11 starting the become-leader transition for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,026] TRACE Broker 2 stopped fetchers as part of become-leader request from controller 1 epoch 11 with correlation id 18 for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,026] TRACE Broker 2 completed LeaderAndIsr request correlationId 18 from controller 1 epoch 11 for the become-leader transition for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:56,058] TRACE Controller 1 epoch 11 received response {error_code=0,partitions=[{topic=replicated.topic,partition=0,error_code=0}]} for a request sent to broker qa-dev1.northwest.stripe.io:9093 (id: 2 rack: null) (state.change.logger)
> {code}
> Note the ~23s pause between broker 2 logging completion of the LeaderAndIsr
> request, and the controller logging receipt. If we look at broker 2
> specifically now, we find:
> {code}
> [2016-10-28 20:48:33,025] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions replicated.topic-0 (kafka.server.ReplicaFetcherManager)
> [2016-10-28 20:48:33,026] INFO [ReplicaFetcherThread-0-3], Shutting down (kafka.server.ReplicaFetcherThread)
> [2016-10-28 20:48:56,055] INFO [ReplicaFetcherThread-0-3], Stopped (kafka.server.ReplicaFetcherThread)
> [2016-10-28 20:48:56,055] INFO [ReplicaFetcherThread-0-3], Shutdown completed (kafka.server.ReplicaFetcherThread)
> {code}
> These timestamps exactly match the "missing" seconds. They also align with the
> first successful publish from my publish job, which happened at `2016-10-28
> 20:48:57.555`.
> Aggressively dropping `request.timeout.ms` speeds up this failover process,
> but it seems that we should be able to recover those ~25s of unavailability
> without having to drop any additional timeouts, by either interrupting that
> thread somehow or not having to block on its shutdown.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
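The gaps quoted in the report can be checked with a little timestamp arithmetic. The sketch below only uses timestamps copied from the log excerpts; the helper names are illustrative, and measuring the outage from the start of leader election to the first successful publish is an assumption about where to start the clock.

```python
from datetime import datetime


def parse_ts(text):
    """Parse a timestamp that uses either ',' or '.' before the milliseconds."""
    return datetime.strptime(text.replace(",", "."), "%Y-%m-%d %H:%M:%S.%f")


def gap_seconds(start, end):
    """Seconds elapsed between two timestamps."""
    return (parse_ts(end) - parse_ts(start)).total_seconds()


# Timestamps copied from the log excerpts in the report.
request_completed = "2016-10-28 20:48:33,026"  # broker 2 completed LeaderAndIsr
response_received = "2016-10-28 20:48:56,058"  # controller received the response
fetcher_stopping = "2016-10-28 20:48:33,026"   # ReplicaFetcherThread-0-3 "Shutting down"
fetcher_stopped = "2016-10-28 20:48:56,055"    # ReplicaFetcherThread-0-3 "Shutdown completed"
election_started = "2016-10-28 20:48:33,012"   # controller started leader election
first_publish = "2016-10-28 20:48:57.555"      # first successful publish

controller_gap = gap_seconds(request_completed, response_received)
fetcher_gap = gap_seconds(fetcher_stopping, fetcher_stopped)
outage = gap_seconds(election_started, first_publish)

if __name__ == "__main__":
    print("controller wait:  %.3fs" % controller_gap)
    print("fetcher shutdown: %.3fs" % fetcher_gap)
    print("election to first publish: %.3fs" % outage)
```

The controller's wait and the fetcher thread's shutdown agree to within a few milliseconds, which supports reading the fetcher shutdown as the cause of the delay.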