Nelson Elhage created KAFKA-4358:
------------------------------------

             Summary: Following a hung broker, newly elected leader is 
unnecessarily slow assuming leadership because of ReplicaFetcherThread
                 Key: KAFKA-4358
                 URL: https://issues.apache.org/jira/browse/KAFKA-4358
             Project: Kafka
          Issue Type: Bug
          Components: replication
    Affects Versions: 0.10.0.1
            Reporter: Nelson Elhage
            Priority: Minor


When a broker handles a `LeaderAndIsr` request, the replica manager blocks 
waiting for idle replication fetcher threads to die before responding to the 
message and being able to service new produce requests.

If requests to a broker start blackholing (e.g. due to network failure, or due 
to the broker hanging), shutting down the `ReplicaFetcherThread` can take a 
long time (around 30s in my testing), blocking recovery of any partitions 
previously lead by that broker.

This is a very similar issue to KAFKA-612.

Instructions to reproduce/demonstrate:

Stand up three brokers and create a replicated topic:

{code}
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server1.properties &
bin/kafka-server-start.sh config/server2.properties &
bin/kafka-server-start.sh config/server3.properties &

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 
--partitions 1 --topic replicated.topic
{code}

Identify the leader, and (for simplicity in interpreting the event) make sure 
it's not the same as the cluster controller:

{code}
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic 
replicated.topic
{code}

Start a stream of produce events (with a shortened timeout so we get faster 
visibility into when the cluster recovers):

{code}
echo request.timeout.ms=1000 >> config/producer.properties
bin/kafka-verifiable-producer.sh --throughput 2 --topic replicated.topic 
--broker-list localhost:9092 --producer.config $(pwd)/config/producer.properties
{code}

Now SIGSTOP the leader (3, in my example):

{code}
kill -STOP $(pgrep -f server3.properties)
{code}

The producer will log errors for about 30 seconds, and then recover. However, 
if we read logs, we'll see (excerpting key log lines from `state-change.log`):

{code}
[2016-10-28 20:36:03,128] TRACE Controller 2 epoch 8 sending become-leader 
LeaderAndIsr request (Leader:2,ISR:2,1,LeaderEpoch:22,ControllerEpoch:8) to 
broker 2 for partition [replicated.topic,0] (state.change.logger)
[2016-10-28 20:36:03,131] TRACE Broker 2 handling LeaderAndIsr request 
correlationId 2 from controller 2 epoch 8 starting the become-leader transition 
for partition [replicated.topic,0] (state.change.logger)
[2016-10-28 20:48:17,741] TRACE Controller 1 epoch 11 changed partition 
[replicated.topic,0] from OnlinePartition to OnlinePartition with leader 3 
(state.change.logger)
[2016-10-28 20:48:33,012] TRACE Controller 1 epoch 11 changed partition 
[replicated.topic,0] state from OnlinePartition to OfflinePartition 
(state.change.logger)
[2016-10-28 20:48:33,012] TRACE Controller 1 epoch 11 started leader election 
for partition [replicated.topic,0] (state.change.logger)
[2016-10-28 20:48:33,016] TRACE Controller 1 epoch 11 elected leader 2 for 
Offline partition [replicated.topic,0] (state.change.logger)
[2016-10-28 20:48:33,017] TRACE Controller 1 epoch 11 changed partition 
[replicated.topic,0] from OfflinePartition to OnlinePartition with leader 2 
(state.change.logger)
[2016-10-28 20:48:33,017] TRACE Controller 1 epoch 11 sending become-leader 
LeaderAndIsr request (Leader:2,ISR:1,2,LeaderEpoch:30,ControllerEpoch:11) to 
broker 2 for partition [replicated.topic,0] (state.change.logger)
[2016-10-28 20:48:33,023] TRACE Broker 2 received LeaderAndIsr request 
PartitionState(controllerEpoch=11, leader=2, leaderEpoch=30, isr=[1, 2], 
zkVersion=46, replicas=[1, 2, 3]) correlation id 18 from controller 1 epoch 11 
for partition [replicated.topic,0] (state.change.logger)
[2016-10-28 20:48:33,024] TRACE Broker 2 handling LeaderAndIsr request 
correlationId 18 from controller 1 epoch 11 starting the become-leader 
transition for partition [replicated.topic,0] (state.change.logger)
[2016-10-28 20:48:33,026] TRACE Broker 2 stopped fetchers as part of 
become-leader request from controller 1 epoch 11 with correlation id 18 for 
partition [replicated.topic,0] (state.change.logger)
[2016-10-28 20:48:33,026] TRACE Broker 2 completed LeaderAndIsr request 
correlationId 18 from controller 1 epoch 11 for the become-leader transition 
for partition [replicated.topic,0] (state.change.logger)
[2016-10-28 20:48:56,058] TRACE Controller 1 epoch 11 received response 
{error_code=0,partitions=[{topic=replicated.topic,partition=0,error_code=0}]} 
for a request sent to broker qa-dev1.northwest.stripe.io:9093 (id: 2 rack: 
null) (state.change.logger)
{code}

Note the ~23s pause between broker 2 logging completion of the LeaderAndIsr 
request, and the controller logging receipt. If we look at broker 2 
specifically now, we find

{code}
[2016-10-28 20:48:33,025] INFO [ReplicaFetcherManager on broker 2] Removed 
fetcher for partitions replicated.topic-0 (kafka.server.ReplicaFetcherManager)
[2016-10-28 20:48:33,026] INFO [ReplicaFetcherThread-0-3], Shutting down 
(kafka.server.ReplicaFetcherThread)
[2016-10-28 20:48:56,055] INFO [ReplicaFetcherThread-0-3], Stopped  
(kafka.server.ReplicaFetcherThread)
[2016-10-28 20:48:56,055] INFO [ReplicaFetcherThread-0-3], Shutdown completed 
(kafka.server.ReplicaFetcherThread)
{code}

Which timestamps exactly match the "missing" seconds. It also aligns with the 
first successful publish from my publish job, which happened at `2016-10-28 
20:48:57.555`

Aggressively dropping `request.timeout.ms` speeds up this failover process, but 
it seems that we should be able to recover those ~25s of unavailability without 
having to drop any additional timeouts, by either interrupting that thread 
somehow or not having to block on its shutdown.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to