Hi All,
I'm currently investigating why our Kafka deployment has a prolonged period of URPs 
(under-replicated partitions) after a restart, lasting 30-45 minutes. At the moment we use a 
replication factor of 2 due to the cost associated with having another copy of 
each partition; however, we do plan to move to a replication factor of 3 in 
the future. We are using version 1.1.0. On the producer side we use 'acks=all', and 
we have 'min.insync.replicas=1', which I believe means that while URPs exist we 
will only have one copy of the data (a single point of failure) until the 
replicas can start fetching from the leader again and rejoin the ISR.
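
For reference, here's roughly what our producer configuration looks like (a minimal 
sketch with placeholder broker addresses and topic name; note that 'min.insync.replicas' 
is a topic/broker-level setting rather than a producer one):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class ProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Placeholder broker list, for illustration only.
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            // Wait for all in-sync replicas to acknowledge each write.
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            // With min.insync.replicas=1 on the topic and RF=2, writes still
            // succeed while the second replica is out of the ISR.

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("topic-100", "key", "value"));
            }
        }
    }
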
I've done some digging into the logs and the Kafka source code and have some 
questions about the behavior of the replica threads after startup. I should 
state that this prolonged period of URPs only occurs when we have more than 
3000-4000 partitions per broker. At the moment we perform a rolling restart each 
weekend because the brokers need to get new Kerberos tickets. During the shutdown 
of a broker, leadership of its partitions is moved to the other replica, which 
means that when the broker starts again all of its partitions are followers. 
What I noticed is that for around 30-45 minutes after the server had completed 
startup, the logs contained messages similar to the following:

    INFO [Log partition=topic-100-6, dir=/d/d4/kafka-data/kafka-data] Truncating to offset 170 (kafka.log.Log)
    INFO [Log partition=topic-100-6, dir=/d/d4/kafka-data/kafka-data] Loading producer state from offset 170 with message format version 2 (kafka.log.Log)

My understanding is that the truncation occurs because, in order to stay 
consistent, the restarted replica has to truncate to the highest offset that 
exists in both replicas (the high watermark?) so that there aren't messages in 
one replica but not in the other. Is this correct? I'm not exactly sure why the 
producer state needs to be loaded. What information does it contain? The latest 
offset for which the producer has received an acknowledgement? I assumed this is 
only used if 'enable.idempotence' is set to true? 
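
In case it helps frame the question, here's a toy sketch of what I imagine the 
producer state is used for (purely my assumption, not Kafka's actual code): 
tracking, per producer ID, the last sequence number appended so that a retried 
batch from an idempotent producer can be recognized and dropped:

    import java.util.HashMap;
    import java.util.Map;

    // Toy sketch (my assumption, not Kafka's implementation) of per-producer
    // state used to de-duplicate retries from an idempotent producer.
    public class ProducerStateSketch {
        private final Map<Long, Integer> lastSequencePerProducer = new HashMap<>();

        /** Returns true if the batch is new, false if it looks like a duplicate retry. */
        public boolean shouldAppend(long producerId, int batchSequence) {
            Integer lastSequence = lastSequencePerProducer.get(producerId);
            if (lastSequence != null && batchSequence <= lastSequence) {
                return false; // already appended -- a retry of an acknowledged batch
            }
            lastSequencePerProducer.put(producerId, batchSequence);
            return true;
        }

        public static void main(String[] args) {
            ProducerStateSketch state = new ProducerStateSketch();
            System.out.println(state.shouldAppend(42L, 0)); // true  (first batch)
            System.out.println(state.shouldAppend(42L, 0)); // false (duplicate retry)
            System.out.println(state.shouldAppend(42L, 1)); // true  (next batch)
        }
    }
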
It seems as if these two steps need to be performed for each replica partition 
before the replica fetcher threads can start fetching records from the leaders 
(and bring the replicas back into sync). Is this correct, and if so, is there 
any way to reduce the time taken to perform these steps? 
I did increase 'num.replica.fetcher.threads' in my test environment and it did 
seem to shorten the time taken. I assume the replicas on the broker are 
distributed across the replica fetcher threads (round robin?), so if you have 
more fetcher threads then each thread is responsible for setting up fewer 
partitions? 
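
For what it's worth, my (possibly wrong) reading of AbstractFetcherManager.getFetcherId 
in the 1.1.0 source is that partitions are assigned to fetcher threads by hashing the 
topic and partition modulo the thread count rather than strict round robin, roughly 
along these lines:

    // Toy sketch of how I read the partition-to-fetcher-thread assignment in
    // AbstractFetcherManager.getFetcherId: hash the topic and partition, then
    // take it modulo num.replica.fetcher.threads. Either way, more threads
    // should mean fewer partitions (and less startup work) per thread.
    public class FetcherAssignmentSketch {
        static int fetcherId(String topic, int partition, int numFetcherThreads) {
            return ((31 * topic.hashCode() + partition) & 0x7fffffff) % numFetcherThreads;
        }

        public static void main(String[] args) {
            int numFetcherThreads = 4; // hypothetical num.replica.fetcher.threads value
            for (int partition = 0; partition < 8; partition++) {
                System.out.printf("topic-100 partition %d -> fetcher thread %d%n",
                        partition, fetcherId("topic-100", partition, numFetcherThreads));
            }
        }
    }
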
Any help with the above would be greatly appreciated. 
Many Thanks, 
Jamie 
