[ https://issues.apache.org/jira/browse/KAFKA-10853?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kyle Ambroff-Kao updated KAFKA-10853:
-------------------------------------
    Description: 
*tl;dr: The definition of ISR and the consistency model from the perspective of 
the producer seem a bit out of sync*

We have many systems in production that trade off availability in order to 
provide stronger consistency guarantees. Most of these configurations look like 
this:

Topic configuration:
 * replication factor 3
 * min.insync.replicas=2
 * unclean.leader.election.enable=false

Producer configuration:
 * acks=all

Broker configuration:
 * replica.lag.time.max.ms=10000

So the goal here is to reduce the chance of ever dropping a message that the 
leader has acknowledged to the producer.
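
For reference, here is a minimal sketch of this setup using the public admin and 
producer clients from Scala. The bootstrap address, topic name, partition count, 
and object name are made up for illustration; the configuration values are the 
ones listed above.
{noformat}
import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{Admin, NewTopic}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import scala.jdk.CollectionConverters._

object DurableTopicSetup extends App {
  val bootstrap = "localhost:9092" // hypothetical broker address

  // Topic: replication factor 3, min.insync.replicas=2, unclean election off.
  val adminProps = new Properties()
  adminProps.put("bootstrap.servers", bootstrap)
  val admin = Admin.create(adminProps)
  val topic = new NewTopic("durable-topic", 6, 3.toShort)
    .configs(Map(
      "min.insync.replicas" -> "2",
      "unclean.leader.election.enable" -> "false"
    ).asJava)
  admin.createTopics(Collections.singleton(topic)).all().get()
  admin.close()

  // Producer: acks=all, so the leader only acknowledges once every replica in
  // the ISR has the batch, and fails the request if the ISR is smaller than
  // min.insync.replicas.
  val producerProps = new Properties()
  producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap)
  producerProps.put(ProducerConfig.ACKS_CONFIG, "all")
  producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
  producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
  val producer = new KafkaProducer[String, String](producerProps)
  producer.send(new ProducerRecord("durable-topic", "key", "value")).get()
  producer.close()

  // Broker side (server.properties): replica.lag.time.max.ms=10000
}
{noformat}
The broker setting replica.lag.time.max.ms=10000 is a static server.properties 
entry rather than a client configuration.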

This works great, except that we've found some situations in production where 
we are forced to enable unclean leader election to recover, which we never want 
to do. These situations all seem totally avoidable with some small tweaks to 
the replication protocol.

*A scenario we've seen many times*

The following sequence of events is in time order. Consider a replica set for a 
topic-partition TP with leader L and followers R1 and R2. All three replicas are 
in the ISR.
 # Producer sends ProduceRequest R with acks=all that contains a message batch 
to the leader L.
 # L receives R and appends the batch it contains to the active segment of TP, 
but does not ack to the producer yet because the request was acks=all.
 # A storage fault occurs on L which makes all I/O operations extremely slow but 
doesn't cause a hard failure.
 # R1 and R2 send follower fetch requests to L which are delayed indefinitely 
due to the storage fault on L.
 # 10 seconds after appending the batch to the log, L shrinks the ISR, removing 
R1 and R2. This is because the ISR is defined as the set of replicas that are at 
most replica.lag.time.max.ms milliseconds behind the log append time of the 
leader end offset, and here the leader end offset points to a batch that has not 
been replicated yet. (A simplified sketch of this check follows the list.)
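
To make the shrink rule concrete, here is a simplified sketch of the kind of 
check the leader applies today. This is not the actual Kafka code; the names and 
types are illustrative only.
{noformat}
// Simplified sketch of the current ISR shrink rule (illustrative names,
// not the real Partition/Replica code).
final case class FollowerState(
  logEndOffset: Long,       // highest offset the follower has fetched
  lastCaughtUpTimeMs: Long  // last time the follower was caught up to the leader end offset
)

def isFollowerOutOfSync(follower: FollowerState,
                        leaderEndOffset: Long,
                        currentTimeMs: Long,
                        maxLagMs: Long): Boolean =
  follower.logEndOffset != leaderEndOffset &&
    (currentTimeMs - follower.lastCaughtUpTimeMs) > maxLagMs

// In the scenario above, L appended the new batch (advancing its leader end
// offset) but its followers can never fetch it because L itself is stuck.
// After maxLagMs both followers are judged out of sync and removed from the
// ISR, even though no producer was ever told the batch was committed.
{noformat}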

The storage fault in step 3 could easily be another kind of fault. Say, for 
example, that L is partitioned from R1 and R2 but not from ZooKeeper or the 
producer.

The producer never receives acknowledgement of the ProduceRequest because the 
min.insync.replicas constraint was never satisfied. So in terms of data 
consistency, everything is working fine.
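
From the producer's point of view this simply looks like a failed send. Here is 
a rough sketch; the exact exception depends on the failure (a stuck leader 
typically surfaces as a TimeoutException once delivery.timeout.ms expires, while 
a leader that rejects the write because the ISR is too small surfaces as 
NotEnoughReplicasException).
{noformat}
import java.util.concurrent.ExecutionException
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Illustrative helper: send one record and report whether it was acknowledged.
// The producer is assumed to be configured with acks=all as shown earlier.
def sendDurably(producer: KafkaProducer[String, String],
                topic: String, key: String, value: String): Boolean =
  try {
    producer.send(new ProducerRecord(topic, key, value)).get()
    true // the leader acknowledged: min.insync.replicas was satisfied
  } catch {
    case e: ExecutionException =>
      // Never acknowledged: as far as this producer is concerned the message
      // does not exist, so it is safe to retry it against a new leader.
      System.err.println(s"produce not acknowledged: ${e.getCause}")
      false
  }
{noformat}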

The problem is recovering from this situation. If the fault on L is not a 
temporary blip, then L needs to be replaced. But since L has shrunk the ISR, the 
only way that leadership can move to either R1 or R2 is to set 
unclean.leader.election.enable=true.

This works but it is a potentially unsafe way to recover and move leadership. 
It would be better to have other options.
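
(For completeness, that emergency lever looks roughly like the sketch below: 
flipping unclean.leader.election.enable on the affected topic so the controller 
can elect R1 or R2 even though they are outside the ISR. The topic name and 
broker address are hypothetical.)
{noformat}
import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.config.ConfigResource

// Rough sketch of the unclean-election escape hatch we would rather avoid.
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092") // hypothetical
val admin = Admin.create(props)

val resource = new ConfigResource(ConfigResource.Type.TOPIC, "durable-topic")
val op = new AlterConfigOp(
  new ConfigEntry("unclean.leader.election.enable", "true"),
  AlterConfigOp.OpType.SET)

val configs = new java.util.HashMap[ConfigResource, java.util.Collection[AlterConfigOp]]()
configs.put(resource, Collections.singletonList(op))

// The controller notices the config change and may elect an out-of-ISR replica,
// accepting whatever unreplicated data the old leader had as lost.
admin.incrementalAlterConfigs(configs).all().get()
admin.close()
{noformat}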

*Recovery could be automatic in this scenario.*

If you think about it, from the perspective of the producer the write was never 
acknowledged, and therefore L, R1 and R2 are effectively still in sync. So it 
should be totally safe for leadership to transition to either R1 or R2.

It seems that the producer and the leader don't have fully compatible 
definitions for what it means for the replica set to be in-sync. If the leader 
L used different rules for defining ISR, it could allow self-healing in this or 
similar scenarios, since the ISR would not shrink.

*Potential alternatives*

Today the leader tracks a couple of different offsets. One is the leader end 
offset, which is the highest offset appended to the log for the topic-partition 
TP.

Another is the high watermark, which is the highest offset that has been 
replicated to every replica in the ISR.

We can't simply redefine the ISR in terms of the high watermark of TP, because 
the high watermark itself depends on the state of the ISR. So a change like this 
would be pretty broken:
{noformat}
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 2d28aa12b69b..e141ec090a28 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -901,7 +901,7 @@ class Partition(val topicPartition: TopicPartition,
      **/
     val candidateReplicaIds = inSyncReplicaIds - localBrokerId
     val currentTimeMs = time.milliseconds()
-    val leaderEndOffset = localLogOrException.logEndOffset
+    val leaderEndOffset = localLogOrException.highWatermark
     candidateReplicaIds.filter(replicaId => isFollowerOutOfSync(replicaId, leaderEndOffset, currentTimeMs, maxLagMs))
   }
{noformat}
It seems like a potential solution would be to track a third offset, which 
would be the largest offset acknowledged to a producer in response to a 
ProduceRequest. Call it largestAckedOffset.

If the leader L defined the ISR as no more than replica.lag.time.max.ms behind 
the log append time of largestAckedOffset, then the failure mode I described 
above would not lead to the ISR shrinking. And from the perspective of the 
producer, no acknowledged data has been lost.
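
Here is a rough sketch of how that rule might look. largestAckedOffset and its 
append time do not exist in Kafka today; they are hypothetical fields introduced 
purely for illustration.
{noformat}
// Hypothetical sketch of the proposed shrink rule, not actual Kafka code.
final case class AckedState(
  largestAckedOffset: Long,      // highest offset ever returned in a successful ProduceResponse
  largestAckedAppendTimeMs: Long // log append time of that offset
)

def isFollowerOutOfSync(followerEndOffset: Long,
                        acked: AckedState,
                        currentTimeMs: Long,
                        maxLagMs: Long): Boolean =
  // A follower only counts as lagging relative to data the leader has
  // actually acknowledged to producers.
  followerEndOffset <= acked.largestAckedOffset &&
    (currentTimeMs - acked.largestAckedAppendTimeMs) > maxLagMs

// In the failure above, the stuck acks=all batch was never acknowledged, so
// largestAckedOffset still points at data R1 and R2 already have. The first
// condition is false for both followers, the ISR never shrinks, and a clean
// leader election to R1 or R2 remains possible.
{noformat}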

I think this change is also compatible with more lossy topic configurations, 
where min.insync.replicas=1, RF=2, unclean.leader.election.enable=true and 
acks=1. In that case, the largestAckedOffset would always be equal to the 
leaderEndOffset, so effectively nothing has changed.

 


> Replication protocol deficiencies with workloads requiring high durability 
> guarantees
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10853
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10853
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.4.0
>            Reporter: Kyle Ambroff-Kao
>            Priority: Major
>

