[jira] [Commented] (KAFKA-10853) Replication protocol deficiencies with workloads requiring high durability guarantees

2021-03-26 Thread Kyle Ambroff-Kao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17309621#comment-17309621
 ] 

Kyle Ambroff-Kao commented on KAFKA-10853:
--

We spent a lot of time discussing this internally and explored this idea pretty 
thoroughly. I think the end result of a change in the protocol would be 
beneficial, but it's a very complex change.

One of my teammates pointed out that in the short term we could probably get 
away with not reporting the shrink to the controller when the ISR would fall 
below min.insync.replicas brokers. Since, for our use case, no ProduceRequest 
can succeed in that state anyway, the controller can still safely move 
leadership to a broker that would otherwise have been kicked out of the ISR. 
Any client that produces with acks=1 is already accepting that risk of data loss.
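
A minimal sketch of that idea (a hypothetical helper, not the actual 
Partition.scala logic), assuming the leader simply declines to record an ISR 
shrink that would drop the ISR below min.insync.replicas:
{noformat}
// Sketch only: skip recording an ISR shrink when the result would fall below
// min.insync.replicas. No acks=all produce can succeed in that state anyway,
// so the controller can still safely elect one of the lagging replicas.
def maybeShrinkIsr(inSyncReplicaIds: Set[Int],
                   outOfSyncReplicaIds: Set[Int],
                   minInSyncReplicas: Int): Set[Int] = {
  val shrunk = inSyncReplicaIds -- outOfSyncReplicaIds
  if (shrunk.size < minInSyncReplicas) inSyncReplicaIds // leave the ISR as it is
  else shrunk
}
{noformat}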

We have a document exploring these details that I'll try to share when I get 
permission.

> Replication protocol deficiencies with workloads requiring high durability 
> guarantees
> -
>
> Key: KAFKA-10853
> URL: https://issues.apache.org/jira/browse/KAFKA-10853
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0
>Reporter: Kyle Ambroff-Kao
>Priority: Major
>
> *tl;dr: The definition of ISR and the consistency model from the perspective 
> of the producer seem a bit out of sync*
> We have many systems in production that trade off availability in order to 
> provide stronger consistency guarantees. Most of these configurations look 
> like this:
> Topic configuration:
>  * replication factor 3
>  * min.insync.replicas=2
>  * unclean.leader.election.enable=false
> Producer configuration:
>  * acks=all
> Broker configuration:
>  * replica.lag.time.max.ms=10000
> So the goal here is to reduce the chance of ever dropping a message that the 
> leader has acknowledged to the producer.
> This works great, except that we've found some situations in production where 
> we are forced to enable unclean leader election to recover, which we never 
> want to do. These situations all seem totally avoidable with some small 
> tweaks to the replication protocol.
> *A scenario we've seen many times*
> The following sequence of events is in time order. Consider a replica set for 
> a topic-partition TP with leader L and replicas R1 and R2, where all three 
> replicas are in the ISR.
>  # Producer sends ProduceRequest R with acks=all that contains a message 
> batch to the leader L.
>  # L receives R and appends the batch it contains to the active segment of TP 
> but does not ack to the producer yet because the request was acks=all.
>  # A storage fault occurs on L which makes all IOPS take a long time but 
> doesn't cause a hard failure.
>  # R1 and R2 send follower fetch requests to L which are indefinitely delayed 
> due to the storage fault on L.
>  # 10 seconds after appending the batch to the log, L shrinks the ISR, 
> removing R1 and R2. This is because the ISR is defined as the set of replicas 
> at most replica.lag.time.max.ms milliseconds behind the log append time of 
> the leader end offset, and the leader end offset now points to a message that 
> has not been replicated yet.
> The storage fault example in step 3 could easily be another kind of fault. 
> Say for example, L is partitioned from R1 and R2 but not from ZooKeeper or 
> the producer.
> The producer never receives acknowledgement of the ProduceRequest because the 
> min.insync.replicas constraint was never satisfied. So in terms of data 
> consistency, everything is working fine.
> The problem is recovering from this situation. If the fault on L is not a 
> temporary blip, then L needs to be replaced. But since L shrunk the ISR, the 
> only way that leadership can move to either R1 or R2 is to set 
> unclean.leader.election.enable=true.
> This works but it is a potentially unsafe way to recover and move leadership. 
> It would be better to have other options.
> *Recovery could be automatic in this scenario.*
> If you think about it, from the perspective of the producer, the write was 
> not acknowledged, and therefore, L, R1 and R2 are actually in-sync. So it 
> should actually be totally safe for leadership to transition to either R1 or 
> R2.
> It seems that the producer and the leader don't have fully compatible 
> definitions for what it means for the replica set to be in-sync. If the 
> leader L used different rules for defining ISR, it could allow self-healing 
> in this or similar scenarios, since the ISR would not shrink.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10853) Replication protocol deficiencies with workloads requiring high durability guarantees

2021-01-28 Thread Kyle Ambroff-Kao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17274114#comment-17274114
 ] 

Kyle Ambroff-Kao commented on KAFKA-10853:
--

I've been discussing this with my team at LinkedIn, and so far we don't see a 
better alternative than the largestAckedOffset approach combined with requiring 
acks=all through a topic-level config. Your suggestions have been really 
helpful, but I think our original approach still sounds better.

We might just prototype and test this out in our Kafka fork.




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10853) Replication protocol deficiencies with workloads requiring high durability guarantees

2020-12-16 Thread Kyle Ambroff-Kao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250598#comment-17250598
 ] 

Kyle Ambroff-Kao commented on KAFKA-10853:
--

Thanks for the reply Jun!
{quote}As for your proposal of using largestAckedOffset, the challenge is that 
we allow producers with different ack mode even on the same topic.
{quote}
Hrm, yeah, that's a great point. I think for our internal use case the 
largestAckedOffset change would be OK, since a producer that doesn't use 
acks=all would be a configuration bug. But it's true that one misconfigured 
client would ruin it for everyone.

I guess my proposal would either have to require some strict ordering of 
acknowledgements to producers, blocking acknowledgment of an acks=1 
ProduceRequest until any acks=all request that came before it completes. 
That's obviously terrible.

Or I guess it could be done with a new topic configuration that puts the topic 
into a durable mode, enforcing acks=all regardless of what the ProduceRequest 
says. That sounds less terrible.
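
As a sketch of that broker-side override (the durable-mode flag is hypothetical 
and does not exist in Kafka today):
{noformat}
// Hypothetical topic-level "durable mode": ignore the acks value carried by the
// ProduceRequest and treat every produce to this topic as acks=all.
val AcksAll: Short = -1 // acks=all encoding on the wire

def effectiveAcks(requestedAcks: Short, durableModeEnabled: Boolean): Short =
  if (durableModeEnabled) AcksAll else requestedAcks
{noformat}
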
{quote}It sounds like you already have a separate mechanism to detect the 
slowness of a broker based on request.max.local.time.ms. So, another approach 
could be communicating this info to the controller so that it can choose to 
move the leader off the slow broker. It seems that this could solve the problem 
if request.max.local.time.ms < replica.lag.time.max.ms.
{quote}
Hrm, that's an interesting proposal. I like this idea. We could change our code 
so that, instead of halting when request.max.local.time.ms is exceeded, the 
broker just stops accepting client traffic (except for follower fetch requests) 
and signals to the controller that leadership should be shifted elsewhere. That 
sounds really nice.

I'm nervous about the idea of changing replica.lag.time.max.ms though. We have 
had it set to 10 seconds for many years now, and I'm hesitant to increase it; 
10 seconds already feels like it is too long.

Decreasing replica.lag.time.max.ms seems scary too, because we don't want false 
positives. We set request.max.local.time.ms to 60 seconds through experience 
and trial and error, since we generally don't see GC pauses that long, but we 
do see IOPS pause for that long when we have a real storage issue.

The other problem is that I think handling failures this way only works for 
certain failure modes, like a really slow or failing disk. It doesn't handle 
network partitions at all, since we may not be able to reach the controller. Or 
the pause may be caused by something else that prevents any thread from being 
scheduled.

I need to think about this for a bit.


[jira] [Comment Edited] (KAFKA-10853) Replication protocol deficiencies with workloads requiring high durability guarantees

2020-12-14 Thread Kyle Ambroff-Kao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17249390#comment-17249390
 ] 

Kyle Ambroff-Kao edited comment on KAFKA-10853 at 12/14/20, 11:20 PM:
--

Thank you for the reply Guozhang!

In our deployments we have a few systems in place that make recovery from this 
situation automatic, without requiring intervention from an admin. The problem 
outlined in this ticket breaks this for topics with 
unclean.leader.election.enable=false and min.insync.replicas > 1.

The first mitigation we have is to just have brokers halt if they get stuck and 
cannot make progress. We have a config in our fork called 
request.max.local.time.ms which sets an upper bound on the amount of time a 
broker can spend actively handling a request. We set this to 60 seconds in 
production, so if some storage fault causes all produce requests to block 
indefinitely, after a minute the broker will halt and leadership will change to 
another broker.

[https://github.com/linkedin/kafka/commit/8926f588329]
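
The mechanism is roughly the following (an illustrative sketch, not the code 
from the commit above):
{noformat}
// Sketch of the request.max.local.time.ms watchdog: if any in-flight request has
// been executing locally for longer than the configured bound, halt the broker so
// that leadership moves to another replica.
class RequestWatchdog(maxLocalTimeMs: Long, nowMs: () => Long) {
  def check(inFlightRequestStartTimesMs: Iterable[Long]): Unit = {
    val now = nowMs()
    if (inFlightRequestStartTimesMs.exists(start => now - start > maxLocalTimeMs)) {
      // halt() rather than exit() so that no shutdown hook can itself block on the bad disk.
      Runtime.getRuntime.halt(1)
    }
  }
}
{noformat}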

In the case where performance of the leader is bad but it is still making 
progress, we use the slow broker detection code in 
[https://github.com/linkedin/cruise-control] to detect the poor performance and 
"demote" the broker, which involves changing the preferred leader of all 
topic-partitions hosted on that broker and performing a 
preferred-leader-election.

These automations work for most of our deployments, but not for deployments 
where we use high durability configurations as outlined by this ticket.

I'm not really up on recent admin requests which could bypass 
unclean.leader.election, but in general I think a solution that prevented this 
situation in the first place would be more desirable. I don't want to have to 
build automation outside of Kafka which tries to determine the safety of such 
an operation when the controller could do it instead.




[jira] [Commented] (KAFKA-10853) Replication protocol deficiencies with workloads requiring high durability guarantees

2020-12-14 Thread Kyle Ambroff-Kao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17249390#comment-17249390
 ] 

Kyle Ambroff-Kao commented on KAFKA-10853:
--

In our deployments we have a few systems in place that make recovery from this 
situation automatic, without requiring intervention from an admin. The problem 
outlined in this ticket breaks this for topics with 
unclean.leader.election.enable=false and min.insync.replicas > 1.

The first mitigation we have is to just have brokers halt if they get stuck and 
cannot make progress. We have a config in our fork called 
request.max.local.time.ms which sets an upper bound on the amount of time a 
broker can spend actively handling a request. We set this to 60 seconds in 
production, so if some storage fault causes all produce requests to block 
indefinitely, after a minute the broker will halt and leadership will change to 
another broker.

[https://github.com/linkedin/kafka/commit/8926f588329]

In the case where performance of the leader is bad but it is still making 
progress, we use the slow broker detection code in 
[https://github.com/linkedin/cruise-control] to detect the poor performance and 
"demote" the broker, which involves changing the preferred leader of all 
topic-partitions hosted on that broker and performing a 
preferred-leader-election.

These automations work for most of our deployments, but not for deployments 
where we use high durability configurations as outlined by this ticket.

I'm not really up on recent admin requests which could bypass 
unclean.leader.election, but in general I think a solution that prevented this 
situation in the first place would be more desirable. I don't want to have to 
build automation outside of Kafka which tries to determine the safety of such 
an operation when the controller could do it instead.


[jira] [Comment Edited] (KAFKA-10853) Replication protocol deficiencies with workloads requiring high durability guarantees

2020-12-14 Thread Kyle Ambroff-Kao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17249355#comment-17249355
 ] 

Kyle Ambroff-Kao edited comment on KAFKA-10853 at 12/14/20, 10:42 PM:
--

*Potential solution: Use largestAckedOffset instead of leaderEndOffset to 
determine ISR*

Today the leader tracks a couple of different offsets. One is the leader end 
offset, which is the highest offset appended to the log for the topic-partition 
TP.

Another is the high watermark, which is the highest offset appended in the ISR.

We can't really make a simple change like changing the definition of ISR to use 
the high watermark of TP, because the high watermark depends on the state of 
the ISR. So this change would be pretty broken:
{noformat}
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala 
b/core/src/main/scala/kafka/cluster/Partition.scala
index 2d28aa12b69b..e141ec090a28 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -901,7 +901,7 @@ class Partition(val topicPartition: TopicPartition,
  **/
 val candidateReplicaIds = inSyncReplicaIds - localBrokerId
 val currentTimeMs = time.milliseconds()
-val leaderEndOffset = localLogOrException.logEndOffset
+val leaderEndOffset = localLogOrException.highWatermark
 candidateReplicaIds.filter(replicaId => isFollowerOutOfSync(replicaId, 
leaderEndOffset, currentTimeMs, maxLagMs))
   } {noformat}
It seems like a potential solution would be to track a third offset: the 
largest offset acknowledged to a producer in response to a ProduceRequest. Call 
it largestAckedOffset.

*If the leader L defined the ISR as no more than replica.lag.time.max.ms behind 
the log append time of largestAckedOffset, then the failure mode I described 
above would not lead to the ISR shrinking. And from the perspective of the 
producer, no data has been lost which has been acknowledged by L.*

I think this change is also compatible with more lossy topic configurations, 
where min.insync.replicas=1, RF=2, unclean.leader.election.enable=true and 
acks=1. In that case, the largestAckedOffset would always be equal to the 
leaderEndOffset, so effectively nothing has changed.
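
To illustrate the shape of the change (the names and structure below are 
assumptions for the sake of the sketch, not working Partition.scala code):
{noformat}
// Sketch: track the largest offset actually acknowledged to a producer along with
// its log append time, and base the out-of-sync check on that rather than on the
// leader end offset.
class AckedOffsetTracker {
  @volatile private var largestAckedOffset: Long = -1L
  @volatile private var largestAckedAppendTimeMs: Long = -1L

  def onProduceAcknowledged(lastOffset: Long, appendTimeMs: Long): Unit = {
    if (lastOffset > largestAckedOffset) {
      largestAckedOffset = lastOffset
      largestAckedAppendTimeMs = appendTimeMs
    }
  }

  // A follower only counts as out of sync if it has not replicated up to
  // largestAckedOffset within maxLagMs of that offset's append time.
  def isFollowerOutOfSync(followerEndOffset: Long, nowMs: Long, maxLagMs: Long): Boolean =
    followerEndOffset < largestAckedOffset && (nowMs - largestAckedAppendTimeMs) > maxLagMs
}
{noformat}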




[jira] [Comment Edited] (KAFKA-10853) Replication protocol deficiencies with workloads requiring high durability guarantees

2020-12-14 Thread Kyle Ambroff-Kao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17249355#comment-17249355
 ] 

Kyle Ambroff-Kao edited comment on KAFKA-10853 at 12/14/20, 10:28 PM:
--

*Potential solution: Use largestAckedOffset instead of leaderEndOffset to 
determine ISR*

Today the leader tracks a couple of different offsets. One is the leader end 
offset, which is the highest offset appended to the log for the topic-partition 
TP.

Another is the high watermark, which is the highest offset appended in the ISR.

We can't really make a simple change like changing the definition of ISR to use 
the high watermark of TP, because the high watermark depends on the state of 
the ISR. So this change would be pretty broken:
{noformat}
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala 
b/core/src/main/scala/kafka/cluster/Partition.scala
index 2d28aa12b69b..e141ec090a28 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -901,7 +901,7 @@ class Partition(val topicPartition: TopicPartition,
  **/
 val candidateReplicaIds = inSyncReplicaIds - localBrokerId
 val currentTimeMs = time.milliseconds()
-val leaderEndOffset = localLogOrException.logEndOffset
+val leaderEndOffset = localLogOrException.highWatermark
 candidateReplicaIds.filter(replicaId => isFollowerOutOfSync(replicaId, 
leaderEndOffset, currentTimeMs, maxLagMs))
   } {noformat}
It seems like a potential solution would be to track a third offset: the 
largest offset acknowledged to a producer in response to a ProduceRequest. Call 
it largestAckedOffset.

If the leader L defined the ISR as no more than replica.lag.time.max.ms behind 
the log append time of largestAckedOffset, then the failure mode I described 
above would not lead to the ISR shrinking. And from the perspective of the 
producer, no data has been lost which has been acknowledged by L.

I think this change is also compatible with more lossy topic configurations, 
where min.insync.replicas=1, RF=2, unclean.leader.election.enable=true and 
acks=1. In that case, the largestAckedOffset would always be equal to the 
leaderEndOffset, so effectively nothing has changed.




[jira] [Updated] (KAFKA-10853) Replication protocol deficiencies with workloads requiring high durability guarantees

2020-12-14 Thread Kyle Ambroff-Kao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10853?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kyle Ambroff-Kao updated KAFKA-10853:
-
Description: 
*tl;dr: The definition of ISR and the consistency model from the perspective of 
the producer seem a bit out of sync*

We have many systems in production that trade off availability in order to 
provide stronger consistency guarantees. Most of these configurations look like 
this:

Topic configuration:
 * replication factor 3
 * min.insync.replicas=2
 * unclean.leader.election.enable=false

Producer configuration:
 * acks=all

Broker configuration:
 * replica.lag.time.max.ms=10000

So the goal here is to reduce the chance of ever dropping a message that the 
leader has acknowledged to the producer.

This works great, except that we've found some situations in production where 
we are forced to enable unclean leader election to recover, which we never want 
to do. These situations all seem totally avoidable with some small tweaks to 
the replication protocol.

*A scenario we've seen many times*

The following sequence of events is in time order. Consider a replica set for a 
topic-partition TP with leader L and replicas R1 and R2, where all three 
replicas are in the ISR.
 # Producer sends ProduceRequest R with acks=all that contains a message batch 
to the leader L.
 # L receives R and appends the batch it contains to the active segment of TP 
but does not ack to the producer yet because the request was acks=all.
 # A storage fault occurs on L which makes all IOPS take a long time but 
doesn't cause a hard failure.
 # R1 and R2 send follower fetch requests to L which are indefinitely delayed 
due to the storage fault on L.
 # 10 seconds after appending the batch to the log, L shrinks the ISR, removing 
R1 and R2. This is because the ISR is defined as the set of replicas at most 
replica.lag.time.max.ms milliseconds behind the log append time of the leader 
end offset, and the leader end offset now points to a message that has not been 
replicated yet.

The storage fault example in step 3 could easily be another kind of fault. Say 
for example, L is partitioned from R1 and R2 but not from ZooKeeper or the 
producer.

The producer never receives acknowledgement of the ProduceRequest because the 
min.insync.replicas constraint was never satisfied. So in terms of data 
consistency, everything is working fine.

The problem is recovering from this situation. If the fault on L is not a 
temporary blip, then L needs to be replaced. But since L shrunk the ISR, the 
only way that leadership can move to either R1 or R2 is to set 
unclean.leader.election.enable=true.

This works but it is a potentially unsafe way to recover and move leadership. 
It would be better to have other options.

*Recovery could be automatic in this scenario.*

If you think about it, from the perspective of the producer, the write was not 
acknowledged, and therefore, L, R1 and R2 are actually in-sync. So it should 
actually be totally safe for leadership to transition to either R1 or R2.

It seems that the producer and the leader don't have fully compatible 
definitions for what it means for the replica set to be in-sync. If the leader 
L used different rules for defining ISR, it could allow self-healing in this or 
similar scenarios, since the ISR would not shrink.

 


[jira] [Commented] (KAFKA-10853) Replication protocol deficiencies with workloads requiring high durability guarantees

2020-12-14 Thread Kyle Ambroff-Kao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17249355#comment-17249355
 ] 

Kyle Ambroff-Kao commented on KAFKA-10853:
--

Today the leader tracks a couple of different offsets. One is the leader end 
offset, which is the highest offset appended to the log for the topic-partition 
TP.

Another is the high watermark, which is the highest offset appended in the ISR.

We can't really make a simple change like changing the definition of ISR to use 
the high watermark of TP, because the high watermark depends on the state of 
the ISR. So this change would be pretty broken:
{noformat}
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala 
b/core/src/main/scala/kafka/cluster/Partition.scala
index 2d28aa12b69b..e141ec090a28 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -901,7 +901,7 @@ class Partition(val topicPartition: TopicPartition,
  **/
 val candidateReplicaIds = inSyncReplicaIds - localBrokerId
 val currentTimeMs = time.milliseconds()
-val leaderEndOffset = localLogOrException.logEndOffset
+val leaderEndOffset = localLogOrException.highWatermark
 candidateReplicaIds.filter(replicaId => isFollowerOutOfSync(replicaId, 
leaderEndOffset, currentTimeMs, maxLagMs))
   } {noformat}
It seems like a potential solution would be to track a third offset: the 
largest offset acknowledged to a producer in response to a ProduceRequest. Call 
it largestAckedOffset.

If the leader L defined the ISR as no more than replica.lag.time.max.ms behind 
the log append time of largestAckedOffset, then the failure mode I described 
above would not lead to the ISR shrinking. And from the perspective of the 
producer, no data has been lost which has been acknowledged by L.

I think this change is also compatible with more lossy topic configurations, 
where min.insync.replicas=1, RF=2, unclean.leader.election.enable=true and 
acks=1. In that case, the largestAckedOffset would always be equal to the 
leaderEndOffset, so effectively nothing has changed.


[jira] [Updated] (KAFKA-10853) Replication protocol deficiencies with workloads requiring high durability guarantees

2020-12-14 Thread Kyle Ambroff-Kao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10853?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kyle Ambroff-Kao updated KAFKA-10853:
-
Description: 
*tl;dr: The definition of ISR and the consistency model from the perspective of 
the producer seem a bit out of sync*

We have many systems in production that trade off availability in order to 
provide stronger consistency guarantees. Most of these configurations look like 
this:

Topic configuration:
 * replication factor 3
 * min.insync.replicas=2
 * unclean.leader.election.enable=false

Producer configuration:
 * acks=all

Broker configuration:
 * replica.lag.time.max.ms=10000

So the goal here is to reduce the chance of ever dropping a message that the 
leader has acknowledged to the producer.

This works great, except that we've found some situations in production where 
we are forced to enable unclean leader election to recover, which we never want 
to do. These situations all seem totally avoidable with some small tweaks to 
the replication protocol.

*A scenario we've seen many times*

The following sequence of events is in time order. Consider a replica set for a 
topic-partition TP with leader L and replicas R1 and R2, where all three 
replicas are in the ISR.
 # Producer sends ProduceRequest R with acks=all that contains a message batch 
to the leader L.
 # L receives R and appends the batch it contains to the active segment of TP 
but does not ack to the producer yet because the request was acks=all.
 # A storage fault occurs on L which makes all IOPS take a long time but 
doesn't cause a hard failure.
 # R1 and R2 send follower fetch requests to L which are indefinitely delayed 
due to the storage fault on L.
 # 10 seconds after appending the batch to the log, L shrinks the ISR, removing 
R1 and R2. This is because the ISR is defined as the set of replicas at most 
replica.lag.time.max.ms milliseconds behind the log append time of the leader 
end offset, and the leader end offset now points to a message that has not been 
replicated yet.

The storage fault example in step 3 could easily be another kind of fault. Say 
for example, L is partitioned from R1 and R2 but not from ZooKeeper or the 
producer.

The producer never receives acknowledgement of the ProduceRequest because the 
min.insync.replicas constraint was never satisfied. So in terms of data 
consistency, everything is working fine.

The problem is recovering from this situation. If the fault on L is not a 
temporary blip, then L needs to be replaced. But since L shrunk the ISR, the 
only way that leadership can move to either R1 or R2 is to set 
unclean.leader.election.enable=true.

This works but it is a potentially unsafe way to recover and move leadership. 
It would be better to have other options.

*Recovery could be automatic in this scenario.*

If you think about it, from the perspective of the producer, the write was not 
acknowledged, and therefore, L, R1 and R2 are actually in-sync. So it should 
actually be totally safe for leadership to transition to either R1 or R2.

It seems that the producer and the leader don't have fully compatible 
definitions for what it means for the replica set to be in-sync. If the leader 
L used different rules for defining ISR, it could allow self-healing in this or 
similar scenarios, since the ISR would not shrink.

*Potential alternatives*

Today the leader tracks a couple of different offsets. One is the leader end 
offset, which is the highest offset appended to the log for the topic-partition 
TP.

Another is the high watermark, which is the highest offset appended in the ISR.

We can't really make a simple change like changing the definition of ISR to use 
the high watermark of TP, because the high watermark depends on the state of 
the ISR. So this change would be pretty broken:
{noformat}
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala 
b/core/src/main/scala/kafka/cluster/Partition.scala
index 2d28aa12b69b..e141ec090a28 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -901,7 +901,7 @@ class Partition(val topicPartition: TopicPartition,
  **/
 val candidateReplicaIds = inSyncReplicaIds - localBrokerId
 val currentTimeMs = time.milliseconds()
-val leaderEndOffset = localLogOrException.logEndOffset
+val leaderEndOffset = localLogOrException.highWatermark
 candidateReplicaIds.filter(replicaId => isFollowerOutOfSync(replicaId, 
leaderEndOffset, currentTimeMs, maxLagMs))
   } {noformat}
It seems like a potential solution would be to track a third offset: the 
largest offset acknowledged to a producer in response to a ProduceRequest. Call 
it largestAckedOffset.

If the leader L defined the ISR as no more than replica.lag.time.max.ms behind 
the log append time of largestAckedOffset, then the failure mode I described 
above would not lead to the ISR shrinking. And from the perspective of 

[jira] [Updated] (KAFKA-10853) Replication protocol deficiencies with workloads requiring high durability guarantees

2020-12-14 Thread Kyle Ambroff-Kao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10853?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kyle Ambroff-Kao updated KAFKA-10853:
-
Description: 
*tl;dr: The definition of ISR and the consistency model from the perspective of 
the producer seem a bit out of sync*

We have many systems in production that trade off availability in order to 
provide stronger consistency guarantees. Most of these configurations look like 
this:

Topic configuration:
 * replication factor 3
 * min.insync.replicas=2
 * unclean.leader.election.enable=false

Producer configuration:
 * acks=all

Broker configuration:
 * replica.lag.time.max.ms=10000

So the goal here is to reduce the chance of ever dropping a message that the 
leader has acknowledged to the producer.

This works great, except that we've found some situations in production where 
we are forced to enable unclean leader election to recover, which we never want 
to do. These situations all seem totally avoidable with some small tweaks to 
the replication protocol.

*A scenario we've seen many times*

The following sequence of events is in time order. Consider a replica set for a 
topic-partition TP with leader L and replicas R1 and R2, where all three 
replicas are in the ISR.
 # Producer sends ProduceRequest R with acks=all that contains a message batch 
to the leader L.
 # L receives R and appends the batch it contains to the active segment of TP 
but does not ack to the producer yet because the request was acks=all.
 # A storage fault occurs on L which makes all IOPS take a long time but 
doesn't cause a hard failure.
 # R1 and R2 send follower fetch requests to L which are indefinitely delayed 
due to the storage fault on L.
 # 10 seconds after appending the batch to the log, L shrinks the ISR, removing 
R1 and R2. This is because the ISR is defined as the set of replicas at most 
replica.lag.time.max.ms milliseconds behind the log append time of the leader 
end offset, and the leader end offset now points to a message that has not been 
replicated yet.

The storage fault example in step 3 could easily be another kind of fault. Say 
for example, L is partitioned from R1 and R2 but not from ZooKeeper or the 
producer.

The producer never receives acknowledgement of the ProduceRequest because the 
min.insync.replicas constraint was never satisfied. So in terms of data 
consistency, everything is working fine.

The problem is recovering from this situation. If the fault on L is not a 
temporary blip, then L needs to be replaced. But since L shrunk the ISR, the 
only way that leadership can move to either R1 or R2 is to set 
unclean.leader.election.enable=true.

This works but it is a potentially unsafe way to recover and move leadership. 
It would be better to have other options.

*Recovery could be automatic in this scenario.*

If you think about it, from the perspective of the producer, the write was not 
acknowledged, and therefore, L, R1 and R2 are actually in sync. So it should 
actually be totally safe for leadership to transition to either R1 or R2.

It seems that the producer and the leader don't have fully compatible 
definitions for what it means for the replica set to be in-sync. If the leader 
L used different rules for defining ISR, it could allow self-healing in this or 
similar scenarios, since the ISR would not shrink.

*Potential alternatives*

Today the leader tracks a couple of different offsets. One is the leader end 
offset, which is the highest offset appended to the log for the topic-partition 
TP.

Another is the high watermark, which is the highest offset appended in the ISR.

We can't really make a simple change like changing the definition of ISR to use 
the high watermark of TP, because the high watermark depends on the state of 
the ISR. So this change would be pretty broken:
{noformat}
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala 
b/core/src/main/scala/kafka/cluster/Partition.scala
index 2d28aa12b69b..e141ec090a28 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -901,7 +901,7 @@ class Partition(val topicPartition: TopicPartition,
  **/
 val candidateReplicaIds = inSyncReplicaIds - localBrokerId
 val currentTimeMs = time.milliseconds()
-val leaderEndOffset = localLogOrException.logEndOffset
+val leaderEndOffset = localLogOrException.highWatermark
 candidateReplicaIds.filter(replicaId => isFollowerOutOfSync(replicaId, 
leaderEndOffset, currentTimeMs, maxLagMs))
   } {noformat}
It seems like a potential solution would be to track a third offset: the 
largest offset acknowledged to a producer in response to a ProduceRequest. Call 
it largestAckedOffset.

If the leader L defined the ISR as no more than replica.lag.time.max.ms behind 
the log append time of largestAckedOffset, then the failure mode I described 
above would not lead to the ISR shrinking. And from the perspective of 

[jira] [Created] (KAFKA-10853) Replication protocol deficiencies with workloads requiring high durability guarantees

2020-12-14 Thread Kyle Ambroff-Kao (Jira)
Kyle Ambroff-Kao created KAFKA-10853:


 Summary: Replication protocol deficiencies with workloads 
requiring high durability guarantees
 Key: KAFKA-10853
 URL: https://issues.apache.org/jira/browse/KAFKA-10853
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.4.0
Reporter: Kyle Ambroff-Kao


*tl;dr: The definition of ISR and the consistency model from the perspective of 
the producer seem a bit out of sync*

We have many systems in production that trade off availability in order to 
provide stronger consistency guarantees. Most of these configurations look like 
this:

Topic configuration:
 * replication factor 3
 * min.insync.replicas=2
 * unclean.leader.election.enable=false

Producer configuration:
 * acks=all

Broker configuration:
 * replica.lag.time.max.ms=10000

So the goal here is to reduce the chance of ever dropping a message that the 
leader has acknowledged to the producer.
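
For reference, a minimal producer setup matching the configuration above might 
look like the following sketch (the broker address and topic name are 
placeholders; broker and topic settings are applied separately):
{noformat}
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092") // placeholder address
props.put(ProducerConfig.ACKS_CONFIG, "all") // wait for the full ISR to acknowledge
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

val producer = new KafkaProducer[String, String](props)
// With acks=all and min.insync.replicas=2, send() only succeeds once at least
// two in-sync replicas have the batch.
producer.send(new ProducerRecord("my-topic", "key", "value")).get()
producer.close()
{noformat}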

This works great, except that we've found some situations in production where 
we are forced to enable unclean leader election to recover, which we never want 
to do. These situations all seem totally avoidable with some small tweaks to 
the replication protocol.

*A scenario we've seen many times*

The following sequence of events is in time order. Consider a replica set for a 
topic-partition TP with leader L and replicas R1 and R2, where all three 
replicas are in the ISR.
 # Producer sends ProduceRequest R with acks=all that contains a message batch 
to the leader L.
 # L receives R and appends the batch it contains to the active segment of TP 
but does not ack to the producer yet because the request was acks=all.
 # A storage fault occurs on L which makes all IOPS take a long time but 
doesn't cause a hard failure.
 # R1 and R2 send follower fetch requests to L which are delayed due to the 
storage fault on L.
 # 10 seconds after appending the batch to the log, L shrinks the ISR, removing 
R1 and R2. This is because the ISR is defined as the set of replicas at most 
replica.lag.time.max.ms milliseconds behind the log append time of the leader 
end offset, and the leader end offset now points to a message that has not been 
replicated yet.

The storage fault example in step 3 could easily be another kind of fault. Say 
for example, L is partitioned from R1 and R2 but not from ZooKeeper or the 
producer.

The producer never receives acknowledgement of the ProduceRequest because the 
min.insync.replicas constraint was never satisfied. So in terms of data 
consistency, everything is working fine.

The problem is recovering from this situation. If the fault on L is not a 
temporary blip, then L needs to be replaced. But since L shrunk the ISR, the 
only way that leadership can move to either R1 or R2 is to set 
unclean.leader.election.enable=true.

This works but it is a potentially unsafe way to recover and move leadership. 
It would be better to have other options.

*Recovery could be automatic in this scenario.*

If you think about it, from the perspective of the producer the write was never 
acknowledged, and therefore L, R1 and R2 are effectively still in sync. So it 
should be safe for leadership to transition to either R1 or R2.

It seems that the producer and the leader don't have fully compatible 
definitions for what it means for the replica set to be in-sync. If the leader 
L used different rules for defining the ISR, it could allow self-healing in this or 
similar scenarios, since the ISR would not shrink.

*Potential alternatives*

Today the leader tracks a couple of different offsets. One is the leader end 
offset, which is the highest offset appended to the log for the topic-partition 
TP.

Another is the high watermark, which is the highest offset that has been 
replicated to every replica in the ISR.

We can't simply redefine the ISR in terms of the high watermark of TP, because 
the high watermark itself depends on the state of the ISR. So a change like this 
would be pretty broken:
{noformat}
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 2d28aa12b69b..e141ec090a28 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -901,7 +901,7 @@ class Partition(val topicPartition: TopicPartition,
       **/
     val candidateReplicaIds = inSyncReplicaIds - localBrokerId
     val currentTimeMs = time.milliseconds()
-    val leaderEndOffset = localLogOrException.logEndOffset
+    val leaderEndOffset = localLogOrException.highWatermark
     candidateReplicaIds.filter(replicaId => isFollowerOutOfSync(replicaId, leaderEndOffset, currentTimeMs, maxLagMs))
   }
{noformat}
It seems like a potential solution would be to track a third offset, which 
would be the largest offset acknowledged to a producer in response to a 
ProduceRequest. Say largestAckedOffset.

If the leader L defined the ISR as no more than replica.lag.time.max.ms behind 
the log append time of largestAckedOffset, then the failure mode described above 
would not lead to the ISR shrinking.
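A hedged sketch of how that could look in the shrink path (largestAckedOffset 
and everything else here is illustrative, not a proposed patch against 
Partition.scala): lag would be measured against the last offset the leader 
actually acknowledged, so an unacknowledged batch sitting at the log end could 
no longer push followers out of the ISR:
{noformat}
// Illustrative only.
// lastCaughtUpTimeMs: last time this follower had fetched up to largestAckedOffset.
final case class Follower(fetchedOffset: Long, lastCaughtUpTimeMs: Long)

// largestAckedOffset: highest offset the leader has acknowledged to any producer.
def outOfSyncFollowers(followers: Map[Int, Follower],
                       largestAckedOffset: Long,
                       currentTimeMs: Long,
                       maxLagMs: Long): Set[Int] =
  followers.collect {
    // A follower only counts as lagging if it has not replicated up to the last
    // acknowledged offset within maxLagMs; data the producer was never told
    // about does not shrink the ISR.
    case (brokerId, f)
      if f.fetchedOffset < largestAckedOffset &&
         currentTimeMs - f.lastCaughtUpTimeMs > maxLagMs => brokerId
  }.toSet
{noformat}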

[jira] [Commented] (KAFKA-6468) Replication high watermark checkpoint file read for every LeaderAndIsrRequest

2020-05-05 Thread Kyle Ambroff-Kao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17100047#comment-17100047
 ] 

Kyle Ambroff-Kao commented on KAFKA-6468:
-

Linking as a duplicate of KAFKA-8333, which has a patch that fixes this problem.

> Replication high watermark checkpoint file read for every LeaderAndIsrRequest
> -
>
> Key: KAFKA-6468
> URL: https://issues.apache.org/jira/browse/KAFKA-6468
> Project: Kafka
>  Issue Type: Bug
>Reporter: Kyle Ambroff-Kao
>Assignee: Kyle Ambroff-Kao
>Priority: Major
>
> The high watermark for each partition in a given log directory is written to 
> disk every _replica.high.watermark.checkpoint.interval.ms_ milliseconds. This 
> checkpoint file is used to create replicas when joining the cluster.
> [https://github.com/apache/kafka/blob/b73c765d7e172de4742a3aa023d5a0a4b7387247/core/src/main/scala/kafka/cluster/Partition.scala#L180]
> Unfortunately this file is read every time 
> kafka.cluster.Partition#getOrCreateReplica is invoked. For most clusters this 
> isn't a big deal, but for a small cluster with lots of partitions all of the 
> reads of this file really add up.
> On my local test cluster of three brokers with around 40k partitions, the 
> initial LeaderAndIsrRequest refers to every partition in the cluster, and it 
> can take 20 to 30 minutes to create all of the replicas because the 
> _replication-offset-checkpoint_ is nearly 2MB.
> Changing this code so that we only read this file once on startup reduces the 
> time to create all replicas to around one minute.
> Credit to [~onurkaraman] for finding this one.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-6468) Replication high watermark checkpoint file read for every LeaderAndIsrRequest

2020-05-05 Thread Kyle Ambroff-Kao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kyle Ambroff-Kao resolved KAFKA-6468.
-
Resolution: Fixed

> Replication high watermark checkpoint file read for every LeaderAndIsrRequest
> -
>
> Key: KAFKA-6468
> URL: https://issues.apache.org/jira/browse/KAFKA-6468
> Project: Kafka
>  Issue Type: Bug
>Reporter: Kyle Ambroff-Kao
>Assignee: Kyle Ambroff-Kao
>Priority: Major
>
> The high watermark for each partition in a given log directory is written to 
> disk every _replica.high.watermark.checkpoint.interval.ms_ milliseconds. This 
> checkpoint file is used to create replicas when joining the cluster.
> [https://github.com/apache/kafka/blob/b73c765d7e172de4742a3aa023d5a0a4b7387247/core/src/main/scala/kafka/cluster/Partition.scala#L180]
> Unfortunately this file is read every time 
> kafka.cluster.Partition#getOrCreateReplica is invoked. For most clusters this 
> isn't a big deal, but for a small cluster with lots of partitions all of the 
> reads of this file really add up.
> On my local test cluster of three brokers with around 40k partitions, the 
> initial LeaderAndIsrRequest refers to every partition in the cluster, and it 
> can take 20 to 30 minutes to create all of the replicas because the 
> _replication-offset-checkpoint_ is nearly 2MB.
> Changing this code so that we only read this file once on startup reduces the 
> time to create all replicas to around one minute.
> Credit to [~onurkaraman] for finding this one.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-6569) Reflection in OffsetIndex and TimeIndex construction

2018-02-18 Thread Kyle Ambroff-Kao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16368701#comment-16368701
 ] 

Kyle Ambroff-Kao commented on KAFKA-6569:
-

The logger name doesn't change for anything written from kafka.log.OffsetIndex, 
since the companion object overrides loggerName. However, messages logged from 
methods in kafka.log.AbstractIndex[K, V] now have the logger name 
"kafka.log.AbstractIndex$", where before they would be the class name of object 
(kafka.log.OffsetIndex).

> Reflection in OffsetIndex and TimeIndex construction
> 
>
> Key: KAFKA-6569
> URL: https://issues.apache.org/jira/browse/KAFKA-6569
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Kyle Ambroff-Kao
>Assignee: Kyle Ambroff-Kao
>Priority: Major
> Attachments: after.png, before.png
>
>
> kafka.log.AbstractIndex uses the Logging mixin to lazily initialize loggers 
> for any concrete type that inherits from it. This works great, except that 
> the LazyLogging trait uses reflection to compute the logger name.
> When you have hundreds of thousands of log segments to load on startup the 
> extra cost adds up.
> I've attached flame graphs from broker startup on a broker that has about 
> 12TB of log segments to load, and a second flame graph after changing 
> AbstractIndex to statically initialize a logger.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6569) Reflection in OffsetIndex and TimeIndex construction

2018-02-18 Thread Kyle Ambroff-Kao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16368693#comment-16368693
 ] 

Kyle Ambroff-Kao commented on KAFKA-6569:
-

Submitted a PR for the patch we deployed internally. It's not perfect though 
since the logger name does change. This wasn't important for us, but I'm not 
sure whether this matters to you guys.

> Reflection in OffsetIndex and TimeIndex construction
> 
>
> Key: KAFKA-6569
> URL: https://issues.apache.org/jira/browse/KAFKA-6569
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Kyle Ambroff-Kao
>Assignee: Kyle Ambroff-Kao
>Priority: Major
> Attachments: after.png, before.png
>
>
> kafka.log.AbstractIndex uses the Logging mixin to lazily initialize loggers 
> for any concrete type that inherits from it. This works great, except that 
> the LazyLogging trait uses reflection to compute the logger name.
> When you have hundreds of thousands of log segments to load on startup the 
> extra cost adds up.
> I've attached flame graphs from broker startup on a broker that has about 
> 12TB of log segments to load, and a second flame graph after changing 
> AbstractIndex to statically initialize a logger.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6569) Reflection in OffsetIndex and TimeIndex construction

2018-02-16 Thread Kyle Ambroff-Kao (JIRA)
Kyle Ambroff-Kao created KAFKA-6569:
---

 Summary: Reflection in OffsetIndex and TimeIndex construction
 Key: KAFKA-6569
 URL: https://issues.apache.org/jira/browse/KAFKA-6569
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Kyle Ambroff-Kao
Assignee: Kyle Ambroff-Kao
 Attachments: after.png, before.png

kafka.log.AbstractIndex uses the Logging mixin to lazily initialize loggers for 
any concrete type that inherits from it. This works great, except that the 
LazyLogging trait uses reflection to compute the logger name.

When you have hundreds of thousands of log segments to load on startup the 
extra cost adds up.
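A sketch of the kind of change meant here (illustrative, not the actual patch): 
move the logger into a companion object so it is created once, instead of each 
index instance deriving its logger name reflectively through the Logging mixin:
{noformat}
import org.slf4j.{Logger, LoggerFactory}

// Illustrative only -- names do not match the real kafka.log classes.
abstract class IndexSketch {
  // All instances share the statically initialized logger below; constructing
  // an index no longer triggers any reflective logger-name lookup.
  protected def log: Logger = IndexSketch.log
}

object IndexSketch {
  private val log: Logger = LoggerFactory.getLogger(classOf[IndexSketch])
}
{noformat}
The trade-off, discussed in the comments above, is that messages logged from 
shared base-class methods then appear under one logger name rather than the 
concrete class's name.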

I've attached flame graphs from broker startup on a broker that has about 12TB 
of log segments to load, and a second flame graph after changing AbstractIndex 
to statically initialize a logger.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6469) ISR change notification queue can prevent controller from making progress

2018-02-07 Thread Kyle Ambroff-Kao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16355829#comment-16355829
 ] 

Kyle Ambroff-Kao commented on KAFKA-6469:
-

[~junrao] The number of children of isr_change_notification isn't the problem. 
It's the size of the children of isr_change_notification that is the problem. 
In theory, with a large enough cluster (I would guess around 400 or 500 brokers, 
each with a large number of partitions) you'd run into problems with the number 
of children as well.

The specific situation I'm referring to is the broker attempting to write a 
child of isr_change_notification which exceeds 1MB. I just submitted a PR that 
addresses this.

> ISR change notification queue can prevent controller from making progress
> -
>
> Key: KAFKA-6469
> URL: https://issues.apache.org/jira/browse/KAFKA-6469
> Project: Kafka
>  Issue Type: Bug
>Reporter: Kyle Ambroff-Kao
>Assignee: Kyle Ambroff-Kao
>Priority: Major
>
> When the writes /isr_change_notification in ZooKeeper (which is effectively a 
> queue of ISR change events for the controller) happen at a rate high enough 
> that the node with a watch can't dequeue them, the trouble starts.
> The watcher kafka.controller.IsrChangeNotificationListener is fired in the 
> controller when a new entry is written to /isr_change_notification, and the 
> zkclient library sends a GetChildrenRequest to zookeeper to fetch all child 
> znodes.
> We've failures in one of our test clusters as the partition count started to 
> climb north of 60k per broker. We had brokers writing child nodes under 
> /isr_change_notification that were larger than the jute.maxbuffer size in 
> ZooKeeper (1MB), causing the ZooKeeper server to drop the controller's 
> session, effectively bricking the cluster.
> This can be partially mitigated by chunking ISR notifications to increase the 
> maximum number of partitions a broker can host.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6469) ISR change notification queue can prevent controller from making progress

2018-02-07 Thread Kyle Ambroff-Kao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16355828#comment-16355828
 ] 

Kyle Ambroff-Kao commented on KAFKA-6469:
-

Heh, yeah James, we don't typically run clusters like that in production. We 
try not to exceed 5k per node in practice. We have some test clusters which end 
up in this state when people experiment though. It doesn't work well, but 
having enough replica fetcher and IO threads definitely helps. It isn't 
straightforward to tune the broker to deal with this kind of load.

> ISR change notification queue can prevent controller from making progress
> -
>
> Key: KAFKA-6469
> URL: https://issues.apache.org/jira/browse/KAFKA-6469
> Project: Kafka
>  Issue Type: Bug
>Reporter: Kyle Ambroff-Kao
>Assignee: Kyle Ambroff-Kao
>Priority: Major
>
> When the writes /isr_change_notification in ZooKeeper (which is effectively a 
> queue of ISR change events for the controller) happen at a rate high enough 
> that the node with a watch can't dequeue them, the trouble starts.
> The watcher kafka.controller.IsrChangeNotificationListener is fired in the 
> controller when a new entry is written to /isr_change_notification, and the 
> zkclient library sends a GetChildrenRequest to zookeeper to fetch all child 
> znodes.
> We've failures in one of our test clusters as the partition count started to 
> climb north of 60k per broker. We had brokers writing child nodes under 
> /isr_change_notification that were larger than the jute.maxbuffer size in 
> ZooKeeper (1MB), causing the ZooKeeper server to drop the controller's 
> session, effectively bricking the cluster.
> This can be partially mitigated by chunking ISR notifications to increase the 
> maximum number of partitions a broker can host.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6469) ISR change notification queue can prevent controller from making progress

2018-01-22 Thread Kyle Ambroff-Kao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kyle Ambroff-Kao updated KAFKA-6469:

Description: 
When writes to /isr_change_notification in ZooKeeper (which is effectively a 
queue of ISR change events for the controller) happen at a rate high enough 
that the node with a watch can't dequeue them, the trouble starts.

The watcher kafka.controller.IsrChangeNotificationListener is fired in the 
controller when a new entry is written to /isr_change_notification, and the 
zkclient library sends a GetChildrenRequest to zookeeper to fetch all child 
znodes.

We've seen failures in one of our test clusters as the partition count started to 
climb north of 60k per broker. We had brokers writing child nodes under 
/isr_change_notification that were larger than the jute.maxbuffer size in 
ZooKeeper (1MB), causing the ZooKeeper server to drop the controller's session, 
effectively bricking the cluster.

This can be partially mitigated by chunking ISR notifications to increase the 
maximum number of partitions a broker can host.

 

  was:
When the writes /isr_change_notification in ZooKeeper (which is effectively a 
queue of ISR change events for the controller) happen at a rate high enough 
that the node with a watch can't dequeue them, the trouble starts.

The watcher kafka.controller.IsrChangeNotificationListener is fired in the 
controller when a new entry is written to /isr_change_notification, and the 
zkclient library sends a GetChildrenRequest to zookeeper to fetch all child 
znodes.

We've seen this happen in one of our test clusters as the partition count 
started to climb north of 60k per broker. We had brokers writing child nodes 
under /isr_change_notification that were larger than the jute.maxbuffer size in 
ZooKeeper (1MB), causing the ZooKeeper server to drop the controller's session, 
effectively bricking the cluster.

This can be partially mitigated by chunking ISR notifications to increase the 
maximum number of partitions a broker can host.

 


> ISR change notification queue can prevent controller from making progress
> -
>
> Key: KAFKA-6469
> URL: https://issues.apache.org/jira/browse/KAFKA-6469
> Project: Kafka
>  Issue Type: Bug
>Reporter: Kyle Ambroff-Kao
>Assignee: Kyle Ambroff-Kao
>Priority: Major
>
> When the writes /isr_change_notification in ZooKeeper (which is effectively a 
> queue of ISR change events for the controller) happen at a rate high enough 
> that the node with a watch can't dequeue them, the trouble starts.
> The watcher kafka.controller.IsrChangeNotificationListener is fired in the 
> controller when a new entry is written to /isr_change_notification, and the 
> zkclient library sends a GetChildrenRequest to zookeeper to fetch all child 
> znodes.
> We've failures in one of our test clusters as the partition count started to 
> climb north of 60k per broker. We had brokers writing child nodes under 
> /isr_change_notification that were larger than the jute.maxbuffer size in 
> ZooKeeper (1MB), causing the ZooKeeper server to drop the controller's 
> session, effectively bricking the cluster.
> This can be partially mitigated by chunking ISR notifications to increase the 
> maximum number of partitions a broker can host.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6468) Replication high watermark checkpoint file read for every LeaderAndIsrRequest

2018-01-22 Thread Kyle Ambroff-Kao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kyle Ambroff-Kao reassigned KAFKA-6468:
---

Assignee: Kyle Ambroff-Kao

> Replication high watermark checkpoint file read for every LeaderAndIsrRequest
> -
>
> Key: KAFKA-6468
> URL: https://issues.apache.org/jira/browse/KAFKA-6468
> Project: Kafka
>  Issue Type: Bug
>Reporter: Kyle Ambroff-Kao
>Assignee: Kyle Ambroff-Kao
>Priority: Major
>
> The high watermark for each partition in a given log directory is written to 
> disk every _replica.high.watermark.checkpoint.interval.ms_ milliseconds. This 
> checkpoint file is used to create replicas when joining the cluster.
> [https://github.com/apache/kafka/blob/b73c765d7e172de4742a3aa023d5a0a4b7387247/core/src/main/scala/kafka/cluster/Partition.scala#L180]
> Unfortunately this file is read every time 
> kafka.cluster.Partition#getOrCreateReplica is invoked. For most clusters this 
> isn't a big deal, but for a small cluster with lots of partitions all of the 
> reads of this file really add up.
> On my local test cluster of three brokers with around 40k partitions, the 
> initial LeaderAndIsrRequest refers to every partition in the cluster, and it 
> can take 20 to 30 minutes to create all of the replicas because the 
> _replication-offset-checkpoint_ is nearly 2MB.
> Changing this code so that we only read this file once on startup reduces the 
> time to create all replicas to around one minute.
> Credit to [~onurkaraman] for finding this one.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6469) ISR change notification queue can prevent controller from making progress

2018-01-22 Thread Kyle Ambroff-Kao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kyle Ambroff-Kao reassigned KAFKA-6469:
---

Assignee: Kyle Ambroff-Kao

> ISR change notification queue can prevent controller from making progress
> -
>
> Key: KAFKA-6469
> URL: https://issues.apache.org/jira/browse/KAFKA-6469
> Project: Kafka
>  Issue Type: Bug
>Reporter: Kyle Ambroff-Kao
>Assignee: Kyle Ambroff-Kao
>Priority: Major
>
> When the writes /isr_change_notification in ZooKeeper (which is effectively a 
> queue of ISR change events for the controller) happen at a rate high enough 
> that the node with a watch can't dequeue them, the trouble starts.
> The watcher kafka.controller.IsrChangeNotificationListener is fired in the 
> controller when a new entry is written to /isr_change_notification, and the 
> zkclient library sends a GetChildrenRequest to zookeeper to fetch all child 
> znodes.
> We've seen this happen in one of our test clusters as the partition count 
> started to climb north of 60k per broker. We had brokers writing child nodes 
> under /isr_change_notification that were larger than the jute.maxbuffer size 
> in ZooKeeper (1MB), causing the ZooKeeper server to drop the controller's 
> session, effectively bricking the cluster.
> This can be partially mitigated by chunking ISR notifications to increase the 
> maximum number of partitions a broker can host.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6469) ISR change notification queue can prevent controller from making progress

2018-01-22 Thread Kyle Ambroff-Kao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kyle Ambroff-Kao updated KAFKA-6469:

Description: 
When writes to /isr_change_notification in ZooKeeper (which is effectively a 
queue of ISR change events for the controller) happen at a rate high enough 
that the node with a watch can't dequeue them, the trouble starts.

The watcher kafka.controller.IsrChangeNotificationListener is fired in the 
controller when a new entry is written to /isr_change_notification, and the 
zkclient library sends a GetChildrenRequest to zookeeper to fetch all child 
znodes.

We've seen this happen in one of our test clusters as the partition count 
started to climb north of 60k per broker. We had brokers writing child nodes 
under /isr_change_notification that were larger than the jute.maxbuffer size in 
ZooKeeper (1MB), causing the ZooKeeper server to drop the controller's session, 
effectively bricking the cluster.

This can be partially mitigated by chunking ISR notifications to increase the 
maximum number of partitions a broker can host.

 

  was:
When the writes /isr_change_notification in ZooKeeper (which is effectively a 
queue of ISR change events for the controller) happen at a rate high enough 
that the node with a watch can't dequeue them, the trouble starts.

 

The watcher kafka.controller.IsrChangeNotificationListener is fired in the 
controller when a new entry is written to /isr_change_notification, and the 
zkclient library sends a GetChildrenRequest to zookeeper to fetch all child 
znodes.

We've seen this happen in one of our test clusters as the partition count 
started to climb north of 60k per broker. We had brokers writing child nodes 
under /isr_change_notification that were larger than the jute.maxbuffer size in 
ZooKeeper (1MB), causing the ZooKeeper server to drop the controller's session, 
effectively bricking the cluster.

This can be partially mitigated by chunking ISR notifications to increase the 
maximum number of partitions a broker can host.

 


> ISR change notification queue can prevent controller from making progress
> -
>
> Key: KAFKA-6469
> URL: https://issues.apache.org/jira/browse/KAFKA-6469
> Project: Kafka
>  Issue Type: Bug
>Reporter: Kyle Ambroff-Kao
>Priority: Major
>
> When the writes /isr_change_notification in ZooKeeper (which is effectively a 
> queue of ISR change events for the controller) happen at a rate high enough 
> that the node with a watch can't dequeue them, the trouble starts.
> The watcher kafka.controller.IsrChangeNotificationListener is fired in the 
> controller when a new entry is written to /isr_change_notification, and the 
> zkclient library sends a GetChildrenRequest to zookeeper to fetch all child 
> znodes.
> We've seen this happen in one of our test clusters as the partition count 
> started to climb north of 60k per broker. We had brokers writing child nodes 
> under /isr_change_notification that were larger than the jute.maxbuffer size 
> in ZooKeeper (1MB), causing the ZooKeeper server to drop the controller's 
> session, effectively bricking the cluster.
> This can be partially mitigated by chunking ISR notifications to increase the 
> maximum number of partitions a broker can host.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6469) ISR change notification queue can prevent controller from making progress

2018-01-22 Thread Kyle Ambroff-Kao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335368#comment-16335368
 ] 

Kyle Ambroff-Kao commented on KAFKA-6469:
-

It's worth noting that there are other possible failures here. The size of the 
GetChildrenResponse returned by ZooKeeper can also be a problem. Reading through 
the code and running some tests to confirm shows that an empty 
GetChildrenResponse is 4 bytes on the wire, and every child node name adds a 
minimum of 4 bytes of overhead plus the name itself. Since these znode names are 
21 characters long, each child znode accounts for 25 bytes in the response.

A GetChildrenResponse with 42k child nodes of the same length will be just 
about 1.001MB, which is larger than the 1MB data frame that ZooKeeper uses. 
This causes the ZooKeeper server to drop the broker's session.
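The arithmetic, spelled out (all numbers are the estimates from the paragraphs 
above, not re-measured here):
{noformat}
val children      = 42000
val bytesPerChild = 4 + 21                        // 4-byte length prefix + 21-character znode name
val responseBytes = 4 + children * bytesPerChild  // 1,050,004 bytes
val overLimit     = responseBytes > 1024 * 1024   // true: just over the 1MB (1,048,576-byte) frame limit
{noformat}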

So if 42k ISR changes happen at once and are enqueued separately, and the 
controller pauses at just the right time, you'll end up with a queue that can 
no longer be drained.

In the long run this needs to be redesigned. In the short term we can try to 
make sure that these ISR notifications are batched appropriately to avoid 
breaking the cluster.
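A hedged sketch of that short-term batching idea (the 900 KiB budget and the 
per-entry overhead estimate are assumptions, and this is not how the broker 
actually serializes the notification): split one large ISR-change batch across 
several child znodes so no single payload approaches jute.maxbuffer:
{noformat}
// Illustrative only.
def chunkIsrChanges(partitions: Seq[String],
                    maxBytesPerChild: Int = 900 * 1024): Seq[Seq[String]] = {
  val chunks = Seq.newBuilder[Seq[String]]
  var current = Vector.empty[String]
  var currentBytes = 0
  for (p <- partitions) {
    val entryBytes = p.getBytes("UTF-8").length + 16 // rough per-entry serialization overhead (assumption)
    if (current.nonEmpty && currentBytes + entryBytes > maxBytesPerChild) {
      chunks += current          // close this chunk; it becomes its own znode payload
      current = Vector.empty
      currentBytes = 0
    }
    current :+= p
    currentBytes += entryBytes
  }
  if (current.nonEmpty) chunks += current
  chunks.result()                // each element is written as a separate /isr_change_notification child
}
{noformat}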

> ISR change notification queue can prevent controller from making progress
> -
>
> Key: KAFKA-6469
> URL: https://issues.apache.org/jira/browse/KAFKA-6469
> Project: Kafka
>  Issue Type: Bug
>Reporter: Kyle Ambroff-Kao
>Priority: Major
>
> When the writes /isr_change_notification in ZooKeeper (which is effectively a 
> queue of ISR change events for the controller) happen at a rate high enough 
> that the node with a watch can't dequeue them, the trouble starts.
>  
> The watcher kafka.controller.IsrChangeNotificationListener is fired in the 
> controller when a new entry is written to /isr_change_notification, and the 
> zkclient library sends a GetChildrenRequest to zookeeper to fetch all child 
> znodes.
> We've seen this happen in one of our test clusters as the partition count 
> started to climb north of 60k per broker. We had brokers writing child nodes 
> under /isr_change_notification that were larger than the jute.maxbuffer size 
> in ZooKeeper (1MB), causing the ZooKeeper server to drop the controller's 
> session, effectively bricking the cluster.
> This can be partially mitigated by chunking ISR notifications to increase the 
> maximum number of partitions a broker can host.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6469) ISR change notification queue can prevent controller from making progress

2018-01-22 Thread Kyle Ambroff-Kao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kyle Ambroff-Kao updated KAFKA-6469:

Description: 
When writes to /isr_change_notification in ZooKeeper (which is effectively a 
queue of ISR change events for the controller) happen at a rate high enough 
that the node with a watch can't dequeue them, the trouble starts.

 

The watcher kafka.controller.IsrChangeNotificationListener is fired in the 
controller when a new entry is written to /isr_change_notification, and the 
zkclient library sends a GetChildrenRequest to zookeeper to fetch all child 
znodes.

We've seen this happen in one of our test clusters as the partition count 
started to climb north of 60k per broker. We had brokers writing child nodes 
under /isr_change_notification that were larger than the jute.maxbuffer size in 
ZooKeeper (1MB), causing the ZooKeeper server to drop the controller's session, 
effectively bricking the cluster.

This can be partially mitigated by chunking ISR notifications to increase the 
maximum number of partitions a broker can host.

 

  was:
When the writes /isr_change_notification in ZooKeeper (which is effectively a 
queue of ISR change events for the controller) happen at a rate high enough 
that the node with a watch can't keep up dequeuing them, the trouble starts.

The watcher kafka.controller.IsrChangeNotificationListener is fired in the 
controller when a new entry is written to /isr_change_notification, and the 
zkclient library sends a GetChildrenRequest to zookeeper to fetch all child 
znodes. The size of the GetChildrenResponse returned by ZooKeeper is the 
problem. Reading through the code and running some tests to confirm shows that 
an empty GetChildrenResponse is 4 bytes on the wire, and every child node name 
minimum 4 bytes as well. Since these znodes are length 21, that means every 
child znode will account for 25 bytes in the response.

A GetChildrenResponse with 42k child nodes of the same length will be just 
about 1.001MB, which is larger than the 1MB data frame that ZooKeeper uses. 
This causes the ZooKeeper server to drop the broker's session.

So if 42k ISR changes happen at once, and the controller pauses at just the 
right time, you'll end up with a queue that can no longer be drained.

We've seen this happen in one of our test clusters as the partition count 
started to climb north of 60k per broker. We had a hardware failure that lead 
to the cluster writing so many child nodes to /isr_change_notification that the 
controller could no longer list its children, effectively bricking the cluster.

This can be partially mitigated by chunking ISR notifications to increase the 
maximum number of partitions a broker can host.


> ISR change notification queue can prevent controller from making progress
> -
>
> Key: KAFKA-6469
> URL: https://issues.apache.org/jira/browse/KAFKA-6469
> Project: Kafka
>  Issue Type: Bug
>Reporter: Kyle Ambroff-Kao
>Priority: Major
>
> When the writes /isr_change_notification in ZooKeeper (which is effectively a 
> queue of ISR change events for the controller) happen at a rate high enough 
> that the node with a watch can't dequeue them, the trouble starts.
>  
> The watcher kafka.controller.IsrChangeNotificationListener is fired in the 
> controller when a new entry is written to /isr_change_notification, and the 
> zkclient library sends a GetChildrenRequest to zookeeper to fetch all child 
> znodes.
> We've seen this happen in one of our test clusters as the partition count 
> started to climb north of 60k per broker. We had brokers writing child nodes 
> under /isr_change_notification that were larger than the jute.maxbuffer size 
> in ZooKeeper (1MB), causing the ZooKeeper server to drop the controller's 
> session, effectively bricking the cluster.
> This can be partially mitigated by chunking ISR notifications to increase the 
> maximum number of partitions a broker can host.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6469) ISR change notification queue can prevent controller from making progress

2018-01-22 Thread Kyle Ambroff-Kao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kyle Ambroff-Kao updated KAFKA-6469:

Summary: ISR change notification queue can prevent controller from making 
progress  (was: ISR change notification queue has a maximum size)

> ISR change notification queue can prevent controller from making progress
> -
>
> Key: KAFKA-6469
> URL: https://issues.apache.org/jira/browse/KAFKA-6469
> Project: Kafka
>  Issue Type: Bug
>Reporter: Kyle Ambroff-Kao
>Priority: Major
>
> When the writes /isr_change_notification in ZooKeeper (which is effectively a 
> queue of ISR change events for the controller) happen at a rate high enough 
> that the node with a watch can't keep up dequeuing them, the trouble starts.
> The watcher kafka.controller.IsrChangeNotificationListener is fired in the 
> controller when a new entry is written to /isr_change_notification, and the 
> zkclient library sends a GetChildrenRequest to zookeeper to fetch all child 
> znodes. The size of the GetChildrenResponse returned by ZooKeeper is the 
> problem. Reading through the code and running some tests to confirm shows 
> that an empty GetChildrenResponse is 4 bytes on the wire, and every child 
> node name minimum 4 bytes as well. Since these znodes are length 21, that 
> means every child znode will account for 25 bytes in the response.
> A GetChildrenResponse with 42k child nodes of the same length will be just 
> about 1.001MB, which is larger than the 1MB data frame that ZooKeeper uses. 
> This causes the ZooKeeper server to drop the broker's session.
> So if 42k ISR changes happen at once, and the controller pauses at just the 
> right time, you'll end up with a queue that can no longer be drained.
> We've seen this happen in one of our test clusters as the partition count 
> started to climb north of 60k per broker. We had a hardware failure that lead 
> to the cluster writing so many child nodes to /isr_change_notification that 
> the controller could no longer list its children, effectively bricking the 
> cluster.
> This can be partially mitigated by chunking ISR notifications to increase the 
> maximum number of partitions a broker can host.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6469) ISR change notification queue has a maximum size

2018-01-22 Thread Kyle Ambroff-Kao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335351#comment-16335351
 ] 

Kyle Ambroff-Kao commented on KAFKA-6469:
-

I'm going to work on a patch for this.

> ISR change notification queue has a maximum size
> 
>
> Key: KAFKA-6469
> URL: https://issues.apache.org/jira/browse/KAFKA-6469
> Project: Kafka
>  Issue Type: Bug
>Reporter: Kyle Ambroff-Kao
>Priority: Major
>
> When the writes /isr_change_notification in ZooKeeper (which is effectively a 
> queue of ISR change events for the controller) happen at a rate high enough 
> that the node with a watch can't keep up dequeuing them, the trouble starts.
> The watcher kafka.controller.IsrChangeNotificationListener is fired in the 
> controller when a new entry is written to /isr_change_notification, and the 
> zkclient library sends a GetChildrenRequest to zookeeper to fetch all child 
> znodes. The size of the GetChildrenResponse returned by ZooKeeper is the 
> problem. Reading through the code and running some tests to confirm shows 
> that an empty GetChildrenResponse is 4 bytes on the wire, and every child 
> node name minimum 4 bytes as well. Since these znodes are length 21, that 
> means every child znode will account for 25 bytes in the response.
> A GetChildrenResponse with 42k child nodes of the same length will be just 
> about 1.001MB, which is larger than the 1MB data frame that ZooKeeper uses. 
> This causes the ZooKeeper server to drop the broker's session.
> So if 42k ISR changes happen at once, and the controller pauses at just the 
> right time, you'll end up with a queue that can no longer be drained.
> We've seen this happen in one of our test clusters as the partition count 
> started to climb north of 60k per broker. We had a hardware failure that lead 
> to the cluster writing so many child nodes to /isr_change_notification that 
> the controller could no longer list its children, effectively bricking the 
> cluster.
> This can be partially mitigated by chunking ISR notifications to increase the 
> maximum number of partitions a broker can host.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6468) Replication high watermark checkpoint file read for every LeaderAndIsrRequest

2018-01-22 Thread Kyle Ambroff-Kao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335350#comment-16335350
 ] 

Kyle Ambroff-Kao commented on KAFKA-6468:
-

I'll have a patch ready for this shortly.

> Replication high watermark checkpoint file read for every LeaderAndIsrRequest
> -
>
> Key: KAFKA-6468
> URL: https://issues.apache.org/jira/browse/KAFKA-6468
> Project: Kafka
>  Issue Type: Bug
>Reporter: Kyle Ambroff-Kao
>Priority: Major
>
> The high watermark for each partition in a given log directory is written to 
> disk every _replica.high.watermark.checkpoint.interval.ms_ milliseconds. This 
> checkpoint file is used to create replicas when joining the cluster.
> [https://github.com/apache/kafka/blob/b73c765d7e172de4742a3aa023d5a0a4b7387247/core/src/main/scala/kafka/cluster/Partition.scala#L180]
> Unfortunately this file is read every time 
> kafka.cluster.Partition#getOrCreateReplica is invoked. For most clusters this 
> isn't a big deal, but for a small cluster with lots of partitions all of the 
> reads of this file really add up.
> On my local test cluster of three brokers with around 40k partitions, the 
> initial LeaderAndIsrRequest refers to every partition in the cluster, and it 
> can take 20 to 30 minutes to create all of the replicas because the 
> _replication-offset-checkpoint_ is nearly 2MB.
> Changing this code so that we only read this file once on startup reduces the 
> time to create all replicas to around one minute.
> Credit to [~onurkaraman] for finding this one.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6469) ISR change notification queue has a maximum size

2018-01-22 Thread Kyle Ambroff-Kao (JIRA)
Kyle Ambroff-Kao created KAFKA-6469:
---

 Summary: ISR change notification queue has a maximum size
 Key: KAFKA-6469
 URL: https://issues.apache.org/jira/browse/KAFKA-6469
 Project: Kafka
  Issue Type: Bug
Reporter: Kyle Ambroff-Kao


When writes to /isr_change_notification in ZooKeeper (which is effectively a 
queue of ISR change events for the controller) happen at a rate high enough 
that the node with a watch can't keep up with dequeuing them, the trouble starts.

The watcher kafka.controller.IsrChangeNotificationListener is fired in the 
controller when a new entry is written to /isr_change_notification, and the 
zkclient library sends a GetChildrenRequest to zookeeper to fetch all child 
znodes. The size of the GetChildrenResponse returned by ZooKeeper is the 
problem. Reading through the code and running some tests to confirm shows that 
an empty GetChildrenResponse is 4 bytes on the wire, and every child node name 
adds a minimum of 4 bytes of overhead plus the name itself. Since these znode 
names are 21 characters long, each child znode accounts for 25 bytes in the response.

A GetChildrenResponse with 42k child nodes of the same length will be just 
about 1.001MB, which is larger than the 1MB data frame that ZooKeeper uses. 
This causes the ZooKeeper server to drop the broker's session.

So if 42k ISR changes happen at once, and the controller pauses at just the 
right time, you'll end up with a queue that can no longer be drained.

We've seen this happen in one of our test clusters as the partition count 
started to climb north of 60k per broker. We had a hardware failure that led 
to the cluster writing so many child nodes to /isr_change_notification that the 
controller could no longer list its children, effectively bricking the cluster.

This can be partially mitigated by chunking ISR notifications to increase the 
maximum number of partitions a broker can host.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6468) Replication high watermark checkpoint file read for every LeaderAndIsrRequest

2018-01-22 Thread Kyle Ambroff-Kao (JIRA)
Kyle Ambroff-Kao created KAFKA-6468:
---

 Summary: Replication high watermark checkpoint file read for every 
LeaderAndIsrRequest
 Key: KAFKA-6468
 URL: https://issues.apache.org/jira/browse/KAFKA-6468
 Project: Kafka
  Issue Type: Bug
Reporter: Kyle Ambroff-Kao


The high watermark for each partition in a given log directory is written to 
disk every _replica.high.watermark.checkpoint.interval.ms_ milliseconds. This 
checkpoint file is used to create replicas when a broker joins the cluster.

[https://github.com/apache/kafka/blob/b73c765d7e172de4742a3aa023d5a0a4b7387247/core/src/main/scala/kafka/cluster/Partition.scala#L180]

Unfortunately this file is read every time 
kafka.cluster.Partition#getOrCreateReplica is invoked. For most clusters this 
isn't a big deal, but for a small cluster with lots of partitions all of the 
reads of this file really add up.

On my local test cluster of three brokers with around 40k partitions, the 
initial LeaderAndIsrRequest refers to every partition in the cluster, and it 
can take 20 to 30 minutes to create all of the replicas because the 
_replication-offset-checkpoint_ is nearly 2MB.

Changing this code so that we only read this file once on startup reduces the 
time to create all replicas to around one minute.
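A rough sketch of the approach (illustrative only; the real fix lives in the 
broker and uses its existing checkpoint classes, not this standalone parser): 
read and parse the checkpoint file once, then answer every per-partition lookup 
from the cached map:
{noformat}
import scala.io.Source

// Illustrative only. Checkpoint format assumed here: line 1 = version,
// line 2 = entry count, then one "topic partition offset" line per partition.
class CachedHighWatermarkCheckpoint(path: String) {
  lazy val highWatermarks: Map[(String, Int), Long] = {
    val source = Source.fromFile(path)
    try {
      source.getLines().drop(2).map { line =>
        val Array(topic, partition, offset) = line.split(" ")
        (topic, partition.toInt) -> offset.toLong
      }.toMap
    } finally source.close()
  }

  // Called once per partition while handling a LeaderAndIsrRequest; no file I/O
  // after the first access.
  def highWatermark(topic: String, partition: Int): Long =
    highWatermarks.getOrElse((topic, partition), 0L)
}
{noformat}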

Credit to [~onurkaraman] for finding this one.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)