[ https://issues.apache.org/jira/browse/KAFKA-4358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nelson Elhage updated KAFKA-4358:
---------------------------------
    Comment: was deleted

(was: This patch resolves the hang, but I'm not sure whether it's otherwise 
safe. It leaves the risk that the old fetch thread will still be running 
concurrently with subsequent operations, but we know that thread has an empty 
`partitionCount`, which should be sufficient to keep it from taking any 
further observable action, as best I can tell.)

> Following a hung broker, newly elected leader is unnecessarily slow assuming 
> leadership because of ReplicaFetcherThread
> -----------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4358
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4358
>             Project: Kafka
>          Issue Type: Bug
>          Components: replication
>    Affects Versions: 0.10.0.1
>            Reporter: Nelson Elhage
>            Priority: Minor
>
> When a broker handles a `LeaderAndIsr` request, the replica manager blocks 
> waiting for idle replication fetcher threads to die before responding to the 
> request, and therefore before it can service new produce requests.
> If requests to a broker start blackholing (e.g. due to network failure, or 
> due to the broker hanging), shutting down the `ReplicaFetcherThread` can take 
> a long time (around 30s in my testing), blocking recovery of any partitions 
> previously led by that broker.
> This is a very similar issue to KAFKA-612.
> Instructions to reproduce/demonstrate:
> Stand up three brokers and create a replicated topic:
> {code}
> bin/zookeeper-server-start.sh config/zookeeper.properties &
> bin/kafka-server-start.sh config/server1.properties &
> bin/kafka-server-start.sh config/server2.properties &
> bin/kafka-server-start.sh config/server3.properties &
> bin/kafka-topics.sh --create --zookeeper localhost:2181 \
>   --replication-factor 3 --partitions 1 --topic replicated.topic
> {code}
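> (Not part of the original report: a stock Kafka checkout only ships 
> config/server.properties, so config/server1-3.properties need to exist before 
> running the commands above. One way to create them is sketched below; the 
> broker-id/port/log-dir assignments are my own, chosen so that one broker 
> listens on localhost:9092, which the producer command further down uses as 
> its bootstrap.)
> {code}
> # Sketch (assumptions noted above): derive three broker configs from the stock
> # server.properties; broker N listens on port 9091+N and gets its own log dir.
> for i in 1 2 3; do
>   cp config/server.properties config/server$i.properties
>   cat >> config/server$i.properties <<EOF
> broker.id=$i
> listeners=PLAINTEXT://localhost:$((9091 + i))
> log.dirs=/tmp/kafka-logs-$i
> EOF
> done
> {code}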
> Identify the leader, and (for simplicity in interpreting the event) make sure 
> it's not the same as the cluster controller:
> {code}
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic replicated.topic
> {code}
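> (A convenience, not part of the original report: the partition leader and the 
> current controller can be read directly; the `/controller` znode read below 
> assumes the standard layout and the bundled zookeeper-shell.)
> {code}
> # Leader of the partition:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic replicated.topic \
>   | grep -o 'Leader: [0-9]*'
> # Controller (see the "brokerid" field of the znode):
> bin/zookeeper-shell.sh localhost:2181 get /controller
> {code}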
> Start a stream of produce events (with a shortened timeout so we get faster 
> visibility into when the cluster recovers):
> {code}
> echo request.timeout.ms=1000 >> config/producer.properties
> bin/kafka-verifiable-producer.sh --throughput 2 --topic replicated.topic \
>   --broker-list localhost:9092 --producer.config $(pwd)/config/producer.properties
> {code}
> Now SIGSTOP the leader (3, in my example):
> {code}
> kill -STOP $(pgrep -f server3.properties)
> {code}
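> (Also not from the original report, but convenient when running the 
> experiment: confirm the broker really is suspended, and resume it once you're 
> done observing the failover.)
> {code}
> # The broker process should now show state 'T' (stopped):
> ps -o pid=,stat= -p $(pgrep -f server3.properties)
> # Resume the hung broker once the experiment is over:
> kill -CONT $(pgrep -f server3.properties)
> {code}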
> The producer will log errors for about 30 seconds and then recover. However, 
> if we read the logs, we'll see the following (excerpting key lines from 
> `state-change.log`):
> {code}
> [2016-10-28 20:36:03,128] TRACE Controller 2 epoch 8 sending become-leader 
> LeaderAndIsr request (Leader:2,ISR:2,1,LeaderEpoch:22,ControllerEpoch:8) to 
> broker 2 for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:36:03,131] TRACE Broker 2 handling LeaderAndIsr request 
> correlationId 2 from controller 2 epoch 8 starting the become-leader 
> transition for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:17,741] TRACE Controller 1 epoch 11 changed partition 
> [replicated.topic,0] from OnlinePartition to OnlinePartition with leader 3 
> (state.change.logger)
> [2016-10-28 20:48:33,012] TRACE Controller 1 epoch 11 changed partition 
> [replicated.topic,0] state from OnlinePartition to OfflinePartition 
> (state.change.logger)
> [2016-10-28 20:48:33,012] TRACE Controller 1 epoch 11 started leader election 
> for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,016] TRACE Controller 1 epoch 11 elected leader 2 for 
> Offline partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,017] TRACE Controller 1 epoch 11 changed partition 
> [replicated.topic,0] from OfflinePartition to OnlinePartition with leader 2 
> (state.change.logger)
> [2016-10-28 20:48:33,017] TRACE Controller 1 epoch 11 sending become-leader 
> LeaderAndIsr request (Leader:2,ISR:1,2,LeaderEpoch:30,ControllerEpoch:11) to 
> broker 2 for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,023] TRACE Broker 2 received LeaderAndIsr request 
> PartitionState(controllerEpoch=11, leader=2, leaderEpoch=30, isr=[1, 2], 
> zkVersion=46, replicas=[1, 2, 3]) correlation id 18 from controller 1 epoch 
> 11 for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,024] TRACE Broker 2 handling LeaderAndIsr request 
> correlationId 18 from controller 1 epoch 11 starting the become-leader 
> transition for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,026] TRACE Broker 2 stopped fetchers as part of 
> become-leader request from controller 1 epoch 11 with correlation id 18 for 
> partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:33,026] TRACE Broker 2 completed LeaderAndIsr request 
> correlationId 18 from controller 1 epoch 11 for the become-leader transition 
> for partition [replicated.topic,0] (state.change.logger)
> [2016-10-28 20:48:56,058] TRACE Controller 1 epoch 11 received response 
> {error_code=0,partitions=[{topic=replicated.topic,partition=0,error_code=0}]} 
> for a request sent to broker qa-dev1.northwest.stripe.io:9093 (id: 2 rack: 
> null) (state.change.logger)
> {code}
> Note the ~23s pause between broker 2 logging completion of the LeaderAndIsr 
> request and the controller logging receipt of the response. If we look at 
> broker 2's own log now, we find:
> {code}
> [2016-10-28 20:48:33,025] INFO [ReplicaFetcherManager on broker 2] Removed 
> fetcher for partitions replicated.topic-0 (kafka.server.ReplicaFetcherManager)
> [2016-10-28 20:48:33,026] INFO [ReplicaFetcherThread-0-3], Shutting down 
> (kafka.server.ReplicaFetcherThread)
> [2016-10-28 20:48:56,055] INFO [ReplicaFetcherThread-0-3], Stopped  
> (kafka.server.ReplicaFetcherThread)
> [2016-10-28 20:48:56,055] INFO [ReplicaFetcherThread-0-3], Shutdown completed 
> (kafka.server.ReplicaFetcherThread)
> {code}
> Those timestamps exactly match the "missing" seconds. They also line up with 
> the first successful publish from my producer, which happened at `2016-10-28 
> 20:48:57.555`.
> Aggressively dropping `request.timeout.ms` speeds up this failover, but it 
> seems we should be able to reclaim those ~25s of unavailability without 
> lowering any additional timeouts, either by interrupting that fetcher thread 
> somehow or by not blocking on its shutdown.
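> (For what it's worth, a sketch of the mitigation mentioned above, applied 
> before the brokers are started; that the broker-side `request.timeout.ms` is 
> the knob bounding this fetcher shutdown wait is my assumption, not something 
> established in this report.)
> {code}
> # Assumed workaround: lower the brokers' request timeout so a fetcher stuck
> # talking to an unresponsive leader gives up, and can shut down, sooner.
> for i in 1 2 3; do
>   echo request.timeout.ms=5000 >> config/server$i.properties
> done
> {code}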



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
