[ https://issues.apache.org/jira/browse/KAFKA-4358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nelson Elhage updated KAFKA-4358:
---------------------------------
    Status: Open  (was: Patch Available)

> Following a hung broker, newly elected leader is unnecessarily slow assuming 
> leadership because of ReplicaFetcherThread
> -----------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4358
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4358
>             Project: Kafka
>          Issue Type: Bug
>          Components: replication
>    Affects Versions: 0.10.0.1
>            Reporter: Nelson Elhage
>            Priority: Minor
>
> When a broker handles a `LeaderAndIsr` request, the replica manager blocks 
> waiting for idle replica fetcher threads to shut down before it responds to 
> the request and can service new produce requests.
> If requests to a broker start being blackholed (e.g. due to a network failure, 
> or because the broker is hung), shutting down the `ReplicaFetcherThread` can 
> take a long time (around 30s in my testing), blocking recovery of any 
> partitions previously led by that broker.
> This is a very similar issue to KAFKA-612.
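> To see the blocking relationship directly, a thread dump of the new leader 
> taken during the stall should show a request-handler thread waiting on the 
> fetcher shutdown while the `ReplicaFetcherThread` sits in a network poll. A 
> rough sketch (the pgrep pattern and thread-name filters are just what match 
> my setup below):
> {code}
> # Take a thread dump of the new leader (broker 2 in the repro below) while the
> # LeaderAndIsr request is stalled, and keep the two threads of interest.
> jstack $(pgrep -f server2.properties) | grep -A 10 -E 'ReplicaFetcherThread|kafka-request-handler'
> {code}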
> Instructions to reproduce/demonstrate:
> Stand up three brokers and create a replicated topic:
> {code}
> bin/zookeeper-server-start.sh config/zookeeper.properties &
> bin/kafka-server-start.sh config/server1.properties &
> bin/kafka-server-start.sh config/server2.properties &
> bin/kafka-server-start.sh config/server3.properties &
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic replicated.topic
> {code}
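> (Here server1/2/3.properties are assumed to be copies of the stock 
> config/server.properties that differ only in broker id, listener port, and log 
> directory; ports 9092-9094 line up with the --broker-list below and the 
> broker-2 :9093 entry in the controller log further down. A sketch of 
> generating them:)
> {code}
> # Sketch: derive the three broker configs from the stock config/server.properties.
> # Keys appended at the end override the earlier defaults when the file is loaded.
> for i in 1 2 3; do
>   cp config/server.properties config/server$i.properties
>   {
>     echo "broker.id=$i"
>     echo "listeners=PLAINTEXT://localhost:909$((i + 1))"
>     echo "log.dirs=/tmp/kafka-logs-$i"
>   } >> config/server$i.properties
> done
> {code}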
> Identify the leader and (for simplicity in interpreting the logs) make sure 
> it's not the same broker as the cluster controller:
> {code}
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic replicated.topic
> {code}
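> (The leader is the broker id in the `Leader:` column of that output; a 
> one-liner sketch to pull just that field out:)
> {code}
> # Extract the leader's broker id from the describe output.
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic replicated.topic \
>   | grep -o 'Leader: *[0-9]*'
> {code}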
> Start a stream of produce events (with a shortened timeout so we get faster 
> visibility into when the cluster recovers):
> {code}
> echo request.timeout.ms=1000 >> config/producer.properties
> bin/kafka-verifiable-producer.sh --throughput 2 --topic replicated.topic --broker-list localhost:9092 --producer.config $(pwd)/config/producer.properties
> {code}
> Now SIGSTOP the leader (3, in my example):
> {code}
> kill -STOP $(pgrep -f server3.properties)
> {code}
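> (Once you're done observing, the stopped broker can be resumed with the 
> matching SIGCONT so it rejoins the cluster:)
> {code}
> # Resume the paused broker after the experiment.
> kill -CONT $(pgrep -f server3.properties)
> {code}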
> The producer will log errors for about 30 seconds and then recover. However, 
> if we read the logs, we'll see the following (excerpting key lines from 
> `state-change.log`):
> {code}
> [2016-10-28 20:36:03,128] TRACE Controller 2 epoch 8 sending become-leader LeaderAndIsr request (Leader:2,ISR:2,1,LeaderEpoch:22,ControllerEpoch:8) to broker 2 for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:36:03,131] TRACE Broker 2 handling LeaderAndIsr request correlationId 2 from controller 2 epoch 8 starting the become-leader transition for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:17,741] TRACE Controller 1 epoch 11 changed partition [replicated.topic,0] from OnlinePartition to OnlinePartition with leader 3 (state.change.logger)
> [2016-10-28 20:48:33,012] TRACE Controller 1 epoch 11 changed partition [replicated.topic,0] state from OnlinePartition to OfflinePartition (state.change.logger)
> [2016-10-28 20:48:33,012] TRACE Controller 1 epoch 11 started leader election for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,016] TRACE Controller 1 epoch 11 elected leader 2 for Offline partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,017] TRACE Controller 1 epoch 11 changed partition [replicated.topic,0] from OfflinePartition to OnlinePartition with leader 2 (state.change.logger)
> [2016-10-28 20:48:33,017] TRACE Controller 1 epoch 11 sending become-leader LeaderAndIsr request (Leader:2,ISR:1,2,LeaderEpoch:30,ControllerEpoch:11) to broker 2 for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,023] TRACE Broker 2 received LeaderAndIsr request PartitionState(controllerEpoch=11, leader=2, leaderEpoch=30, isr=[1, 2], zkVersion=46, replicas=[1, 2, 3]) correlation id 18 from controller 1 epoch 11 for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,024] TRACE Broker 2 handling LeaderAndIsr request correlationId 18 from controller 1 epoch 11 starting the become-leader transition for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,026] TRACE Broker 2 stopped fetchers as part of become-leader request from controller 1 epoch 11 with correlation id 18 for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,026] TRACE Broker 2 completed LeaderAndIsr request correlationId 18 from controller 1 epoch 11 for the become-leader transition for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:56,058] TRACE Controller 1 epoch 11 received response {error_code=0,partitions=[{topic=replicated.topic,partition=0,error_code=0}]} for a request sent to broker qa-dev1.northwest.stripe.io:9093 (id: 2 rack: null) (state.change.logger)
> {code}
> Note the ~23s pause between broker 2 logging completion of the LeaderAndIsr 
> request and the controller logging receipt of the response. If we look at 
> broker 2's own log over the same window, we find:
> {code}
> [2016-10-28 20:48:33,025] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions replicated.topic-0 (kafka.server.ReplicaFetcherManager)
> [2016-10-28 20:48:33,026] INFO [ReplicaFetcherThread-0-3], Shutting down (kafka.server.ReplicaFetcherThread)
> [2016-10-28 20:48:56,055] INFO [ReplicaFetcherThread-0-3], Stopped (kafka.server.ReplicaFetcherThread)
> [2016-10-28 20:48:56,055] INFO [ReplicaFetcherThread-0-3], Shutdown completed (kafka.server.ReplicaFetcherThread)
> {code}
> These timestamps exactly match the "missing" seconds. They also line up with 
> the first successful publish from my producer, which happened at `2016-10-28 
> 20:48:57.555`.
> Aggressively dropping `request.timeout.ms` speeds up this failover, but it 
> seems we should be able to recover those ~25s of unavailability without 
> lowering any timeouts, either by interrupting that thread or by not blocking 
> on its shutdown before responding to the LeaderAndIsr request.
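> For reference, the timeout-dropping workaround I mean is something like the 
> following (assuming the relevant knob is the broker-level `request.timeout.ms`, 
> whose 30000 ms default matches the ~30s stall above; it trades shorter stalls 
> for more spurious replication retries):
> {code}
> # Workaround sketch only: shorten the broker-side request timeout that bounds the
> # fetcher's in-flight fetch. Apply to every broker's config and restart the brokers.
> echo request.timeout.ms=5000 >> config/server1.properties
> echo request.timeout.ms=5000 >> config/server2.properties
> echo request.timeout.ms=5000 >> config/server3.properties
> {code}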



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
