[ https://issues.apache.org/jira/browse/KAFKA-10853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17249390#comment-17249390 ]

Kyle Ambroff-Kao edited comment on KAFKA-10853 at 12/14/20, 11:20 PM:
----------------------------------------------------------------------

Thank you for the reply, Guozhang!

In our deployments we have a few systems in place that make recovery from this 
situation automatic, without requiring intervention from an admin. The problem 
outlined in this ticket breaks this for topics with 
unclean.leader.election.enable=false and min.insync.replicas > 1.

The first mitigation we have is simply to have brokers halt if they get stuck 
and cannot make progress. We have a config in our fork called 
request.max.local.time.ms, which sets an upper bound on the amount of time a 
broker can spend actively handling a request. We set it to 60 seconds in 
production, so if a storage fault causes all produce requests to block 
indefinitely, the broker halts after a minute and leadership moves to another 
broker.

[https://github.com/linkedin/kafka/commit/8926f588329]
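
To sketch the idea (this is only an illustration, not the actual change in our 
fork linked above; the RequestWatchdog class and the way in-flight requests are 
tracked here are made up for this example):

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Illustrative watchdog: if any request has been in local processing for
 * longer than maxLocalTimeMs, halt the JVM so that leadership can move to a
 * healthy broker.
 */
public class RequestWatchdog {
    private final long maxLocalTimeMs;
    // requestId -> wall-clock time at which local handling started
    private final Map<Long, Long> inFlight = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();

    public RequestWatchdog(long maxLocalTimeMs) {
        this.maxLocalTimeMs = maxLocalTimeMs;
        scheduler.scheduleAtFixedRate(this::check, 1, 1, TimeUnit.SECONDS);
    }

    public void requestStarted(long requestId) {
        inFlight.put(requestId, System.currentTimeMillis());
    }

    public void requestCompleted(long requestId) {
        inFlight.remove(requestId);
    }

    private void check() {
        long now = System.currentTimeMillis();
        for (long startedMs : inFlight.values()) {
            if (now - startedMs > maxLocalTimeMs) {
                // Halt rather than a clean shutdown: the broker may be wedged
                // on I/O and unable to complete a controlled shutdown.
                Runtime.getRuntime().halt(1);
            }
        }
    }
}
{code}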

In the case where the leader's performance is bad but it is still making 
progress, we use the slow-broker detection code in 
[https://github.com/linkedin/cruise-control] to detect the poor performance and 
"demote" the broker, which involves changing the preferred leader of all 
topic-partitions hosted on that broker and then performing a preferred leader 
election.
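
Roughly the same effect can be had with the AdminClient directly; here is a 
minimal sketch for a single partition (the topic name, broker ids, replica list 
and bootstrap address are placeholders, and this is not what cruise-control 
actually does internally):

{code:java}
import java.util.*;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;

public class DemoteBroker {
    public static void main(String[] args) throws Exception {
        int brokerToDemote = 1;  // placeholder broker id
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // A partition whose preferred (first) replica is the slow broker.
            TopicPartition tp = new TopicPartition("my-topic", 0);
            List<Integer> currentReplicas = Arrays.asList(brokerToDemote, 2, 3);

            // 1. Move the slow broker to the end of the replica list so it is
            //    no longer the preferred leader.
            List<Integer> demoted = new ArrayList<>(currentReplicas);
            demoted.remove(Integer.valueOf(brokerToDemote));
            demoted.add(brokerToDemote);
            admin.alterPartitionReassignments(
                Collections.singletonMap(tp,
                    Optional.of(new NewPartitionReassignment(demoted)))
            ).all().get();

            // 2. Trigger a preferred leader election so that leadership moves
            //    to the new preferred replica.
            admin.electLeaders(ElectionType.PREFERRED, Collections.singleton(tp))
                 .partitions().get();
        }
    }
}
{code}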

These automations work for most of our deployments, but not for the deployments 
where we use high-durability configurations, as outlined in this ticket.

I'm not really up on recent admin requests which could bypass 
unclean.leader.election, but in general I think a solution that prevented this 
situation in the first place would be more desirable. I don't want to have to 
build automation outside of Kafka which tries to determine the safety of such 
an operation when the controller could do it instead.
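
For completeness, I assume the kind of admin request being referred to is the 
per-partition unclean election that KIP-460 added to the ElectLeaders API; 
something like the following sketch (topic, partition and bootstrap address are 
placeholders):

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;

public class ForceUncleanElection {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Placeholder partition that is stuck with an unavailable leader.
            TopicPartition tp = new TopicPartition("my-topic", 0);

            // Ask the controller to elect a leader even if no ISR replica is
            // available, trading durability for availability on this one
            // partition instead of flipping the topic-level config.
            admin.electLeaders(ElectionType.UNCLEAN, Collections.singleton(tp))
                 .partitions().get();
        }
    }
}
{code}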


> Replication protocol deficiencies with workloads requiring high durability 
> guarantees
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10853
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10853
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.4.0
>            Reporter: Kyle Ambroff-Kao
>            Priority: Major
>
> *tl;dr: The definition of ISR and the consistency model from the perspective 
> of the producer seem a bit out of sync*
> We have many systems in production that trade off availability in order to 
> provide stronger consistency guarantees. Most of these configurations look 
> like this:
> Topic configuration:
>  * replication factor 3
>  * min.insync.replicas=2
>  * unclean.leader.election.enable=false
> Producer configuration:
>  * acks=all
> Broker configuration:
>  * replica.lag.time.max.ms=10000
> So the goal here is to reduce the chance of ever dropping a message that the 
> leader has acknowledged to the producer.
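> To make that configuration concrete, here is a minimal client-side sketch (the 
> topic name and bootstrap address are placeholders; replica.lag.time.max.ms is 
> a broker setting in server.properties and is not shown here):
> {code:java}
> import java.util.*;
> import org.apache.kafka.clients.admin.AdminClient;
> import org.apache.kafka.clients.admin.AdminClientConfig;
> import org.apache.kafka.clients.admin.NewTopic;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.config.TopicConfig;
> import org.apache.kafka.common.serialization.StringSerializer;
>
> public class HighDurabilitySetup {
>     public static void main(String[] args) throws Exception {
>         Properties adminProps = new Properties();
>         adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>
>         // Topic: replication factor 3, min.insync.replicas=2, and unclean
>         // leader election disabled.
>         try (AdminClient admin = AdminClient.create(adminProps)) {
>             Map<String, String> configs = new HashMap<>();
>             configs.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2");
>             configs.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false");
>             NewTopic topic = new NewTopic("my-topic", 1, (short) 3).configs(configs);
>             admin.createTopics(Collections.singleton(topic)).all().get();
>         }
>
>         // Producer: acks=all, so a write is only acknowledged after at least
>         // min.insync.replicas replicas have it.
>         Properties producerProps = new Properties();
>         producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
>         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
>         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
>         try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
>             producer.send(new ProducerRecord<>("my-topic", "key", "value")).get();
>         }
>     }
> }
> {code}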
> This works great, except that we've found some situations in production where 
> we are forced to enable unclean leader election to recover, which we never 
> want to do. These situations all seem totally avoidable with some small 
> tweaks to the replication protocol.
> *A scenario we've seen many times*
> The following sequence of events is in time order. Consider a replica set for 
> a topic-partition TP with leader L and follower replicas R1 and R2, where all 
> three replicas are in the ISR.
>  # Producer sends ProduceRequest R with acks=all that contains a message 
> batch to the leader L.
>  # L receives R and appends the batch it contains to the active segment of TP 
> but does not ack to the producer yet because the request was acks=all
>  # A storage fault occurs on L which makes all IOPS take a long time but 
> doesn't cause a hard failure.
>  # R1 and R2 send follower fetch requests to L which are infinitely delayed 
> due to the storage fault on L.
>  # 10 seconds after appending the batch to its log, L shrinks the ISR, 
> removing R1 and R2. This is because a follower is considered in-sync only if 
> it is at most replica.lag.time.max.ms milliseconds behind the log append time 
> of the leader's log end offset, and the leader end offset is now a message 
> that has not been replicated yet.
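> To spell out the rule in step 5 (illustration only, not the actual broker 
> code; the class and field names here are made up):
> {code:java}
> // A follower stays in the ISR only if it has caught up to the leader's log
> // end offset within the last replica.lag.time.max.ms milliseconds.
> class IsrShrinkRule {
>     static final long REPLICA_LAG_TIME_MAX_MS = 10_000L;
>
>     static boolean shouldRemoveFromIsr(long nowMs, long lastCaughtUpTimeMs) {
>         return nowMs - lastCaughtUpTimeMs > REPLICA_LAG_TIME_MAX_MS;
>     }
>
>     public static void main(String[] args) {
>         // t=0: L appends the un-acked batch, advancing its log end offset.
>         // The follower fetches are wedged, so R1 and R2 never catch up again.
>         long lastCaughtUpTimeMs = 0L;
>
>         // t=10,001 ms: L removes R1 and R2 from the ISR, even though the
>         // batch was never replicated or acknowledged.
>         System.out.println(shouldRemoveFromIsr(10_001L, lastCaughtUpTimeMs));  // true
>     }
> }
> {code}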
> The storage fault in step 3 could easily be another kind of fault. Say, for 
> example, L is partitioned from R1 and R2 but not from ZooKeeper or the 
> producer.
> The producer never receives acknowledgement of the ProduceRequest because the 
> min.insync.replicas constraint was never satisfied. So in terms of data 
> consistency, everything is working fine.
> The problem is recovering from this situation. If the fault on L is not a 
> temporary blip, then L needs to be replaced. But since L shrank the ISR, the 
> only way that leadership can move to either R1 or R2 is to set 
> unclean.leader.election.enable=true.
> This works, but it is a potentially unsafe way to recover and move leadership. 
> It would be better to have other options.
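> Concretely, setting that config looks something like the following sketch 
> (topic name and bootstrap address are placeholders); once it takes effect the 
> controller can elect an out-of-sync replica:
> {code:java}
> import java.util.Collections;
> import java.util.Properties;
> import org.apache.kafka.clients.admin.AdminClient;
> import org.apache.kafka.clients.admin.AdminClientConfig;
> import org.apache.kafka.clients.admin.AlterConfigOp;
> import org.apache.kafka.clients.admin.ConfigEntry;
> import org.apache.kafka.common.config.ConfigResource;
>
> public class EnableUncleanElection {
>     public static void main(String[] args) throws Exception {
>         Properties props = new Properties();
>         props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>
>         try (AdminClient admin = AdminClient.create(props)) {
>             ConfigResource topic =
>                 new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
>             AlterConfigOp enable = new AlterConfigOp(
>                 new ConfigEntry("unclean.leader.election.enable", "true"),
>                 AlterConfigOp.OpType.SET);
>
>             // Once this takes effect the controller may move leadership to an
>             // out-of-sync replica, discarding any records that only L has.
>             admin.incrementalAlterConfigs(Collections.singletonMap(topic,
>                 Collections.singletonList(enable))).all().get();
>         }
>     }
> }
> {code}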
> *Recovery could be automatic in this scenario.*
> If you think about it, from the perspective of the producer the write was 
> never acknowledged, and therefore L, R1, and R2 are effectively still in sync. 
> So it should be totally safe for leadership to transition to either R1 or R2.
> It seems that the producer and the leader don't have fully compatible 
> definitions for what it means for the replica set to be in-sync. If the 
> leader L used different rules for defining ISR, it could allow self-healing 
> in this or similar scenarios, since the ISR would not shrink.
>  



