[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14209866#comment-14209866 ]

saurabh agarwal commented on KAFKA-1555:

Thanks, Gwen, for implementing this JIRA. This is a useful feature for us. I have been focusing on other components of the project, but I will soon get back to the Kafka component and start using this feature.

> provide strong consistency with reasonable availability
> ---
>
> Key: KAFKA-1555
> URL: https://issues.apache.org/jira/browse/KAFKA-1555
> Project: Kafka
> Issue Type: Improvement
> Components: controller
> Affects Versions: 0.8.1.1
> Reporter: Jiang Wu
> Assignee: Gwen Shapira
> Fix For: 0.8.2
>
> Attachments: KAFKA-1555-DOCS.0.patch, KAFKA-1555-DOCS.1.patch, KAFKA-1555-DOCS.2.patch, KAFKA-1555-DOCS.3.patch, KAFKA-1555-DOCS.4.patch, KAFKA-1555.0.patch, KAFKA-1555.1.patch, KAFKA-1555.2.patch, KAFKA-1555.3.patch, KAFKA-1555.4.patch, KAFKA-1555.5.patch, KAFKA-1555.5.patch, KAFKA-1555.6.patch, KAFKA-1555.8.patch, KAFKA-1555.9.patch
>
> In a mission-critical application, we expect a Kafka cluster with 3 brokers to satisfy two requirements:
> 1. When 1 broker is down, no message loss or service blocking happens.
> 2. In worse cases, such as when two brokers are down, service can be blocked, but no message loss happens.
> We found that the current Kafka version (0.8.1.1) cannot achieve these requirements due to three of its behaviors:
> 1. When choosing a new leader from 2 followers in the ISR, the one with fewer messages may be chosen as the leader.
> 2. Even when replica.lag.max.messages=0, a follower can stay in the ISR when it has fewer messages than the leader.
> 3. The ISR can contain only 1 broker, so acknowledged messages may be stored on only 1 broker.
> The following is an analytical proof.
> We consider a cluster with 3 brokers and a topic with 3 replicas, and assume that at the beginning all 3 replicas (leader A, followers B and C) are in sync, i.e., they have the same messages and are all in the ISR.
> According to the value of request.required.acks (acks for short), there are the following cases.
> 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirements.
> 2. acks=2. The producer sends a message m. It is acknowledged by A and B. At this time, although C hasn't received m, C is still in the ISR. If A is killed, C can be elected as the new leader, and consumers will miss m.
> 3. acks=-1. B and C restart and are removed from the ISR. The producer sends a message m to A and receives an acknowledgement. A disk failure happens on A before B and C replicate m. Message m is lost.
> In summary, no existing configuration can satisfy the requirements.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
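The two failure cases in the proof above can be replayed with a toy model. This is a minimal sketch, not Kafka code. `electable_leaders`, `case_acks_2` and `case_acks_all` are hypothetical names. The model assumes what the description states for 0.8.1.1: any surviving ISR member may be elected leader, no matter how many messages it holds.

```python
# Toy replay of cases 2 and 3 from the analytical proof (hypothetical model, not Kafka code).

def electable_leaders(isr, failed):
    """Assumes 0.8.1.1 behavior as described: any surviving ISR member may be elected."""
    return sorted(set(isr) - {failed})


def case_acks_2():
    """acks=2: m is acknowledged once A (leader) and B have it; C lags but stays in the ISR."""
    logs = {"A": ["m"], "B": ["m"], "C": []}
    isr = {"A", "B", "C"}
    # A is killed; record, for each possible new leader, whether m survives.
    return {leader: "m" in logs[leader] for leader in electable_leaders(isr, failed="A")}


def case_acks_all():
    """acks=-1: B and C restarted and left the ISR, so the leader waits only for itself."""
    logs = {"A": ["m"], "B": [], "C": []}
    isr = {"A"}
    # Copies of m that exist at the moment the producer receives its acknowledgement.
    return sum("m" in logs[replica] for replica in isr)


print(case_acks_2())    # {'B': True, 'C': False}: electing C loses m
print(case_acks_all())  # 1: a disk failure on A before B and C catch up loses m
```

Case 1 is left out. With acks=0 or 1, an acknowledged message may exist only on the leader. With acks=3, one broker failure is enough to stall writes.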
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14209151#comment-14209151 ]

Jun Rao commented on KAFKA-1555:

Thanks for the doc patch. Committed to svn after fixing a few typos. Let me know if you see any further issues.
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14195029#comment-14195029 ]

Joel Koshy commented on KAFKA-1555:

Sounds good. In that case, can you modify it a bit? The only remaining confusion is that earlier in that section we start by writing about acknowledgement by all replicas, but then directly (without further comment) assume it is actually acknowledgement by the current in-sync replicas. How about the following:

Instead of _A message that has been acknowledged by all in-sync replicas..._ we can write _A message that has been acknowledged by all replicas..._. And then say _Note that "acknowledgement by all replicas" does not guarantee that the full set of assigned replicas has received the message. By default, acknowledgement happens as soon as all the current in-sync replicas have received the message. For example, if a topic is configured with only two replicas and one fails (i.e., only one in-sync replica remains), then writes that specify required.acks=-1 will succeed. However, these writes could be lost if the remaining replica also fails. Although this ensures maximum availability ..._ (from earlier comment)

As for the design itself: I just thought that a broker-side setting taking effect only in combination with a client setting is a bit odd, especially if it does not hurt to apply it with the other ack settings.
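Joel's two-replica example can be sketched with a simplified leader. For acks=-1 it waits for every current ISR member. When the ISR is smaller than min.insync.replicas, it rejects the write before appending. `acks_all_outcome` is a hypothetical helper, and `min_isr=1` stands in for the behavior without a minimum ISR.

```python
def acks_all_outcome(isr, min_isr=1):
    """Return (accepted, copies_at_ack) for an acks=-1 write in a simplified model.

    "All replicas" means the *current* ISR, not the full assigned replica list.
    """
    if len(isr) < min_isr:
        return (False, 0)        # rejected before append (NotEnoughReplicas-style)
    return (True, len(isr))      # acknowledged once every ISR member has the message


assigned = ["broker1", "broker2"]   # topic configured with two replicas
isr = ["broker1"]                   # broker2 failed and dropped out of the ISR

accepted, copies = acks_all_outcome(isr)
print(accepted, copies, "of", len(assigned))   # True 1 of 2: lost if broker1 also fails
print(acks_all_outcome(isr, min_isr=2))        # (False, 0): refused instead of under-replicated
```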
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14192223#comment-14192223 ]

Joel Koshy commented on KAFKA-1555:

Given the current implementation, I think the documentation looks good. I do have one comment/question on the implementation, though (at the end). I have a couple of minor typo corrections and suggestions here:
* all (-1) _replicas_
* acknowledgement by all _replicas_
* (_which_ is achieved..)
* _provides_ the _strongest_ durability guarantee
* A message that _has been acknowledged_ by all in-sync replicas will not be lost as long as at least one of those in-sync replicas _remains available_.
* The sentence that follows ("Note, however...") contains details that seem redundant with what has already been said in parentheses, so we can remove one or the other.
* Instead of "To avoid this _unfortunately_ condition" -> _"Although this ensures maximum availability of the partition, this behavior may be undesirable to some users who prefer durability over availability. Therefore, we provide two topic-level configurations ..."_
* if all replicas _become unavailable_, _then_ the partition will remain unavailable until the _most recent_ leader becomes available again. _This effectively prefers unavailability over the risk of message loss._ See _the_ previous section on unclean leader election for more details.
* _Specify_ a minimum ISR size: ... above a certain minimum, _in order_ to prevent _the_ loss of messages... just a single replica, which _subsequently_ becomes unavailable... guarantees that the message will be _acknowledged by at least this many in-sync replicas_.
* "The trade-off here" appears to be in a separate paragraph altogether, but it seems it should belong under the second point on min.isr.
* Also, perhaps we can rephrase it a bit: _"This setting offers a trade-off between consistency and availability. A higher setting for minimum ISR size guarantees better consistency, since the message is guaranteed to be written to more replicas, which reduces the probability that it will be lost. However, it reduces availability, since the partition will be unavailable for writes if the number of in-sync replicas drops below the minimum threshold."_

My only remaining concern about the current implementation is that min.isr is a broker-side topic config and not explicitly a producer config. However, it currently takes effect only if {{acks == -1}}. That seems slightly odd to me: we could just as well have it take effect even if {{acks == 0/1}}, i.e., reject the append if the current {{|ISR| < min.isr}} (with the caveat of NotEnoughReplicasAfterAppend) regardless of the ack setting. Do you think this would be unintuitive for users?
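Joel contrasts two behaviors. Currently, min.isr is enforced only when acks == -1. His alternative enforces it regardless of the ack setting. The difference comes down to one condition in the leader's pre-append check. This is a minimal sketch: `NotEnoughReplicas`, `check_append`, `accepted` and the `enforce_for_all_acks` flag are hypothetical names, not Kafka's API.

```python
class NotEnoughReplicas(Exception):
    """Stand-in for the error returned when the ISR is below min.isr (hypothetical)."""


def check_append(isr_size, min_isr, acks, enforce_for_all_acks=False):
    """Leader-side check before appending a produce request.

    enforce_for_all_acks=False models the behavior discussed in the thread
    (only acks == -1 is checked); True models the proposed alternative.
    """
    applies = acks == -1 or enforce_for_all_acks
    if applies and isr_size < min_isr:
        raise NotEnoughReplicas(f"ISR size {isr_size} < min.isr {min_isr}")


def accepted(**kwargs):
    """True if check_append lets the write through."""
    try:
        check_append(**kwargs)
        return True
    except NotEnoughReplicas:
        return False


print(accepted(isr_size=1, min_isr=2, acks=1))                             # True
print(accepted(isr_size=1, min_isr=2, acks=1, enforce_for_all_acks=True))  # False
print(accepted(isr_size=1, min_isr=2, acks=-1))                            # False
```

Either way, the check runs only before the append. The ISR can still shrink afterward, which is the NotEnoughReplicasAfterAppend caveat.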
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14191241#comment-14191241 ]

Joel Koshy commented on KAFKA-1555:

Ack, never mind. I see what you mean:
{{&& requiredAcks == -1}}
I'll re-read the doc tomorrow :)
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14191239#comment-14191239 ]

Joel Koshy commented on KAFKA-1555:

Yeah, what I wrote earlier was incorrect - sorry. It does not help decide when to commit a message, but it helps decide whether or not to append a message in the first place. I also agree that it is not completely independent of the ack config, since delayedProduce comes into the picture for ack == -1. However:
{quote}min.insync.replicas has no effect unless acks=-1{quote}
Even if acks == 0 or 1, in Partition.appendMessagesToLeader we reject writes if |ISR| < minIsr (regardless of the ack setting), so it definitely has an effect even in that case.
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14191202#comment-14191202 ]

Gwen Shapira commented on KAFKA-1555:

"it is independent of the producer's ack config"
Actually, it is not. min.insync.replicas has no effect unless acks=-1. I hope the documentation makes that clear.
min.insync.replicas is used within delayedProduce() to decide when enough replicas have acked the message. Is that what you mean by "declare committed"?
Part of the change (and maybe an unfortunate part) was to push awareness of request.required.acks into this part of the code, so we'll know when to throw NotEnoughReplicasException.
I'm OK with swapping the order and starting by explaining ISR size and guarantees, but I'm pretty sure the connection with request.required.acks is important to point out. The feature doesn't fully make sense without it (at least it doesn't when I explain it...).
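Gwen describes min.insync.replicas being used inside delayedProduce(). That step can be illustrated with a simplified completion check for an acks=-1 request. This is a sketch under assumptions, not the actual Kafka code. `delayed_produce_status` is a hypothetical function, replicas are tracked by log end offset, and the returned strings mirror the error names mentioned in this thread.

```python
def delayed_produce_status(log_end_offsets, isr, required_offset, min_isr):
    """Return None while still waiting, otherwise the response code for an acks=-1 write.

    The write was already appended on the leader; required_offset is the log end
    offset every current ISR member must reach (one past the appended message).
    """
    if any(log_end_offsets[r] < required_offset for r in isr):
        return None                               # keep waiting for slower ISR members
    if len(isr) < min_isr:
        return "NotEnoughReplicasAfterAppend"     # ISR shrank below min.isr after the append
    return "NoError"


leo = {"A": 10, "B": 10, "C": 7}
print(delayed_produce_status(leo, {"A", "B", "C"}, 10, min_isr=2))  # None: C has not caught up
print(delayed_produce_status(leo, {"A", "B"}, 10, min_isr=2))       # NoError: C left the ISR
print(delayed_produce_status(leo, {"A"}, 10, min_isr=2))            # NotEnoughReplicasAfterAppend
```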
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14191196#comment-14191196 ]

Joel Koshy commented on KAFKA-1555:

Sorry for the late feedback.

configuration.html:
- min.insync.replicas: "When a producer sets request.required.acks" - I don't think we should start with that statement, because the min.insync.replicas concept is more general than having an effect on the producer result, i.e., it is independent of the producer's ack config. It is used to help decide when to declare that a message is committed (this is what I was trying to convey in my earlier comment).

design.html:
- Availability and durability guarantees: Similar point. There are two concepts at play here: the ack setting on the producer and the durability guarantee on the broker. The mechanism for committing messages is separate from the producer's ack setting. The section starts off from the perspective of the producer and then progresses into durability. I actually think it might be clearer to do it the other way around. That is, first say that a message received at the broker is committed (and declared durable and exposed to consumers) when the ISR set receives the message. Then note that the ISR can shrink, and that a higher min.isr facilitates a stronger durability guarantee. Only then talk about the producer ack setting. This is just a suggestion. I'm not very sure whether the above will be better, but I think it would be a more intuitive progression for users.
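Joel frames a message as committed (declared durable and exposed to consumers) once the whole ISR has it. In this simplified model, that means tracking the smallest log end offset across the ISR, usually called the high watermark. The sketch uses hypothetical helper names. It also shows why ISR shrinkage matters: dropping a lagging follower advances the commit point, with fewer copies behind it.

```python
def high_watermark(log_end_offsets, isr):
    """Offsets below this value are on every ISR member and count as committed."""
    return min(log_end_offsets[r] for r in isr)


def visible_to_consumers(log, log_end_offsets, isr):
    """Consumers are only served committed messages (simplified)."""
    return log[:high_watermark(log_end_offsets, isr)]


leader_log = ["m0", "m1", "m2", "m3"]
leo = {"A": 4, "B": 4, "C": 2}

print(high_watermark(leo, {"A", "B", "C"}))                    # 2
print(visible_to_consumers(leader_log, leo, {"A", "B", "C"}))  # ['m0', 'm1']
print(high_watermark(leo, {"A", "B"}))                         # 4: with C out of the ISR, m2 and m3 commit on 2 copies
```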
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14184383#comment-14184383 ]

Gwen Shapira commented on KAFKA-1555:

[~kbanker] Regarding the 3-4 typical scenarios: I'll be happy to see what you have, because I can't think of more than 2 :)
* min.isr = 1 -> preserve the current behavior, favoring availability over consistency.
* min.isr = 2, 3, etc. -> I'd rather not write data at all if I can't be guaranteed to have it on at least N machines (N = min.isr). This allows me to know that I'll never lose data when suffering N-1 crashes. (I suspect that min.isr = 2 and min.isr = 3 will be popular choices.)
Note that with acks > 1 gone, the design and trade-offs are way simpler than they used to be.
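Gwen's rule of thumb, together with the availability trade-off Joel describes, reduces to simple arithmetic. The sketch assumes acks=-1, unclean leader election disabled, and surviving replicas staying in the ISR. `durability_tradeoffs` is a hypothetical helper. Raising an error for min.isr above the replication factor is a modeling choice; such a topic would simply refuse every acks=-1 write.

```python
def durability_tradeoffs(replication_factor, min_isr):
    """Broker failures tolerated, per concern, for a topic written with acks=-1."""
    if not 1 <= min_isr <= replication_factor:
        raise ValueError("min.isr must be between 1 and the replication factor")
    return {
        # An acknowledged write sits on at least min_isr replicas.
        "failures_without_losing_acked_writes": min_isr - 1,
        # Writes are rejected once fewer than min_isr replicas remain in sync.
        "failures_while_still_accepting_writes": replication_factor - min_isr,
    }


for n in (1, 2, 3):
    print(f"replication factor 3, min.isr={n}:", durability_tradeoffs(3, n))
```

With a replication factor of 3, min.isr=2 tolerates one broker failure on both counts. This lines up with the first requirement in the original issue description.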
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14183411#comment-14183411 ]

Gwen Shapira commented on KAFKA-1555:

[~jjkoshy]:
* I did not make the suggested changes to ops.html, since it's not related to this JIRA and this line was not modified in this patch. (Actually, the line you refer to doesn't seem to exist in my copy of the docs...)
* "The above should probably be clarified a bit, i.e., availability of a replica affects whether a message will be lost or not only during the time it is yet to be replicated to all assigned replicas." I've rephrased the sentence per Kyle's suggestion. Let me know if it works now.
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14183393#comment-14183393 ] Kyle Banker commented on KAFKA-1555:
I like the section on Availability and Durability Guarantees, but I believe it would also be useful to suggest 3 or 4 typical durability configurations and the trade-offs each one provides. As of now, users still have to infer from the docs the ideal settings for all of the following: topic replication factor, min.insync.replicas, request.required.acks, and whether or not to disable unclean leader election. I'd be happy to write up a draft of these scenarios as I understand them if folks think this would be a good idea.
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14183382#comment-14183382 ] Kyle Banker commented on KAFKA-1555:
The new durability docs look great so far. Here are a couple more proposed edits to the patch:
EDIT #1:
min.insync.replicas: When a producer sets request.required.acks to -1, min.insync.replicas specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful. If this minimum cannot be met, then the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend). When used together, min.insync.replicas and request.required.acks allow you to enforce greater durability guarantees. A typical scenario would be to create a topic with a replication factor of 3, set min.insync.replicas to 2, and produce with request.required.acks of -1. This will ensure that the producer raises an exception if a majority of replicas do not receive a write.
EDIT #2 (for request.required.acks):
-1. The producer gets an acknowledgement after all in-sync replicas have received the data. This option provides the greatest level of durability. However, it does not completely eliminate the risk of message loss, because the number of in-sync replicas may, in rare cases, shrink to 1. If you want to ensure that some minimum number of replicas (typically a majority) receive a write, then you must set the topic-level min.insync.replicas setting. Please read the Replication section of the design documentation for a more in-depth discussion.
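Kyle's EDIT #1 distinguishes two ways an acks=-1 write can fail under min.insync.replicas. The decision can be sketched in Python as below, assuming a simplified leader that checks the ISR size once when the write arrives and once more when every remaining in-sync replica has it. This is an illustrative model, not Kafka's broker code; `produce_outcome` and `Outcome` are hypothetical names, and only the two error names come from this thread.

```python
from enum import Enum


class Outcome(Enum):
    OK = "OK"
    NOT_ENOUGH_REPLICAS = "NotEnoughReplicas"
    NOT_ENOUGH_REPLICAS_AFTER_APPEND = "NotEnoughReplicasAfterAppend"


def produce_outcome(required_acks: int, min_isr: int,
                    isr_at_append: int, isr_when_acked: int) -> Outcome:
    """Hypothetical model of a leader-side min.insync.replicas check.

    required_acks  -- producer request.required.acks (-1 = all in-sync replicas)
    min_isr        -- topic-level min.insync.replicas
    isr_at_append  -- ISR size when the leader receives the write
    isr_when_acked -- ISR size once every remaining ISR member has the write
    """
    # In this model, min.insync.replicas is only enforced for acks=-1.
    if required_acks != -1:
        return Outcome.OK
    # ISR already too small: reject before anything is written.
    if isr_at_append < min_isr:
        return Outcome.NOT_ENOUGH_REPLICAS
    # Written to the leader's log, but the ISR shrank below the minimum
    # while waiting for followers, so the write is reported as failed.
    if isr_when_acked < min_isr:
        return Outcome.NOT_ENOUGH_REPLICAS_AFTER_APPEND
    return Outcome.OK


if __name__ == "__main__":
    # Replication factor 3, min.insync.replicas=2, acks=-1.
    for label, isr_in, isr_out in [("all healthy", 3, 3),
                                   ("one broker down", 2, 2),
                                   ("two brokers down", 1, 1),
                                   ("ISR shrinks after append", 2, 1)]:
        print(label, "->", produce_outcome(-1, 2, isr_in, isr_out).value)
```

In this model, the typical scenario (replication factor 3, min.insync.replicas=2, acks=-1) keeps accepting writes with one broker down and rejects them, before anything is appended, once two brokers are down.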
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14183304#comment-14183304 ] Joel Koshy commented on KAFKA-1555:
Looks good - I just have a few minor edits to what you wrote.
configuration.html
_Minor edits: also, it would be good if we can also mention NotEnoughReplicasAfterAppend and document it_
min.insync.replicas: The minimum number of replicas that are required to declare a message as committed. If the number of in-sync replicas drops below this threshold, then writing messages with request.required.acks set to -1 will return a NotEnoughReplicas or NotEnoughReplicasAfterAppend error code. This is used to provide enhanced durability guarantees, i.e., all in-sync replicas need to acknowledge the message AND there need to be at least this many replicas in the set of in-sync replicas.
ops.html:
log.cleanup.interval.mins=30 -> log.retention.check.interval.ms=30
design.html
A couple of comments:
* all (or -1) brokers: maybe make it clear up front that this means all current in-sync replicas, and later clarify that consistency can be preferred over availability via the min.isr property
* bq. a message that was acked will not be lost as long as at least one in sync replica remains
** The above should probably be clarified a bit, i.e., availability of a replica affects whether a message will be lost or not only during the time it is yet to be replicated to all assigned replicas.
* It would be useful to describe how min.isr helps facilitate trading off consistency vs availability
* There are a couple of typos in various places
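Joel asks for a description of how min.isr trades consistency against availability. A back-of-the-envelope Python sketch follows, assuming unclean leader election is disabled and that failed replicas have already dropped out of the ISR; `min_isr_tradeoff` is a hypothetical helper, not part of Kafka.

```python
def min_isr_tradeoff(replication_factor: int, min_isr: int) -> dict:
    """Rough trade-off for acks=-1 writes (illustrative only).

    Assumes unclean leader election is disabled and ignores how long a
    failed replica takes to drop out of the ISR.
    """
    if not 1 <= min_isr <= replication_factor:
        raise ValueError("min_isr must be between 1 and the replication factor")
    return {
        # Brokers that can be lost while acks=-1 writes are still accepted.
        "failures_before_writes_rejected": replication_factor - min_isr,
        # Replicas guaranteed to hold an acknowledged write when it is acked.
        "copies_of_acked_write": min_isr,
    }


if __name__ == "__main__":
    for m in range(1, 4):
        print(f"RF=3, min.insync.replicas={m}:", min_isr_tradeoff(3, m))
```

Raising min.insync.replicas increases the number of copies an acknowledged write is guaranteed to have, at the cost of rejecting acks=-1 writes after fewer broker failures; setting it equal to the replication factor means any single broker failure blocks those writes.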
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14183086#comment-14183086 ] Joel Koshy commented on KAFKA-1555:
Jun, you can just diff configuration.html and design.html between the files in the two directories, or use a tool like meld. I have a few comments that I will post later, after some suggested edits.
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14182988#comment-14182988 ] Jun Rao commented on KAFKA-1555:
[~gwenshap], it's a bit hard to see what has actually changed. I copied the 081 docs to an 082 dir. Could you create a new patch on top of the 082 dir? Thanks,
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14170272#comment-14170272 ] Guozhang Wang commented on KAFKA-1555:
Created KAFKA-1704 for partition configs.
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14170256#comment-14170256 ] Jun Rao commented on KAFKA-1555:
Committed to 0.8.2 too.
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14170095#comment-14170095 ] Jun Rao commented on KAFKA-1555:
When a client receives a NotEnoughReplicasAfterAppendException, it can choose to stop producing or to wait and then retry. So, we can probably make NotEnoughReplicasAfterAppend a non-retriable exception and let the client decide what to do.
All the topic configs are essentially per partition. However, some of those configs apply to the log and others apply only to the logical partition. So, we can probably split topic configs into a logConfig and a partitionConfig and pass them to Log and Partition, respectively. This can be handled in a separate jira though.
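Jun suggests treating NotEnoughReplicasAfterAppend as non-retriable, and Guozhang's question in this thread explains why: the leader has already appended the message, so an automatic retry writes a duplicate. A minimal Python sketch of that client-side policy follows. `ProduceError`, `should_auto_retry`, and `handle_failure` are hypothetical and do not reflect how any Kafka producer client actually classifies these errors.

```python
from enum import Enum


class ProduceError(Enum):
    NOT_ENOUGH_REPLICAS = "NotEnoughReplicas"
    NOT_ENOUGH_REPLICAS_AFTER_APPEND = "NotEnoughReplicasAfterAppend"


# Hypothetical policy: an error raised before the leader appended anything is
# safe to retry automatically; an error raised after the append is not,
# because the leader's log already contains the message.
RETRIABLE = {ProduceError.NOT_ENOUGH_REPLICAS}


def should_auto_retry(error: ProduceError) -> bool:
    return error in RETRIABLE


def handle_failure(error: ProduceError, attempt: int, max_retries: int) -> str:
    """Return the action a client would take under this sketch's policy."""
    if should_auto_retry(error) and attempt < max_retries:
        return "retry"
    # Non-retriable (or retries exhausted): surface the error so the
    # application decides whether to stop, wait, or resend and accept duplicates.
    return "raise"
```

Keeping the decision in the application, rather than in the client's retry loop, matches Jun's point that the client can choose between stopping and waiting before a retry.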
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14170002#comment-14170002 ] Guozhang Wang commented on KAFKA-1555:
Another thought about the min.isr config: we put it into the log configs because we do not have partition configs besides the global server configs and the log configs, and doing so requires us to pass requiredAcks from the replica manager to the partition, and then to the log, so that it can be checked against min.isr. It may be better to add partition configs and move min.isr into that config set instead of making it part of the log configs. Thoughts?
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14169987#comment-14169987 ] Guozhang Wang commented on KAFKA-1555:
One question I have while trying to rebase KAFKA-1583 on KAFKA-1555: for the NotEnoughReplicasAfterAppend exception, what should the client's behavior be? Currently the producer client retries on all exceptions, but is that ideal for NotEnoughReplicasAfterAppend? The message has already been appended, so retrying will guarantee duplicates.
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14168996#comment-14168996 ] Sriram Subramanian commented on KAFKA-1555:
[~gwenshap] +1 on your suggestion. We can get the documentation ready and then do the linking during the release.
Obviously these settings do not satisfy the requirement. > 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this > time, although C hasn't received m, C is still in ISR. If A is killed, C can > be elected as the new leader, and consumers will miss m. > 3. acks=-1. B and C restart and are removed from ISR. Producer sends a > message m to A, and receives an acknowledgement. Disk failure happens in A > before B and C replicate m. Message m is lost. > In summary, any existing configuration cannot satisfy the requirements. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14168930#comment-14168930 ] Gwen Shapira commented on KAFKA-1555: Awesome. So I'm updating here: http://svn.apache.org/repos/asf/kafka/site/
It looks like the right thing to do is to create a 082 subdir, copy the current documentation there and add the new reliability guarantees. We can then re-point the main doc link to that directory when we release. I think this is less confusing than modifying the docs after the release. Does that make sense?
BTW, there is probably more we need to document for 0.8.2. Perhaps we need to start filing doc JIRAs for the new version? The new producer will be very high on that list.
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14168692#comment-14168692 ] Neha Narkhede commented on KAFKA-1555:
bq. My vote would be to update our documentation - http://kafka.apache.org/documentation.html
+1, though it will be less confusing if we wait for 0.8.2 to be released.
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14167464#comment-14167464 ] Sriram Subramanian commented on KAFKA-1555: My vote would be to update our documentation - http://kafka.apache.org/documentation.html
It currently refers to 0.8.1. We should make 0.8.2 the current one after the release. The Design section could have a "Guarantees" portion that describes the guarantees Kafka gives w.r.t. consistency vs. availability, and when. What do the rest of you think?
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14167261#comment-14167261 ] Gwen Shapira commented on KAFKA-1555: Will be happy to do that. Is the wiki the right place?
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14167113#comment-14167113 ] Sriram Subramanian commented on KAFKA-1555: Awesome. I suggest we document the guarantees provided by the different knobs. That would be very useful.
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14167057#comment-14167057 ] Jay Kreps commented on KAFKA-1555: w00t!
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14166147#comment-14166147 ] Jun Rao commented on KAFKA-1555: Also created a follow-up JIRA (KAFKA-1697) to remove support for acks > 1 on the broker side.
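Jun's follow-up (KAFKA-1697) would leave only three meaningful values of request.required.acks: -1, 0 and 1. As a minimal sketch of what a broker-side check could look like once acks > 1 is dropped, assuming a hypothetical `validate_acks` helper that raises `ValueError` (not Kafka's actual code or error type):

```python
# Hypothetical broker-side check: once acks > 1 is removed, only these
# values of request.required.acks remain supported.
VALID_ACKS = {-1, 0, 1}


def validate_acks(acks: int) -> int:
    """Return acks unchanged if supported, otherwise raise ValueError."""
    if acks not in VALID_ACKS:
        raise ValueError(
            f"unsupported acks value {acks}; expected one of -1, 0, 1")
    return acks
```

The motivation comes from the issue description: with acks=2, a follower that never received the message can still sit in the ISR and be elected leader, so the acknowledgement promises less than it appears to.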
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14162942#comment-14162942 ] Joel Koshy commented on KAFKA-1555: +1 (took a quick look at it). We could improve the retry handling on the producer in a separate JIRA, i.e., avoid unnecessarily sending duplicates as described in the earlier comment.
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14162833#comment-14162833 ] Jun Rao commented on KAFKA-1555: The patch that Gwen provided (using a min.isr topic-level config) looks good to me, other than a few minor comments. If anyone else is interested in reviewing, please take another look. If there is no objection, I will most likely commit the patch once the remaining minor comments are resolved.
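The min.isr topic-level config discussed here lets the leader refuse an acks=-1 write when too few replicas are in sync, instead of acknowledging a message that only one broker holds. (The setting later shipped in Kafka as `min.insync.replicas`.) The sketch below is a toy, single-partition model under that assumption. The `Partition` class and `NotEnoughReplicasError` are hypothetical and are not Kafka's implementation. In this sketch, only acks=-1 requests are checked.

```python
from dataclasses import dataclass, field


class NotEnoughReplicasError(Exception):
    """Raised when an acks=-1 write arrives while the ISR is below min.isr."""


@dataclass
class Partition:
    # Hypothetical toy model of one partition as seen by its leader.
    replicas: list[str]
    isr: set[str]
    min_isr: int
    log: list[str] = field(default_factory=list)

    def append(self, message: str, acks: int) -> int:
        """Append a message, enforcing min.isr for acks=-1 requests."""
        if acks == -1 and len(self.isr) < self.min_isr:
            # Reject before writing, so the message never lands only on the leader.
            raise NotEnoughReplicasError(
                f"ISR size {len(self.isr)} < min.isr {self.min_isr}")
        self.log.append(message)
        return len(self.log) - 1  # offset of the appended message
```

With 3 replicas and min_isr=2, the partition keeps accepting acks=-1 writes while one broker is down. Once the ISR shrinks to the leader alone, it blocks them. This matches requirement 2 from the issue: availability is given up, but an acknowledged message is not left on a single broker.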
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14148795#comment-14148795 ] Joel Koshy commented on KAFKA-1555: This is an implementation detail, but w.r.t. the duplicates scenario (i.e., 4(a)): on a not-enough-replicas error, the producer implementation could stop retrying the produce request and pounding the broker(s) until the ISR returns to the min.isr level. Instead, it can switch to refreshing the topic metadata with backoff and checking the current ISR cardinality, which is included in the metadata response (although that value is currently inaccurate and needs to be fixed; see KAFKA-1367). As soon as the ISR returns to min.isr, it can retry the request.
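Joel's suggestion replaces blind retries with a metadata-polling loop. The sketch below is a minimal version of that idea, assuming a hypothetical `fetch_isr_size` callable that stands in for a topic metadata refresh. As noted above, the ISR size in the metadata response was inaccurate at the time (KAFKA-1367), so a real producer could not rely on it yet. The backoff parameters are illustrative, not Kafka defaults.

```python
import time
from typing import Callable


def wait_for_min_isr(fetch_isr_size: Callable[[], int], min_isr: int,
                     initial_backoff: float = 0.1, max_backoff: float = 5.0,
                     max_wait: float = 60.0,
                     sleep: Callable[[float], None] = time.sleep) -> bool:
    """Poll metadata with exponential backoff until the ISR reaches min_isr.

    Returns True once it is safe to retry the produce request, or False if
    max_wait seconds of backoff elapse first. fetch_isr_size is a
    hypothetical stand-in for refreshing topic metadata.
    """
    backoff = initial_backoff
    waited = 0.0
    while True:
        if fetch_isr_size() >= min_isr:
            return True  # ISR recovered: retry the original request once
        if waited >= max_wait:
            return False  # give up and surface the error to the caller
        sleep(backoff)
        waited += backoff
        backoff = min(backoff * 2, max_backoff)
```

Passing `sleep` as a parameter keeps the loop testable without real delays. The important design point is that the produce request itself is sent again only after the ISR looks healthy. Retrying it while the ISR is below min.isr would risk writing duplicates.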
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14148557#comment-14148557 ] Neha Narkhede commented on KAFKA-1555: Followed the discussion and want to share some thoughts:
1. With the new setting, it is necessary to get rid of the current semantics of acks > 1.
2. Given 1, I initially liked the approach of overriding acks > 1 to provide the semantics described for min.isr in this JIRA. This is actually more intuitive than -2 .. -N. A downside of this approach, though, is that clients would need to know the replication factor of the topic, which is known to the admin and also changeable only by the admin. So it probably makes sense to make it a topic-level config that is also changeable by the admin.
3. Since we are adding a topic-level config for the min.isr setting, it makes a lot of sense to express acks as an enum. I think users would find it much more intuitive if the acks config were expressed as {no_ack, leader_ack, committed_ack}.
4. It is important to reject the write on the server when the requested min.acks is greater than the size of the current ISR. It is true that the ISR size could change immediately after the write is rejected, but this would lead to far fewer duplicates.
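Point 3 proposes named acks levels instead of magic numbers. Below is a minimal sketch of such an enum, mapped back to the numeric request.required.acks values it would replace. The `Acks` class and `from_config` helper are hypothetical illustrations of the proposal, not an API that exists in Kafka.

```python
from enum import Enum


class Acks(Enum):
    """Hypothetical named acks levels, valued by the numeric acks they replace."""
    NO_ACK = 0          # producer does not wait for any acknowledgement
    LEADER_ACK = 1      # leader has written the message to its local log
    COMMITTED_ACK = -1  # every replica currently in the ISR has the message

    @classmethod
    def from_config(cls, name: str) -> "Acks":
        """Parse a config string such as 'committed_ack' (KeyError if unknown)."""
        return cls[name.upper()]
```

Because the enum only has these three members, a value such as acks=2 cannot be expressed at all. The "how many replicas" question moves to the topic-level min.isr setting, as point 2 suggests.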
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14148542#comment-14148542 ] Jun Rao commented on KAFKA-1555: 4b and 4d have the same issue: the value of min.isr or ack may not work for all topics that can be sent to through the same producer instance. 4c has the issue that the broker can't reject messages if the ISR is smaller than min.isr. So, personally, I think 4a is still the best option.
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14148520#comment-14148520 ] Sriram Subramanian commented on KAFKA-1555: 2. I agree. I think what min_isr helps with is having a way to specify "I don't want to lose my data as long as no more than min_isr - 1 nodes are down". For example, if no_of_replicas=3, min_isr=2 and ack=-1, we should not lose data as long as only one node is down, even when there is an unclean leader election. In this particular case, when the leader fails, the other replica nodes are expected to be up but could be out of the ISR. Under such constraints it is definitely possible to prevent data loss (ignoring loss due to system failures and data not flushed to disk) by electing the node with the longest log as leader (assuming we ensure the logs don't diverge). 3. I prefer b or c. d is attractive since you could use just one variable to define your required guarantees, but it is hard to understand at the API level. 4. I totally agree. The issue is that the ISR takes a while to reflect reality. Assume we failed early, before writing to the local log, and did not have any checks after writing. Replicas go down. It would take a while for the ISR to reflect that those replicas are no longer in it. During this time, we would simply write the messages to the log and lose them later.
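A minimal Python sketch of the election policy in point 2, assuming, as the comment does, that replica logs never diverge, so every shorter log is a prefix of every longer one. `Replica`, `log_end_offset` and `elect_longest_log` are hypothetical names for illustration, not Kafka APIs. With 3 replicas and min_isr=2, every acknowledged message sits on at least two replicas, so if only the leader is lost, the longest surviving log still contains it.

```python
from dataclasses import dataclass


@dataclass
class Replica:
    broker_id: int
    log_end_offset: int  # number of messages in this replica's log
    alive: bool = True


def elect_longest_log(replicas):
    """Hypothetical policy: among live replicas (in the ISR or not), pick the
    one with the longest log. Only safe if logs never diverge."""
    live = [r for r in replicas if r.alive]
    if not live:
        return None
    return max(live, key=lambda r: r.log_end_offset)


# Leader on broker 1 failed; brokers 2 and 3 are up but were dropped from the ISR.
replicas = [
    Replica(1, log_end_offset=100, alive=False),
    Replica(2, log_end_offset=100),
    Replica(3, log_end_offset=97),
]
leader = elect_longest_log(replicas)
print(leader.broker_id)  # 2
```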
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14148510#comment-14148510 ] Jun Rao commented on KAFKA-1555: a. I agree that unclean leader election is an issue orthogonal to min.isr. Unclean leader election only happens when all replicas in the ISR fail, which can happen regardless of the value of min.isr. The higher the value of min.isr, the lower the probability that an unclean leader election will happen. b. The issue with exposing min.isr in the producer config is that one producer can be used to send to multiple topics. Since different topics can have different replication factors, it may be difficult to configure a min.isr that works for all of them. We had the same issue with ack > 1 before; however, that option will be removed in Gwen's patch. From this perspective, I think having min.isr as a topic-level config makes sense. c. I agree that it's better to add a separate check that keeps the message out of the leader's log if the ISR at that moment is smaller than min.isr.
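Points b and c together can be sketched in a few lines of Python. Everything here is hypothetical (`TOPIC_MIN_ISR`, `append_as_leader`, `NotEnoughReplicasError`, and the topic names): min.isr is looked up per topic rather than taken from the producer, and the leader refuses an acks=-1 write before it touches the log when the current ISR is too small.

```python
class NotEnoughReplicasError(Exception):
    """Hypothetical error returned instead of appending to the leader's log."""


# Hypothetical per-topic settings: because min.isr lives on the topic, one
# producer can write to topics with different replication factors.
TOPIC_MIN_ISR = {"payments": 2, "clickstream": 1}


def append_as_leader(topic, isr, log, message, acks):
    """Leader-side check from point (c): with acks=-1, reject the write
    before appending if the current ISR is smaller than the topic's min.isr."""
    min_isr = TOPIC_MIN_ISR.get(topic, 1)
    if acks == -1 and len(isr) < min_isr:
        raise NotEnoughReplicasError(
            f"{topic}: isr size {len(isr)} < min.isr {min_isr}")
    log.append(message)
    return len(log) - 1  # offset of the appended message


log = []
append_as_leader("clickstream", isr={1}, log=log, message="m1", acks=-1)
try:
    append_as_leader("payments", isr={1}, log=log, message="m2", acks=-1)
except NotEnoughReplicasError as err:
    print(err)  # payments: isr size 1 < min.isr 2
print(log)  # ['m1']
```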
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14148483#comment-14148483 ] Jay Kreps commented on KAFKA-1555: 1. Yeah, this may have been a mistake. I think integers are fine for the protocol; we need not expose them that way in the client config. 2. Yeah, I think what I said wasn't very clear. You are correct that the loss situation is always an unclean election. I think your complaint is that min.isr is hard to reason about, and I actually agree. What I was trying to say is that you can't totally prevent data loss with either replication or min.isr, because regardless of the number of writes there is some chance of more failures than that. So, fundamentally, using the number of writes/replicas is a way to increase the probability of durability. Both are hard to reason about, but I don't know that min.isr is worse than replication factor in that respect. 3. Which of the four options are you saying you like? 4. Yes, I totally agree. Let me elaborate. I claim there are really only two common cases here: (a) you have the ability to block and wait for sufficient available replicas, holding onto whatever unwritten data you have in the meantime, or (b) you don't. I think the majority of uses are probably (b); for example, any data produced by a web application would be this way. But there are plenty of cases where you can block (a stream processing job reading from an upstream topic, replicating data coming out of a database, etc.). min.isr only helps the case where you can block. So the only sane way to use min.isr is to also set retries to infinity and keep trying until you are sure the write has succeeded. But if we don't fail fast on the write, imagine how this would work in practice: a server fails, bringing you below your min.isr setting, and for the hour while someone is fixing it your process sits there pumping out duplicate writes. This is likely not what anyone would want.
But as you say, you can't guarantee the data wasn't written, because the ISR could shrink after the write occurred. This would be rare, but possible. However, attempting to fail fast, even though it isn't perfect, fixes the real problem: there may still be a duplicate, but that is true anyway just due to network timeouts, and there won't be a billion duplicates, which is what I consider to be the blocker issue.
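To make the duplicate argument in point 4 concrete, here is a small Python simulation under simplifying assumptions: a single message, a producer that retries forever, and a hypothetical trace of the ISR size the leader sees on each attempt. It compares failing fast (reject before appending) with always appending and then returning the error. It deliberately ignores the rarer race where the ISR shrinks after the check.

```python
def retry_until_acked(isr_sizes, min_isr, fail_fast):
    """Simulate a producer retrying one message until it is acknowledged.

    isr_sizes[i] is the ISR size the leader sees on attempt i (a hypothetical
    trace). Returns the leader's log at the moment of acknowledgement.
    """
    log = []
    for isr_size in isr_sizes:
        if fail_fast and isr_size < min_isr:
            continue  # rejected before touching the log; producer retries
        log.append("m")  # the write lands in the leader's log
        if isr_size >= min_isr:
            return log  # acknowledged; producer stops retrying
        # write-then-check: error returned after the append; producer retries
    raise RuntimeError("message was never acknowledged")


# One broker is down for 1000 retry attempts, then the ISR recovers.
trace = [1] * 1000 + [3]
print(len(retry_until_acked(trace, min_isr=2, fail_fast=False)))  # 1001
print(len(retry_until_acked(trace, min_isr=2, fail_fast=True)))   # 1
```

With write-then-check, every retry during the outage leaves another copy in the log; failing fast leaves exactly one.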
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14148450#comment-14148450 ] Gwen Shapira commented on KAFKA-1555: Call me a control freak, but "don't lose data" is a critical requirement, and I'd like to control as much of it as possible in a central location that can be monitored and audited. I can't see how we can force everyone who produces data to do the right thing.
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14148407#comment-14148407 ] Sriram Subramanian commented on KAFKA-1555: Thank you for summarizing all the thoughts, Jay. 1. I had issues with how ack was initially designed alongside the min_isr config, and it looks a lot better now with ack = 0, ack = 1 and ack = -1. I still think ack should be an enum explaining what it does rather than using -1 or other arbitrary integers. 2. I don't see the value of min_isr if it does not prevent data loss under unclean leader election. With a clean leader election, we would always have another replica that has the data, and min_isr adds no further value. It is entirely possible to ensure there is no data loss with unclean leader election using min_isr, and I think that is its real benefit. 3. As I said previously, I'd like the sender to know what guarantees they get when they send the request, and would opt for exposing min_isr at the API level. 4. W.r.t. your last point, I think it may not be possible to avoid duplicates by failing before writing to the log. The reason is that the ISR could drop below min_isr just after the check, and we could still end up failing the request after a timeout. Agreed, this is an edge case and we end up with far fewer duplicates. So I think you would need the check in both places.
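Point 4 argues for checking in both places. Below is a minimal Python sketch under assumed, hypothetical names (`Result`, `handle_produce`, and the result labels are illustrative, not Kafka's actual error codes). The first check rejects the request before the append. The second catches the case where the ISR shrank after that check, which still leaves the message in the log, so a producer retry can create a duplicate.

```python
from enum import Enum


class Result(Enum):
    OK = "ok"
    NOT_ENOUGH_REPLICAS = "rejected before append"
    NOT_ENOUGH_REPLICAS_AFTER_APPEND = "appended, but isr shrank"


def handle_produce(log, message, isr_before, isr_after, min_isr):
    """isr_before: the ISR when the request arrives.
    isr_after: the ISR once the leader has waited for replication."""
    if len(isr_before) < min_isr:
        return Result.NOT_ENOUGH_REPLICAS  # fail fast: log untouched
    log.append(message)
    if len(isr_after) < min_isr:
        # The message is already in the log; a retry may duplicate it.
        return Result.NOT_ENOUGH_REPLICAS_AFTER_APPEND
    return Result.OK


log = []
# Race: the first check passes with a full ISR, then two followers drop out.
print(handle_produce(log, "m1", {1, 2, 3}, {1}, min_isr=2))
# Result.NOT_ENOUGH_REPLICAS_AFTER_APPEND
print(log)  # ['m1']
```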
Re: [jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
Jay, thank you for the excellence. Regarding your point 6, I would just like to mention that with a partial-function-style interface facet for event listener registrations on top of FarRefs and RemotePromiseRefs, in ERights-style distributed promises, it is easy to think about multiple registrations for different future scenarios, scheduled or failure. Even if an undetected partition occurs, replays can be revoked after a previous attempt succeeds, or according to other client-defined strategies. I do not know if this is being considered for 0.9. Thank you, - Rob > On Sep 25, 2014, at 3:53 PM, "Jay Kreps (JIRA)" wrote:
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14148371#comment-14148371 ] Jay Kreps commented on KAFKA-1555: This is a good discussion; I am glad we are taking the time to think this through carefully. Let's aim for the right end state rather than optimizing for what is easiest to implement now (since these features never get removed and we end up spending a lot of time explaining them). [~sriramsub] it sounds like you are not totally sold on min.isr. Let me try to summarize a few things people have said that I think are true and see if people can agree on them:
1. Unclean leader election is an orthogonal issue. Regardless of settings, choosing a leader that is not caught up means losing data. This option covers catastrophic recovery (i.e. no server with complete data exists). We can give finer control over whether unclean election is manual or automatic, but I think you need to have this escape hatch for the case where the authoritative copy of the data is destroyed.
2. Specifying a min.isr does actually make sense. I think people have one of two cases in mind. In one case, non-availability means data loss; this is likely the most common case. Here, even if you are down to your last replica, you still want to perform the write, because there is still some hope the data will not be lost, whereas if you refuse the write the chance of loss is 100%. In the other case, non-availability can be tolerated because something upstream (perhaps the client or another system) can hold onto the data and retry later. Here you want to be sure that when you accept a write it is safe: refusing a write is okay, but accepting a write and then losing it is much worse. It's true that it is very hard to reason about the right min.isr, as that depends on the probability of failure over time. But this criticism is also true of replication factor (e.g. to know an appropriate replication factor to yield a particular probability of data loss, you need to know the joint probability distribution over machine failures).
3. With regard to min.isr there are three issues: (1) what settings actually make sense, (2) what is the best way to express these in the protocol, and (3) what is the best way to represent this in the client configuration. I think we need to start by agreeing on (1).
4. I believe people are actually in agreement that the following settings make sense:
a. acks = 0, min.isr = 0
b. acks = 1, min.isr = 1
c. acks = -1, min.isr in {1, ..., N}
Conversely, no other settings make sense. Does everyone agree on this? If so, the question is really how to expose this to the user.
4. There were several proposals for how to express these options:
a. The current proposal is to have acks remain in the protocol with its original meaning and values but add a topic configuration controlling min.isr. I personally think this is a bit weird, since both are about the definition of success for the request, so it makes sense to send them with the request.
b. Alternatively, we could add a new field in the produce request specifying the min.isr.
c. Alternatively, we could add a new field in the response returning the actual ISR size. An advantage of this is that it allows the client to distinguish between "write failed" and "write succeeded but not with enough replicas".
d. Finally, we could overload acks = -2, -3, etc. to mean acks = -1 and min.isr = 2, 3, etc. This sounds insane, but maybe it actually isn't. Since not all combinations of acks and min.isr make sense, this does actually enumerate the sensible cases.
5. Regardless of what we do in (4), the configuration can be somewhat simpler, probably just by having the user specify min.isr and erroring out if they combine min.isr and acks in a nonsensical way.
6. It isn't clear to me whether the right behavior is to fail fast when the min.isr likely won't be met (because the current ISR < min) and not attempt the write at all, or else to always do the write and then return an error if the min.isr isn't met. The latter case means that retries, which are the sane thing to do when you get the error, will lead to potentially many duplicates. In fact, in the common case where you want to kind of block and keep trying the write until durability can be guaranteed, even if that takes a few minutes, this might mean thousands of duplicates.
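The settings listed under the first point 4, and the overloaded encoding in proposal 4d, can be checked mechanically. A Python sketch, assuming `n` is the topic's replication factor; the function names are hypothetical, and reading plain acks = -1 as min.isr = 1 (the natural extension of -2, -3, ...) is an assumption of this sketch, not something the proposal states.

```python
def is_sensible(acks, min_isr, n):
    """Combinations from the first point 4: (0, 0), (1, 1), and (-1, k)
    with 1 <= k <= n, where n is the topic's replication factor."""
    if acks == 0:
        return min_isr == 0
    if acks == 1:
        return min_isr == 1
    if acks == -1:
        return 1 <= min_isr <= n
    return False  # e.g. acks > 1, which the patch removes


def decode_overloaded_acks(acks):
    """Proposal 4d: acks = -k means acks = -1 with min.isr = k.
    Treating plain -1 as min.isr = 1 is an assumption of this sketch."""
    if acks in (0, 1):
        return acks, acks
    if acks <= -1:
        return -1, -acks
    raise ValueError(f"acks={acks} has no meaning under this encoding")


for acks in (0, 1, -1, -2, -3, -4):
    pair = decode_overloaded_acks(acks)
    print(acks, pair, is_sensible(*pair, n=3))
```

Under this reading, every encoded value decodes to a sensible pair except values asking for more in-sync replicas than the topic has.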
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14147967#comment-14147967 ] Sriram Subramanian commented on KAFKA-1555: ack = -1 with clean leader election already prevents data loss. If min_isr=2, I would expect the data never to be lost when the leader fails. That should be the simplest guarantee the system provides. We should not add further clauses to it, or it would be impossible to define the system. If we were to say "with min_isr=2 and ack=-1 you just reduced the probability of loss, but data could still be lost under unclean leader election", we will lose credibility on these settings.
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14147943#comment-14147943 ] Joel Koshy commented on KAFKA-1555:

I'm +1 on the concept of min.isr and acks -1, 0, 1. This is a very interesting and important thread; sorry I missed most of it until I spent a couple of hours (!) yesterday chewing on these comments.

Sriram, with regard to your second point: I had a similar concern and I think we talked about it, but I'm not sure it is the same issue. At the point the leader responds to the producer, it knows how many followers have received the messages, so if only min.isr - 1 replicas have been written to, the leader would return a NotEnoughReplicas error code. I agree that subsequent data loss is possible on unclean leader elections. However, that is sort of expected. I think Joe provided a good interpretation of min.isr, i.e., "it provides a balance between your tolerance for the probability of data loss for stored data and the need of availability of brokers to write to". For a lower probability of loss, i.e., a lower probability of unclean leader elections, one would use a higher min.isr. Avoiding (or rather reducing) data loss on unclean leader elections is, I think, an orthogonal issue that other JIRAs such as KAFKA-1211 touch upon.
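The leader-side rule Joel describes can be sketched as a small Python model. This is an illustration under stated assumptions, not the broker's actual code: `leader_reply`, `acked_by` and the string error codes are hypothetical names, and the model assumes the leader knows which replicas hold the message at the moment it is ready to respond.

```python
# Hypothetical model of the leader's reply to a produce request.
ERROR_NONE = "NoError"
ERROR_NOT_ENOUGH_REPLICAS = "NotEnoughReplicas"


def leader_reply(acked_by: set, acks: int, min_isr: int) -> str:
    """Return the error code the leader would report for one produce request.

    acked_by: ids of the replicas (leader included) that have written the
              message by the time the leader is ready to respond.
    acks:     the producer's request.required.acks value (-1, 0 or 1).
    min_isr:  the topic's minimum ISR size.
    """
    # min.isr only applies to acks=-1; acks=0 and acks=1 are unaffected
    # (with acks=0 the producer does not wait for a reply at all).
    if acks == -1 and len(acked_by) < min_isr:
        # e.g. only min.isr - 1 replicas hold the message
        return ERROR_NOT_ENOUGH_REPLICAS
    return ERROR_NONE


print(leader_reply({"A", "B"}, acks=-1, min_isr=2))  # NoError
print(leader_reply({"A"}, acks=-1, min_isr=2))       # NotEnoughReplicas
print(leader_reply({"A"}, acks=1, min_isr=2))        # NoError
```

As Joel notes, this check only governs what the producer is told at write time; it says nothing about what happens to the message on a later unclean leader election.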
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14147900#comment-14147900 ] saurabh agarwal commented on KAFKA-1555:

Along with the min.isr setting, "unclean leader election" must be set to false in order to avoid data loss. Take a scenario: the number of replicas is 3, and one replica is falling behind. This replica will be moved out of the ISR. Because unclean leader election is turned off, it will not become a leader. Now only 2 replicas are left in the ISR. If the leader goes down, the other replica in the ISR will become the leader. Because min.isr=2, writes will not succeed until another replica joins the ISR. I don't think there will be any data loss in this scenario.
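A minimal Python model of the scenario above, assuming 3 replicas, min.isr=2 and unclean leader election turned off. `elect_leader`, `can_accept_acks_all` and the `unclean_allowed` flag are hypothetical names for illustration; real leader election is performed by the controller and is not reproduced here.

```python
# Hypothetical model of saurabh's scenario: 3 replicas, min.isr=2,
# unclean leader election disabled.
from typing import List, Optional, Set

MIN_ISR = 2


def elect_leader(isr: List[str], live: Set[str], replicas: List[str],
                 unclean_allowed: bool) -> Optional[str]:
    """Pick a new leader, preferring live members of the ISR."""
    for r in isr:
        if r in live:
            return r  # clean election: this replica was in sync
    if unclean_allowed:
        for r in replicas:
            if r in live:
                return r  # unclean election: may be a lagging replica
    return None  # no eligible replica: the partition stays offline


def can_accept_acks_all(isr: List[str]) -> bool:
    """acks=-1 writes succeed only while the ISR has at least MIN_ISR members."""
    return len(isr) >= MIN_ISR


replicas = ["A", "B", "C"]
isr = ["A", "B"]   # C fell behind and was moved out of the ISR
live = {"B", "C"}  # the leader A goes down

leader = elect_leader(isr, live, replicas, unclean_allowed=False)
isr = [r for r in isr if r in live]  # ISR shrinks to the surviving member
print(leader, isr, can_accept_acks_all(isr))  # B ['B'] False

# Once C catches up and rejoins the ISR, acks=-1 writes are accepted again.
isr.append("C")
print(can_accept_acks_all(isr))  # True
```

The lagging replica C is never chosen because it was outside the ISR, and the write path stays closed while only B is in sync, which is the combination saurabh argues prevents loss.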
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14147474#comment-14147474 ] Sriram Subramanian commented on KAFKA-1555:

1. Great. Not supporting ack values above 1 is a good step. We are essentially not using it as an integer any more. I would still love it to be made more explicit with an enum for clarity.
2. Also, by setting ack = -1 and min_isr = 2, we still do not avoid data loss when one broker goes down. The issue is the way we select a leader. When a request was written to the leader, the min_isr check could have succeeded and we would have written to min_isr - 1 replicas. However, those replicas could subsequently drop out of the ISR. When the leader fails after that, we would have an unclean leader election and select any replica as the leader, and it could be one that was lagging. To completely guarantee no data loss, we would need to do the following:
   a. Ensure logs do not diverge on unclean leader elections.
   b. Choose the broker with the longest log as the leader.
3. We may not have documented ack > 1, but since it is an integer, there is a chance somebody is using it. In that case this would be a backwards-incompatible change. It would be worth mentioning it in the release notes.
4. Long term, I think the min_isr config should be in the API. This gives better control per message and explicitly lets the caller know what guarantees they get.
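Proposal 2b can be sketched as picking the live replica with the largest log end offset. This is a hypothetical helper (`choose_leader`) that assumes log length is measured by log end offset. As point 2a notes, it only prevents loss if the logs have not diverged, because a longer log is not necessarily a superset of a shorter one.

```python
from typing import Dict, Optional, Set


def choose_leader(log_end_offsets: Dict[str, int], live: Set[str]) -> Optional[str]:
    """Return the live replica with the largest log end offset.

    Ties are broken by the iteration order of log_end_offsets.
    Returns None when no replica is alive.
    """
    candidates = {b: off for b, off in log_end_offsets.items() if b in live}
    if not candidates:
        return None
    return max(candidates, key=candidates.get)


# A had the newest messages but is down; B lagged less than C.
offsets = {"A": 120, "B": 118, "C": 95}
print(choose_leader(offsets, live={"B", "C"}))  # B
```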
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14147395#comment-14147395 ] Gwen Shapira commented on KAFKA-1555:

Also, attached a version of the patch with a few improvements pointed out by [~junrao].
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14147394#comment-14147394 ] Gwen Shapira commented on KAFKA-1555:

+1 for dropping ack > 1. The whole JIRA started when someone pointed out that this parameter is incredibly misleading.

Just for the record, the difference between ack=2 and ack=-1/min.isr=2 is:

With ack=2, you guarantee that 2 replicas in the ISR acked the message. However, if the ISR has 7 replicas, there are 5 replicas that DID NOT ack the request. Any one of them can get elected leader and therefore lose the message. (Aside from the obvious difference that ack=2 is an undocumented feature.)

With ack=-1 and min.isr=2, ALL replicas in the ISR acked the message AND we guarantee that there are at least 2 of those, so we avoid the case where the ISR is just the leader and, if it crashes, we lose messages.

Hope this clarifies things.
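Gwen's comparison can be made concrete with a small counting model. `ack_outcome` is a hypothetical helper that assumes acks=N means exactly N replicas (leader included) acked, with N no larger than the ISR size; it does not reproduce Kafka's actual request handling.

```python
from typing import Optional, Tuple


def ack_outcome(isr_size: int, acks: int, min_isr: int = 1) -> Optional[Tuple[int, int]]:
    """Return (replicas_with_message, isr_members_without_it) at the moment
    the producer sees success, or None if the write is rejected.

    Assumes 0 <= acks <= isr_size for the numeric ack modes.
    """
    if acks == -1:
        if isr_size < min_isr:
            return None  # NotEnoughReplicas: the producer sees an error instead
        return isr_size, 0  # every ISR member acked
    return acks, isr_size - acks  # only `acks` replicas (leader included) acked


print(ack_outcome(7, acks=2))              # (2, 5): 5 electable replicas may lack it
print(ack_outcome(7, acks=-1, min_isr=2))  # (7, 0)
print(ack_outcome(1, acks=-1))             # (1, 0): the ISR is just the leader
print(ack_outcome(1, acks=-1, min_isr=2))  # None
```

The third line is the case min.isr=2 exists to rule out: without it, an acks=-1 write can succeed while only the leader holds the message.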
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14147353#comment-14147353 ] Joe Stein commented on KAFKA-1555:

<< drop the support of ack > 1
+1
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14147352#comment-14147352 ] Jun Rao commented on KAFKA-1555:

Sriram,

With this patch, I am thinking that the ack mode will only be -1, 0 and 1, which is what's actually documented on the website. Our code does support ack = 2. However, it's a bit hard to understand, and it's not very useful since it's not much safer than ack=1. The semantics of min.isr = 2 is actually what people want. My recommendation is to drop the support of ack > 1.
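Restricting ack to -1, 0 and 1, together with Sriram's earlier suggestion of an explicit enum, might look like the following sketch. The `Acks` enum, its member names and `parse_acks` are hypothetical; they are not part of Kafka's producer API.

```python
from enum import Enum


class Acks(Enum):
    """Hypothetical explicit form of request.required.acks."""
    NONE = 0     # don't wait for any acknowledgement
    LEADER = 1   # wait for the leader's local write
    ALL = -1     # wait for every ISR member (and enforce min.isr)


def parse_acks(value: int) -> Acks:
    """Map the integer setting onto the enum, rejecting ack > 1."""
    try:
        return Acks(value)
    except ValueError:
        raise ValueError(
            f"request.required.acks={value} is not supported; use -1, 0 or 1"
        ) from None


print(parse_acks(-1))  # Acks.ALL
try:
    parse_acks(2)
except ValueError as e:
    print(e)  # request.required.acks=2 is not supported; use -1, 0 or 1
```

Failing loudly on ack=2 rather than silently reinterpreting it also speaks to Sriram's concern that existing users of ack > 1 would otherwise be surprised.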
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14146751#comment-14146751 ] Sriram Subramanian commented on KAFKA-1555:

Jun, I am not arguing that we should not have the feature. I am arguing about the best way to expose that feature. I think ack being a number along with min isr being another number is very confusing. Today, ack does not really indicate whether the system is opting for availability or consistency. min_isr also works only for ack=-1. Cases where ack = 2 and min_isr = 2 are very confusing to reason about. In this case, we would still end up writing only to the ISR and return success. If ISR = 1, it just makes the system behave unpredictably. We should either change how ack is implemented today or move these options to the API so that the caller knows what they are opting for. If this is an interim solution, I would like to see a JIRA filed to revisit this. It is usually hard to change things later once users get used to how a system behaves.
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14146173#comment-14146173 ] saurabh agarwal commented on KAFKA-1555:

The min.isr feature is critical for us. In our use cases, we cannot afford data loss once the data is published to Kafka. At the same time, we would like to tolerate the failure of replica(s). In our use case, we would like min.isr to be 2 and the replication factor to be 3 or more. In that case, we will have at least 2 copies in Kafka and be able to tolerate the failure of one replica.
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14145894#comment-14145894 ] Jun Rao commented on KAFKA-1555:

Sriram,

In my mind, min.isr can be 1 to n-1, where n is the replication factor (except when n = 1). The reason one wants more than 1 replica is to tolerate failures. Setting min.isr to n means that one can't tolerate any failure, which defeats the purpose of replication. I am not sure how widely this min.isr feature will be used. Given that, the current approach is probably the least intrusive. If this turns out to be a feature that many people want to use and they find the configuration confusing, we can revisit the issue in the future.
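The arithmetic behind Jun's range (and saurabh's replication factor 3 / min.isr 2 setup) can be written down as a short sketch. `failures_tolerated` and `is_recommended` are hypothetical helpers, and the model assumes each failed broker removes exactly one replica from the ISR.

```python
def failures_tolerated(replication_factor: int, min_isr: int) -> int:
    """Broker failures a partition can absorb while acks=-1 writes still succeed.

    Assumes each failed broker removes exactly one replica from the ISR.
    """
    if not 1 <= min_isr <= replication_factor:
        raise ValueError("min.isr must be between 1 and the replication factor")
    return replication_factor - min_isr


def is_recommended(replication_factor: int, min_isr: int) -> bool:
    """Jun's suggested range: 1 <= min.isr <= n - 1 (min.isr = 1 when n = 1)."""
    if replication_factor == 1:
        return min_isr == 1
    return 1 <= min_isr <= replication_factor - 1


print(failures_tolerated(3, 2))  # 1: saurabh's setup, each acked write is on >= 2 brokers
print(failures_tolerated(3, 3))  # 0: any single failure blocks acks=-1 writes
print(is_recommended(3, 3))      # False
```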
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14145096#comment-14145096 ] Joe Stein commented on KAFKA-1555:

[~gwenshap] yup, in that case the issue was between the chair and the keyboard (me)

{code}
root@precise64:/opt/apache/kafka# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testNew --sync --request-required-acks -1
A
[2014-09-23 17:36:37,127] WARN Produce request with correlation id 2 failed due to [testNew,1]: kafka.common.NotEnoughReplicasException (kafka.producer.async.DefaultEventHandler)
[2014-09-23 17:36:37,248] WARN Produce request with correlation id 5 failed due to [testNew,2]: kafka.common.NotEnoughReplicasException (kafka.producer.async.DefaultEventHandler)
[2014-09-23 17:36:37,364] WARN Produce request with correlation id 8 failed due to [testNew,2]: kafka.common.NotEnoughReplicasException (kafka.producer.async.DefaultEventHandler)
[2014-09-23 17:36:37,480] WARN Produce request with correlation id 11 failed due to [testNew,1]: kafka.common.NotEnoughReplicasException (kafka.producer.async.DefaultEventHandler)
[2014-09-23 17:36:37,591] ERROR Failed to send requests for topics testNew with correlation ids in [0,12] (kafka.producer.async.DefaultEventHandler)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
	at kafka.producer.Producer.send(Producer.scala:76)
	at kafka.producer.OldProducer.send(BaseProducer.scala:62)
	at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:95)
	at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)
{code}

awesome!
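The log shows four NotEnoughReplicasException warnings before the final FailedToSendMessageException reporting 3 tries. That pattern is consistent with one initial attempt plus three retries; the sketch below models that assumption. `send_with_retries`, `max_retries` and both exception classes are hypothetical stand-ins, not the old producer's API.

```python
class NotEnoughReplicasError(Exception):
    """Stand-in for kafka.common.NotEnoughReplicasException."""


class FailedToSendError(Exception):
    """Stand-in for kafka.common.FailedToSendMessageException."""


def send_with_retries(send_once, max_retries: int = 3):
    """Try send_once() once, then retry up to max_retries more times."""
    last_error = None
    for _ in range(1 + max_retries):
        try:
            return send_once()
        except NotEnoughReplicasError as e:
            # Assumed to correspond to one WARN line in the log above.
            last_error = e
    raise FailedToSendError(
        f"Failed to send messages after {max_retries} tries.") from last_error


attempts = []


def always_short_of_replicas():
    attempts.append(1)
    raise NotEnoughReplicasError("[testNew,1]")


try:
    send_with_retries(always_short_of_replicas)
except FailedToSendError as e:
    print(len(attempts), e)  # 4 Failed to send messages after 3 tries.
```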
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14144999#comment-14144999 ] Gwen Shapira commented on KAFKA-1555:

[~joestein] can you share your producer configuration? NotEnoughReplicasException is expected only when request.required.acks=-1. Can you validate that this is the case?
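The rule Gwen states can be modeled in a few lines. This is a minimal sketch, not Kafka's code. `MinIsrCheck` and `shouldReject` are hypothetical names, and the model assumes the broker knows three things: the request's request.required.acks, the current ISR size, and the topic's min.insync.replicas. Under this rule, a topic like Joe's testNew (ISR of 1, min.insync.replicas=2) rejects only produce requests sent with acks=-1.

```java
/** Hypothetical model of the min.insync.replicas produce check; not Kafka's actual code. */
public final class MinIsrCheck {

    private MinIsrCheck() {}

    /**
     * Returns true if a produce request should fail with NotEnoughReplicasException.
     * Only acks=-1 requests are compared against min.insync.replicas.
     */
    public static boolean shouldReject(int requiredAcks, int isrSize, int minInSyncReplicas) {
        if (requiredAcks != -1) {
            // acks=0, acks=1 and other values ignore min.insync.replicas in this model.
            return false;
        }
        return isrSize < minInSyncReplicas;
    }

    public static void main(String[] args) {
        // testNew-like topic: ISR shrank to a single broker, min.insync.replicas=2.
        System.out.println(shouldReject(1, 1, 2));  // acks=1: accepted
        System.out.println(shouldReject(-1, 1, 2)); // acks=-1: rejected
        System.out.println(shouldReject(-1, 2, 2)); // ISR back to 2: accepted
    }
}
```

Running `main` prints false, true, false. In this model, a producer still on acks=1 would keep succeeding against testNew, which is why the producer configuration matters.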
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14144378#comment-14144378 ] Joe Stein commented on KAFKA-1555:

[~gwenshap] patch applied and tests passed; however, I ran with 3 brokers, all on the new code, and didn't get the expected results:
{code}
Topic:testDefault  PartitionCount:4  ReplicationFactor:3  Configs:
    Topic: testDefault  Partition: 0  Leader: 1  Replicas: 1,2,3  Isr: 1
    Topic: testDefault  Partition: 1  Leader: 1  Replicas: 2,3,1  Isr: 1
    Topic: testDefault  Partition: 2  Leader: 1  Replicas: 3,1,2  Isr: 1
    Topic: testDefault  Partition: 3  Leader: 1  Replicas: 1,3,2  Isr: 1
Topic:testNew  PartitionCount:4  ReplicationFactor:3  Configs:min.insync.replicas=2
    Topic: testNew  Partition: 0  Leader: 1  Replicas: 2,1,3  Isr: 1
    Topic: testNew  Partition: 1  Leader: 1  Replicas: 3,2,1  Isr: 1
    Topic: testNew  Partition: 2  Leader: 1  Replicas: 1,3,2  Isr: 1
    Topic: testNew  Partition: 3  Leader: 1  Replicas: 2,3,1  Isr: 1
{code}
I am still able to produce to topic testNew, though I shouldn't be able to, since 2 brokers are down and only 1 is up with min.isr=2.

I got the proper exceptions when trying to set invalid values for the config:
{code}
root@precise64:/opt/apache/kafka# bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic testNewA --partitions 4 --replication-factor 3 --config min.insync.replicas=-1
Error while executing topic command Wrong value -1 of min.insync.replicas in Topic configuration; Valid values are at least 1
kafka.common.InvalidConfigException: Wrong value -1 of min.insync.replicas in Topic configuration; Valid values are at least 1
    at kafka.log.LogConfig$.validateMinInSyncReplicas(LogConfig.scala:191)
    at kafka.log.LogConfig$.validate(LogConfig.scala:179)
    at kafka.admin.TopicCommand$.parseTopicConfigsToBeAdded(TopicCommand.scala:204)
    at kafka.admin.TopicCommand$.createTopic(TopicCommand.scala:84)
    at kafka.admin.TopicCommand$.main(TopicCommand.scala:54)
    at kafka.admin.TopicCommand.main(TopicCommand.scala)

root@precise64:/opt/apache/kafka# bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic testNewA --partitions 4 --replication-factor 3 --config min.insync.replicas=4
Error while executing topic command replication factor: 3 larger than available brokers: 1
kafka.admin.AdminOperationException: replication factor: 3 larger than available brokers: 1
    at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:70)
    at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:171)
    at kafka.admin.TopicCommand$.createTopic(TopicCommand.scala:92)
    at kafka.admin.TopicCommand$.main(TopicCommand.scala:54)
    at kafka.admin.TopicCommand.main(TopicCommand.scala)
{code}
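The InvalidConfigException in Joe's first run comes from a lower-bound check on min.insync.replicas. The sketch below is hypothetical: `MinIsrValidator` is not a Kafka class. It reproduces only the "at least 1" rule visible in the error message and assumes topic configs arrive as strings, so it parses before checking. It does not compare the value with the replication factor. The min.insync.replicas=4 run does not exercise that comparison either, because that command failed on broker count instead.

```java
/** Hypothetical validator for a topic-level min.insync.replicas value. */
public final class MinIsrValidator {

    private MinIsrValidator() {}

    /** Parses a min.insync.replicas value and rejects anything below 1. */
    public static int validateMinInSyncReplicas(String raw) {
        int value;
        try {
            value = Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            // Illustrative message for non-numeric input (assumption, not Kafka's wording).
            throw new IllegalArgumentException(
                "Wrong value " + raw + " of min.insync.replicas in Topic configuration; not an integer", e);
        }
        if (value < 1) {
            // Mirrors the rule in the error above: "Valid values are at least 1".
            throw new IllegalArgumentException(
                "Wrong value " + value
                    + " of min.insync.replicas in Topic configuration; Valid values are at least 1");
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(validateMinInSyncReplicas("2")); // 2
        try {
            validateMinInSyncReplicas("-1");
        } catch (IllegalArgumentException e) {
            // Wrong value -1 of min.insync.replicas in Topic configuration; Valid values are at least 1
            System.out.println(e.getMessage());
        }
    }
}
```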
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14144101#comment-14144101 ] Gwen Shapira commented on KAFKA-1555:

Uploaded an updated patch addressing [~joestein]'s comments. A few clarifications:
2) Partition.checkEnoughReplicasReachOffset() does not have its own test cases (in fact, there are no tests for Partition at all), so I added tests in ProducerFailureHandlingTest and SyncProducerTest.
7) "We should have a warning message I think would make sense in KafkaApis appendToLocalLog on server side, no?"
I added the warning message, but I think we should take it out. The exception doesn't get thrown in appendToLocalLog; it gets thrown in producerRequestPurgatory.checkAndMaybeWatch, which doesn't do its own logging. So basically, nothing gets logged.
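A model of the delayed acks=-1 check helps show why the error surfaces in the purgatory rather than at append time. In this model, a waiting request completes only after every ISR replica has reached the required offset, and only then is the ISR size compared with min.insync.replicas. This is a sketch under that assumption: `AckCheck`, `check`, and the `Result` values are hypothetical names, not the real Partition or producer-purgatory methods. It needs Java 9+ for `Map.of`.

```java
import java.util.Map;

/** Hypothetical model of completing a delayed acks=-1 produce request. */
public final class AckCheck {

    /** Outcome of checking one waiting request. */
    public enum Result { PENDING, OK, NOT_ENOUGH_REPLICAS }

    private AckCheck() {}

    /**
     * @param isrOffsets     log end offset of each replica currently in the ISR, leader included
     * @param requiredOffset offset the produced messages must reach on every ISR replica
     * @param minIsr         the topic's min.insync.replicas
     */
    public static Result check(Map<Integer, Long> isrOffsets, long requiredOffset, int minIsr) {
        for (long offset : isrOffsets.values()) {
            if (offset < requiredOffset) {
                return Result.PENDING; // keep the request waiting
            }
        }
        // Every ISR member has the data; it only counts if the ISR is large enough.
        return isrOffsets.size() >= minIsr ? Result.OK : Result.NOT_ENOUGH_REPLICAS;
    }

    public static void main(String[] args) {
        System.out.println(check(Map.of(1, 10L, 2, 7L), 10L, 2));  // PENDING
        System.out.println(check(Map.of(1, 10L, 2, 10L), 10L, 2)); // OK
        System.out.println(check(Map.of(1, 10L), 10L, 2));         // NOT_ENOUGH_REPLICAS
    }
}
```

Because the error is produced at this completion step, a warning placed only on the append path would not fire for it.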
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14143701#comment-14143701 ] Joe Stein commented on KAFKA-1555:

<< I am more interested in being able to specify why 2 and not N

The value of min.isr is a balance between your tolerance for losing stored data and your need for brokers to be available to write to. Your availability tolerance (X nodes being unavailable while writes can still be ACKed) is the difference between your replication factor (Y) and the number of servers you must save data to (Z) when using ACK=-1 and min.isr = Z: X = Y - Z. If your tolerance for node failures is 3 and you require at least 2 running servers to write to (otherwise writes fail for clients), then you have to set your replication factor to 5 and use ACK=-1 with min.isr = 2. If you require more servers for a successful write but tolerate the same number of node failures, then you must also increase your replication factor. ... something like that...
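Joe's sizing rule, X = Y - Z with ACK=-1 and min.isr = Z, can be written out directly. This is a minimal sketch with illustrative class and method names. The guard requiring 1 <= min.isr <= replication factor is an added assumption, since a min.isr above the replication factor would cause every acks=-1 write to be rejected.

```java
/** Worked form of X = Y - Z for acks=-1 with min.isr = Z (illustrative names). */
public final class MinIsrSizing {

    private MinIsrSizing() {}

    /** X: brokers that can be unavailable while acks=-1 writes still succeed. */
    public static int tolerableFailures(int replicationFactor, int minIsr) {
        // Assumed guard: outside this range the formula has no useful meaning.
        if (minIsr < 1 || minIsr > replicationFactor) {
            throw new IllegalArgumentException("need 1 <= min.isr <= replication factor");
        }
        return replicationFactor - minIsr;
    }

    /** Y: replication factor needed to tolerate X failures while requiring Z in-sync replicas. */
    public static int requiredReplicationFactor(int tolerableFailures, int minIsr) {
        return tolerableFailures + minIsr;
    }

    public static void main(String[] args) {
        // Tolerate 3 unavailable brokers while requiring 2 in-sync replicas per write.
        System.out.println(requiredReplicationFactor(3, 2)); // 5
        System.out.println(tolerableFailures(5, 2));         // 3
        // The issue's setup: 3 replicas with min.isr = 2 tolerates 1 failure.
        System.out.println(tolerableFailures(3, 2));         // 1
    }
}
```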
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14143628#comment-14143628 ] Sriram Subramanian commented on KAFKA-1555:

Surviving the leader crash makes sense. That is provided by any value from 2 to N. I am more interested in being able to specify why 2 and not N. Performance? Availability? Probability of data loss? If so, we should be able to quantify it. I don't want to drag this discussion out, but I think it is a common mistake not to quantify the benefits of choosing one value over another between 2...N-1, and instead to push that choice onto users by providing a fine-grained config. It would be great to document this use case with an example showing how performance, availability, and data loss are affected by choosing one value over another.
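One way to start quantifying the 2...N-1 choice is to tabulate two numbers for N replicas, acks=-1, and min.isr = Z. The first is how many brokers can be down while writes are still accepted (N - Z). The second is how many copies of an acknowledged write can be permanently lost without losing the write (Z - 1). This is a simplified model, not a measurement. It assumes every ISR member holds each acknowledged write and that only a replica holding the write can become leader. It does not model performance or latency. The class and method names are hypothetical.

```java
/** Hypothetical availability/durability table for acks=-1 as min.isr (Z) ranges over 1..N. */
public final class MinIsrTradeoffs {

    private MinIsrTradeoffs() {}

    /** Brokers that may be down while acks=-1 writes are still accepted. */
    public static int writeAvailabilityTolerance(int n, int z) {
        return n - z;
    }

    /** Copies of an acknowledged write that can be permanently lost without losing the write. */
    public static int durabilityTolerance(int z) {
        // An acknowledged write is on every ISR member, and the ISR had at least z members.
        return z - 1;
    }

    public static void main(String[] args) {
        int n = 5;
        System.out.println("min.isr  brokers-down-tolerated  acked-copies-losable");
        for (int z = 1; z <= n; z++) {
            System.out.printf("%7d  %22d  %20d%n",
                z, writeAvailabilityTolerance(n, z), durabilityTolerance(z));
        }
    }
}
```

Under these assumptions, the two columns always sum to N - 1. Each step up in min.isr trades one broker of write availability for one more tolerated loss of an acknowledged copy.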
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14143622#comment-14143622 ] Joe Stein commented on KAFKA-1555:

I agree with [~gwenshap]. A business case for this is being able to tolerate unrecoverable rack/server/zone-related data failures when the ISR shrinks to 1. If you set min isr anywhere from 2 to N-1, you guarantee that, during broker failures, a write is only acknowledged when the ISR meets your minimum hardware tolerance for acknowledging a successful write and being able to recover that write. For MANY data types this is 2 (or 3), just so you have the data on another rack, DC, server, zone, whatever. As it exists now, with ACK=-1 and ISR = 1, you can get into a condition where you are successfully writing/acking to 1 broker (assuming replication factor 3 and 2 failed nodes). If that last broker standing then dies before another comes up and replicates what it wrote during the "window of time where the ISR was 1", you will have lost data that was "deemed saved". In that failure case, everything written during the "window of time where the ISR was 1" is unrecoverable, even though the producer thought it was saved because it had been acked.
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14143600#comment-14143600 ] Gwen Shapira commented on KAFKA-1555:

[~sriramsub] 2..N-1: we want to be sure writes exist on *at least* X nodes, usually 2 or 3, so that you can survive a leader crash without losing those writes (that's the original request for this JIRA). In this case, set min.insync.replicas to 2 (or 3, or whatever).
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14143578#comment-14143578 ] Sriram Subramanian commented on KAFKA-1555:

[~gwenshap] A key thing I would like to ensure when a feature is added is that it can be easily explained to the end users of the system. A system can provide a great deal of flexibility by exposing its functionality as configs. What gets hard with these data systems is that config bloat builds up over time, and it becomes complex to specify the guarantees the system provides.
Let us say we had N replicas.
min isr = 0, 1 - trivial to explain
min isr = N - use it when you need strong durability
min isr = 2 ... N-1 - I need your help here. What would be good guidance to give users on which values to use between 2 and N-1?
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14143521#comment-14143521 ] Joe Stein commented on KAFKA-1555:

ah, my bad.
<< Any suggestions regarding the problem with retries?
I think that is an issue beyond this ticket. It also happens in other cases (e.g. MaxMessageSize, where retrying 3 times won't change a thing, the same problem as here), and we don't yet have a solution that "classifies" exceptions. So I think we should make some fix for it, but that is not related to this ticket IMHO.
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14143501#comment-14143501 ] Gwen Shapira commented on KAFKA-1555:

There is a review board (https://reviews.apache.org/r/25886/) - see link above. I agree with everything else you mentioned. I'll upload an updated patch later today. Any suggestions regarding the problem with retries? With |ISR|
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14143486#comment-14143486 ] Joe Stein commented on KAFKA-1555:

[~gwenshap] thanks for the patch! A few initial comments; can you please:
1) update review board for others to assist in reviewing/feedback a la https://cwiki.apache.org/confluence/display/KAFKA/Patch+submission+and+review
2) we need to add an extra test for Partition.checkEnoughReplicasReachOffset() please (it would also make sense in ProducerFailureHandlingTest)
3) (nit pick) in one case you use InSync and in others Insync; could you pick one (preferably InSync, since that is more consistent with existing code) just for consistency.
4) do we want some validation if someone tries to set 0 or -1 or something for the min.isr value?
5) *note* once this is committed we should also update https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes
6) we should also update the public enums in the java producer: clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
7) We should have a warning message I think would make sense in KafkaApis appendToLocalLog on server side, no?

I should be able to give this a spin in testing later tonight/tomorrow (exciting!!!) on a cluster with topics and such. I like this patch/approach too because people could apply it to 0.8.1.1 easily if they wanted (not saying (yet) it should be in 0.8.1.2), and it handles all the use cases folks have raised on this (so far) without major changes or confusion in how it works.
> provide strong consistency with reasonable availability > --- > > Key: KAFKA-1555 > URL: https://issues.apache.org/jira/browse/KAFKA-1555 > Project: Kafka > Issue Type: Improvement > Components: controller >Affects Versions: 0.8.1.1 >Reporter: Jiang Wu >Assignee: Gwen Shapira > Fix For: 0.8.2 > > Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch > > > In a mission critical application, we expect a kafka cluster with 3 brokers > can satisfy two requirements: > 1. When 1 broker is down, no message loss or service blocking happens. > 2. In worse cases such as two brokers are down, service can be blocked, but > no message loss happens. > We found that current kafka versoin (0.8.1.1) cannot achieve the requirements > due to its three behaviors: > 1. when choosing a new leader from 2 followers in ISR, the one with less > messages may be chosen as the leader. > 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it > has less messages than the leader. > 3. ISR can contains only 1 broker, therefore acknowledged messages may be > stored in only 1 broker. > The following is an analytical proof. > We consider a cluster with 3 brokers and a topic with 3 replicas, and assume > that at the beginning, all 3 replicas, leader A, followers B and C, are in > sync, i.e., they have the same messages and are all in ISR. > According to the value of request.required.acks (acks for short), there are > the following cases. > 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement. > 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this > time, although C hasn't received m, C is still in ISR. If A is killed, C can > be elected as the new leader, and consumers will miss m. > 3. acks=-1. B and C restart and are removed from ISR. Producer sends a > message m to A, and receives an acknowledgement. Disk failure happens in A > before B and C replicate m. Message m is lost. 
> In summary, any existing configuration cannot satisfy the requirements. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
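Joe's item 4 asks whether min.isr values such as 0 or -1 should be rejected. Below is a minimal sketch of such a check. It assumes the bounds 1 <= min.isr <= replication factor. The upper bound is the rule Jun states elsewhere in this thread; the lower bound is an assumption. MinIsrValidator and validate are hypothetical names, not code from the patch.

```java
/**
 * Hypothetical validator for a topic-level min.insync.replicas value.
 * Assumed rule (not taken from the patch): 1 <= minIsr <= replicationFactor.
 */
final class MinIsrValidator {
    private MinIsrValidator() {}

    /** Throws IllegalArgumentException if minIsr makes no sense or can never be satisfied. */
    static void validate(int minIsr, int replicationFactor) {
        if (replicationFactor < 1) {
            throw new IllegalArgumentException(
                    "replication factor must be at least 1, got " + replicationFactor);
        }
        if (minIsr < 1) {
            // Rejects 0, -1 and any other non-positive value (item 4).
            throw new IllegalArgumentException(
                    "min.insync.replicas must be at least 1, got " + minIsr);
        }
        if (minIsr > replicationFactor) {
            // The ISR can never be larger than the replica set, so acks=-1 writes would always fail.
            throw new IllegalArgumentException(
                    "min.insync.replicas (" + minIsr + ") exceeds the replication factor ("
                            + replicationFactor + ")");
        }
    }
}
```

Failing fast at configuration time means a misconfigured topic is caught when it is created or altered, rather than surfacing later as every acks=-1 produce request failing.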
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14143352#comment-14143352 ]
Gwen Shapira commented on KAFKA-1555:
-

Hi Sriram,

Thank you for raising these concerns. Here are some points regarding the drawbacks:

1. This is exactly how it works right now. If you build Kafka with the patch I uploaded, you'll be able to use build/kafka-topics.sh to create/alter topics with the min.insync.replicas parameter specified in the --config flag.

2. Absolutely. [~junrao] explained how this can work (simply ignore the NotEnoughReplicas exception). The only issue we currently have is the retries, which can also be resolved by the client.

3. I disagree that this is what we are trying to solve. We are trying to give admins more control over what "durable writes" means for specific topics. For my use case, I'd like to have majority writes. This can be done for a 3-replica topic by setting min.insync.replicas to 2. If I wanted "all replicas", I could set min.insync.replicas=3, and if I want just the ISR, I can set min.insync.replicas=1. As you can see, the current solution is very flexible and supports multiple durability requirements. It satisfies both your use case and mine.

I agree that this requires a bit more understanding of what you are trying to achieve, but I think I can document it in a way that's fairly easy to understand (with some common examples, as I explained above).

> provide strong consistency with reasonable availability
> ---
>
> Key: KAFKA-1555
> URL: https://issues.apache.org/jira/browse/KAFKA-1555
> Project: Kafka
> Issue Type: Improvement
> Components: controller
> Affects Versions: 0.8.1.1
> Reporter: Jiang Wu
> Assignee: Gwen Shapira
> Fix For: 0.8.2
>
> Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
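Gwen's point 3 relates min.insync.replicas values to durability levels for a 3-replica topic. The sketch below tabulates that trade-off. It assumes producers use acks=-1 and that the broker refuses to acknowledge while the ISR is smaller than min.insync.replicas, as discussed in this thread. DurabilitySpectrum and its methods are illustrative names only.

```java
/**
 * Sketch of what a min.insync.replicas value buys a topic whose producers use acks=-1.
 * Assumes the broker refuses to acknowledge while the ISR is smaller than minIsr,
 * as discussed in this thread. Class and method names are illustrative only.
 */
final class DurabilitySpectrum {
    private DurabilitySpectrum() {}

    /** Smallest strict majority of the replica set, e.g. 2 for 3 replicas. */
    static int majority(int replicationFactor) {
        return replicationFactor / 2 + 1;
    }

    /** Replicas that can drop out of the ISR before acks=-1 writes start failing. */
    static int failuresTolerated(int replicationFactor, int minIsr) {
        return replicationFactor - minIsr;
    }

    /** One line per min.isr setting for the given replication factor. */
    static String describe(int replicationFactor) {
        StringBuilder sb = new StringBuilder();
        for (int minIsr = 1; minIsr <= replicationFactor; minIsr++) {
            sb.append("min.insync.replicas=").append(minIsr)
              .append(": acked messages are on at least ").append(minIsr)
              .append(" replica(s); writes continue with up to ")
              .append(failuresTolerated(replicationFactor, minIsr))
              .append(" replica(s) out of the ISR\n");
        }
        return sb.toString();
    }
}
```

With 3 replicas, majority(3) is 2, which is the "majority write" setting Gwen describes. With one broker down, the ISR still has 2 members, so writes continue. With two brokers down, acks=-1 writes are refused rather than acknowledged by a single replica.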
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14142967#comment-14142967 ]
Sriram Subramanian commented on KAFKA-1555:
---

Sorry to be late here, but I think this is an important change and we need to ensure this is exactly the right behavior for the long term. To summarize the discussion and code change so far:
1. We would set the min.isr per topic in the log config.
2. We would use this config only when ack is set to -1, and fail the call if the number of in-sync replicas is less than min.isr.

The main drawbacks I see with this approach are:
1. If we plan to set this value at a per-topic level, it should be part of create/modify topic, set during topic creation or modified later. This ensures that if we do expose a createTopic API in the protocol, it would be available to be set/modified.
2. I could see scenarios where multiple writers have different requirements on the same topic and may not have any knowledge of how the topic was created.
3. I think what we are really solving for is to make the write durable either on all replicas or on just the in-sync replicas. The min.isr value provides the option of a number, and I think any value other than 0 or no_of_replicas is of no value. This would only confuse clients when they create the topic.

This is how I interpret the acks w.r.t. the clients:
0 - No response required. I don't really care if the write happened.
1 - I need a response after the write happened to the leader successfully.
>1 - I need the write to happen on all replicas before a response. This has two options:
  a. Response is sent after the write happens to the replicas in ISR
  b. Response is sent after the write happens to all replicas

Having an enum for ack as below is a lot clearer and sets the expectations right, in my opinion.

enum AckType {
    No_Response,
    Write_To_Leader,
    Write_To_ISR,          // chooses availability over consistency
    Write_To_All_Replicas  // chooses consistency over availability
}

> provide strong consistency with reasonable availability
> ---
>
> Key: KAFKA-1555
> URL: https://issues.apache.org/jira/browse/KAFKA-1555
> Project: Kafka
> Issue Type: Improvement
> Components: controller
> Affects Versions: 0.8.1.1
> Reporter: Jiang Wu
> Assignee: Gwen Shapira
> Fix For: 0.8.2
>
> Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
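Sriram's AckType enum can be expressed in terms of the two knobs already under discussion. Below is a sketch in valid Java, with constant names adapted to Java convention. It maps each constant to a request.required.acks value and, where it applies, to a min.insync.replicas value. The mapping follows equivalences stated in this thread: min.isr is consulted only with acks=-1, and Gwen's reply equates min.isr=1 with "just ISR" and min.isr equal to the replica count with "all replicas". It is not a Kafka API.

```java
/**
 * Sketch of the proposed AckType enum, mapped onto request.required.acks plus a
 * topic-level min.insync.replicas value. The mapping follows this thread's
 * discussion; it is not a Kafka API.
 */
enum AckType {
    NO_RESPONSE(0),
    WRITE_TO_LEADER(1),
    WRITE_TO_ISR(-1),           // chooses availability over consistency
    WRITE_TO_ALL_REPLICAS(-1);  // chooses consistency over availability

    private final int requestRequiredAcks;

    AckType(int requestRequiredAcks) {
        this.requestRequiredAcks = requestRequiredAcks;
    }

    int requestRequiredAcks() {
        return requestRequiredAcks;
    }

    /**
     * min.insync.replicas that yields this behavior on a topic with the given
     * replication factor; empty when min.isr is not consulted (acks other than -1).
     */
    java.util.OptionalInt minInsyncReplicas(int replicationFactor) {
        switch (this) {
            case WRITE_TO_ISR:
                return java.util.OptionalInt.of(1);
            case WRITE_TO_ALL_REPLICAS:
                return java.util.OptionalInt.of(replicationFactor);
            default:
                return java.util.OptionalInt.empty();
        }
    }
}
```

The two acks=-1 constants differ only in the topic-level value. This is the crux of the disagreement: whether that choice belongs to the producer request or to the topic configuration.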
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14122771#comment-14122771 ]
saurabh agarwal commented on KAFKA-1555:

Jun,

Yes. This approach works fine for our use case. It is critical for our application not to lose messages. Setting it up at the topic level is fine as well.

> provide strong consistency with reasonable availability
> ---
>
> Key: KAFKA-1555
> URL: https://issues.apache.org/jira/browse/KAFKA-1555
> Project: Kafka
> Issue Type: Improvement
> Components: controller
> Affects Versions: 0.8.1.1
> Reporter: Jiang Wu
> Assignee: Gwen Shapira
> Fix For: 0.8.2

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14120944#comment-14120944 ]
Jun Rao commented on KAFKA-1555:

Gwen,

Thanks for helping out. I think we can take the approach that you suggested. Other than exposing the new config, the code change should be small. We just need to add an extra test in Partition.checkEnoughReplicasReachOffset(). We can get the topic-level config from leaderReplica.log.

> provide strong consistency with reasonable availability
> ---
>
> Key: KAFKA-1555
> URL: https://issues.apache.org/jira/browse/KAFKA-1555
> Project: Kafka
> Issue Type: Improvement
> Components: controller
> Affects Versions: 0.8.1.1
> Reporter: Jiang Wu
> Assignee: Gwen Shapira
> Fix For: 0.8.2

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
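Jun's proposed extra test in Partition.checkEnoughReplicasReachOffset() can be sketched as follows. This is a hypothetical, simplified Java rendering of the acks=-1 branch only. The real method is Scala and works on replica objects rather than plain numbers, and Jun suggests reading min.isr from the topic config via leaderReplica.log. AckCheck, Outcome and checkAcksAll are illustrative names.

```java
/**
 * Hypothetical, simplified rendering of the acks=-1 branch of
 * Partition.checkEnoughReplicasReachOffset() with an extra min.isr test.
 * Replica state is reduced to plain numbers; not Kafka's actual code.
 */
final class AckCheck {
    enum Outcome { KEEP_WAITING, ACKNOWLEDGE, NOT_ENOUGH_REPLICAS }

    private AckCheck() {}

    /**
     * @param requiredOffset offset the high watermark must reach for the produced messages to count as committed
     * @param highWatermark  the leader's current high watermark
     * @param isrSize        current number of in-sync replicas, leader included
     * @param minIsr         topic-level min.insync.replicas
     */
    static Outcome checkAcksAll(long requiredOffset, long highWatermark, int isrSize, int minIsr) {
        if (highWatermark < requiredOffset) {
            // Not yet replicated to every ISR member: keep the request pending.
            return Outcome.KEEP_WAITING;
        }
        if (isrSize < minIsr) {
            // The extra test: committed, but to fewer replicas than min.isr requires.
            return Outcome.NOT_ENOUGH_REPLICAS;
        }
        return Outcome.ACKNOWLEDGE;
    }
}
```

Consider the issue's third scenario: B and C restart and drop out of the ISR, leaving only A. Then isrSize is 1, so with min.isr=2 the sketch returns NOT_ENOUGH_REPLICAS instead of acknowledging the message.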
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14120820#comment-14120820 ]
Joe Stein commented on KAFKA-1555:
--

+1 on the min.isr approach. I would be happy to review/test what you come up with. Let me know if you need help, have questions, or whatever. My biggest concern is administrative/operational: as long as we don't have to bounce brokers and we can set the topic configuration through kafka-topic.sh or such, cool (or even a global setting is fine too). I also have a few test environments we can try this out on once you're done, and a VM setup we can reproduce issues with, np.

> provide strong consistency with reasonable availability
> ---
>
> Key: KAFKA-1555
> URL: https://issues.apache.org/jira/browse/KAFKA-1555
> Project: Kafka
> Issue Type: Improvement
> Components: controller
> Affects Versions: 0.8.1.1
> Reporter: Jiang Wu
> Assignee: Gwen Shapira
> Fix For: 0.8.2

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14120815#comment-14120815 ]
Gwen Shapira commented on KAFKA-1555:
-

If there's a consensus (or even a lazy consensus) around the min.isr approach, I'll be happy to take a stab at it.

> provide strong consistency with reasonable availability
> ---
>
> Key: KAFKA-1555
> URL: https://issues.apache.org/jira/browse/KAFKA-1555
> Project: Kafka
> Issue Type: Improvement
> Components: controller
> Affects Versions: 0.8.1.1
> Reporter: Jiang Wu
> Assignee: Neha Narkhede
> Fix For: 0.8.2

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14120810#comment-14120810 ]
Joe Stein commented on KAFKA-1555:
--

Where have we ended up with this? Do we have an agreed-upon solution? I think different people throwing up patches would be a bad way to proceed. Can we come up with 1) how we want to do this and 2) someone(s) to take this on with an expected timeframe to completion? Having it in 0.8.2 would be great, but at least a patch we could apply would be fantastic, please! Thanks!!!

> provide strong consistency with reasonable availability
> ---
>
> Key: KAFKA-1555
> URL: https://issues.apache.org/jira/browse/KAFKA-1555
> Project: Kafka
> Issue Type: Improvement
> Components: controller
> Affects Versions: 0.8.1.1
> Reporter: Jiang Wu
> Assignee: Neha Narkhede

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14115772#comment-14115772 ]
Jun Rao commented on KAFKA-1555:

Saurabh,

Specifying "min.isr" at the topic level has the benefit that we can easily enforce validation (e.g., min.isr has to be <= number of replicas). Such validation will be harder to enforce on the producer side. Also, adding "min.isr" on the client side makes it a bit more complicated for us to explain the client API. This would be fine if it's something that most clients care about. However, it seems to me that most clients would only care about whether a message is committed or not. Probably only a small number of users would bother to change "min.isr". Given that, my feeling is that it's probably simpler to add "min.isr" at the topic level. Does this cover your usage?

Thanks,

> provide strong consistency with reasonable availability
> ---
>
> Key: KAFKA-1555
> URL: https://issues.apache.org/jira/browse/KAFKA-1555
> Project: Kafka
> Issue Type: Improvement
> Components: controller
> Affects Versions: 0.8.1.1
> Reporter: Jiang Wu
> Assignee: Neha Narkhede

-- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14112529#comment-14112529 ]
Gwen Shapira commented on KAFKA-1555:
-

I agree that min.isr only makes sense with ack=-1. The idea is to add "extra durability" for producers who already care about durability enough to use ack=-1.

> provide strong consistency with reasonable availability
> ---
>
> Key: KAFKA-1555
> URL: https://issues.apache.org/jira/browse/KAFKA-1555
> Project: Kafka
> Issue Type: Improvement
> Components: controller
> Affects Versions: 0.8.1.1
> Reporter: Jiang Wu
> Assignee: Neha Narkhede

-- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14112208#comment-14112208 ]
saurabh agarwal commented on KAFKA-1555:

Sorry for following up late; I've been busy with a production release for the past few weeks.

IMHO, putting min.isr at the topic level will conflict with ack at the request level. Take a scenario where min.isr at the topic level is 2 and ack at the request level is 0. What would the behavior be? Will Kafka honor ack or min.isr? Now it is in two places. I think changing the current semantics of ack will avoid a lot of these scenarios, because there is only one knob. The meaning might be confusing, but it always was. What we can do is introduce an enumeration to give it the right meaning.

> provide strong consistency with reasonable availability
> ---
>
> Key: KAFKA-1555
> URL: https://issues.apache.org/jira/browse/KAFKA-1555
> Project: Kafka
> Issue Type: Improvement
> Components: controller
> Affects Versions: 0.8.1.1
> Reporter: Jiang Wu
> Assignee: Neha Narkhede

-- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14091787#comment-14091787 ] Jun Rao commented on KAFKA-1555: Another thought: even with "min.isr" as a topic-level config, it is possible to support two writers with different durability guarantees. Say one writer wants the "min.isr" guarantee, while another only wants the data to be committed, independent of the ISR size. The latter can simply ignore the NOT_ENOUGH_REPLICA error code. This is a bit inconvenient to code in the producer, but it is probably OK since it will be a rare requirement. > provide strong consistency with reasonable availability > --- > > Key: KAFKA-1555 > URL: https://issues.apache.org/jira/browse/KAFKA-1555 > Project: Kafka > Issue Type: Improvement > Components: controller >Affects Versions: 0.8.1.1 >Reporter: Jiang Wu >Assignee: Neha Narkhede > > In a mission critical application, we expect a kafka cluster with 3 brokers > can satisfy two requirements: > 1. When 1 broker is down, no message loss or service blocking happens. > 2. In worse cases, such as when two brokers are down, service can be blocked, but > no message loss happens. > We found that the current kafka version (0.8.1.1) cannot achieve the requirements > due to three of its behaviors: > 1. when choosing a new leader from 2 followers in ISR, the one with fewer > messages may be chosen as the leader. > 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it > has fewer messages than the leader. > 3. ISR can contain only 1 broker, therefore acknowledged messages may be > stored in only 1 broker. > The following is an analytical proof. > We consider a cluster with 3 brokers and a topic with 3 replicas, and assume > that at the beginning, all 3 replicas, leader A, followers B and C, are in > sync, i.e., they have the same messages and are all in ISR.
> According to the value of request.required.acks (acks for short), there are > the following cases. > 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement. > 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this > time, although C hasn't received m, C is still in ISR. If A is killed, C can > be elected as the new leader, and consumers will miss m. > 3. acks=-1. B and C restart and are removed from ISR. Producer sends a > message m to A, and receives an acknowledgement. Disk failure happens in A > before B and C replicate m. Message m is lost. > In summary, no existing configuration can satisfy the requirements. -- This message was sent by Atlassian JIRA (v6.2#6252)
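To make the two-writer case concrete, here is a minimal Python sketch, assuming the broker reports the NOT_ENOUGH_REPLICA code described in this thread for an acks=-1 write that was committed to a too-small ISR. The function name `write_succeeded` and the string error codes are hypothetical; this is not the Kafka producer API.

```python
# Hypothetical error codes for an acks=-1 produce response (names follow this thread).
NO_ERROR = "NO_ERROR"
NOT_ENOUGH_REPLICA = "NOT_ENOUGH_REPLICA"
REQUEST_TIMED_OUT = "REQUEST_TIMED_OUT"


def write_succeeded(error_code: str, require_min_isr: bool) -> bool:
    """Decide whether a writer treats a produce response as a successful write.

    NOT_ENOUGH_REPLICA is assumed to mean the message *was* committed to the
    current ISR, just not to min.isr replicas.
    """
    if error_code == NO_ERROR:
        return True
    if error_code == NOT_ENOUGH_REPLICA:
        # A strict writer wants the min.isr guarantee and treats this as failure;
        # a relaxed writer only needs the data committed, so it ignores the code.
        return not require_min_isr
    # Any other error (e.g. a timeout) is a failure for both writers.
    return False


# Two writers on the same topic, differing only in how they read the response.
strict_ok = write_succeeded(NOT_ENOUGH_REPLICA, require_min_isr=True)
relaxed_ok = write_succeeded(NOT_ENOUGH_REPLICA, require_min_isr=False)
```

Given the same broker response, `strict_ok` is False and `relaxed_ok` is True; that per-writer branching is the producer-side coding described as inconvenient but workable.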
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14087795#comment-14087795 ] Gwen Shapira commented on KAFKA-1555: We are talking about topic level, and I completely agree with your interpretation. I don't think it's odd to separate some of the reliability concerns between the admin (who creates topics and configures them) and the developer (who controls specific requests). The admin already controls some of the safety guarantees by controlling the number of replicas. At the risk of using a not-100%-relevant analogy, I'd like to compare the behavior to Oracle's commits (my favorite transaction log): A developer decides when to "commit" a transaction. "Commits" are always synchronous and always checkpoint to disk, and therefore take time, and they guarantee durability: a committed transaction will not disappear without some kind of disaster. The admin controls how much of a disaster can make a commit disappear: the log can be written to a single local disk (high chance of disaster), an HA SAN (longer writes but safer), SAN + disk (waiting for both to ack), disk + SSD (waiting for one to ack), disk + remote replica (not waiting for the replica), disk + replica (waiting for the replica), etc. All this is managed by the DBA. The developer just says "commit", meaning "I want durability and I'm willing to wait", trusting the admin's definition of durability. Anyway, all this is to show that controlling durability in two different places is pretty normal in many other communities. We won't be "odd" by supporting this.
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14087278#comment-14087278 ] Joe Stein commented on KAFKA-1555: So when we say "property", do we mean a property in server.properties, so that we have to bounce a broker to change it? If so, that is con #4. If it is a property/setting on kafka-topics --create/--alter, then I think that makes sense and is consistent with the changes in >= 0.8.1. When folks ask, I think explaining that "request.required.acks" controls the request and how the broker responds to it (0/1/-1) tells a story of its own, in addition to the "safe" aspect. When you choose the "safe" -1 ack request, we are extending the guarantee on a per-topic basis (since you don't always need every piece of data on at least two racks).
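The distinction Joe draws (a broker property that needs a bounce versus a per-topic setting changed with the topic tool) can be sketched as a simple lookup: a topic-level override wins, otherwise the broker default applies. This is a hypothetical in-memory model, not Kafka's config code; the key name `min.insync.replicas` and its default of 1 follow Jun Rao's proposal in this thread.

```python
from typing import Dict, Optional


class TopicConfigs:
    """Hypothetical model of broker defaults plus per-topic overrides."""

    def __init__(self, broker_defaults: Dict[str, int]) -> None:
        # Defaults as read from server.properties at startup; changing these
        # would require restarting ("bouncing") the broker.
        self._defaults = dict(broker_defaults)
        # Per-topic overrides, changeable at runtime (e.g. via the topic tool).
        self._overrides: Dict[str, Dict[str, int]] = {}

    def alter(self, topic: str, key: str, value: int) -> None:
        """Set or replace a per-topic override without touching broker defaults."""
        self._overrides.setdefault(topic, {})[key] = value

    def effective(self, topic: str, key: str) -> Optional[int]:
        """Return the topic override if present, else the broker default."""
        return self._overrides.get(topic, {}).get(key, self._defaults.get(key))


configs = TopicConfigs({"min.insync.replicas": 1})
configs.alter("payments", "min.insync.replicas", 2)
```

Here `configs.effective("payments", "min.insync.replicas")` gives 2, while a topic without an override, such as "clicks", still gets the broker default of 1.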
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14087243#comment-14087243 ] Jun Rao commented on KAFKA-1555: Since we can't always guarantee that a message does not get committed when |ISR| is less than min.isr, I think it's easier and more consistent to just do the commit based on the current ISR and return an error if |ISR| is less than min.isr.
For the Pros, I will add:
3. No wire protocol change.
As for the Cons:
2. If we return the ISR size, do we also want to return the replica size? Perhaps a client needs both pieces of information to make a decision (e.g., only continue to send if |ISR| >= ReplicaSize / 2). Overall, my feeling is that giving back more info to the client may provide a bit more power, but makes the API a bit harder to use.
3. Yes, this is possible, but probably unlikely.
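If the broker returned the ISR size (and, as Jun asks, the replica count) instead of enforcing min.isr itself, the client would apply its own rule. A minimal sketch of the example rule |ISR| >= ReplicaSize / 2, assuming both numbers arrive in a hypothetical `ProduceResult` response type that is not part of the actual wire protocol:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ProduceResult:
    """Hypothetical response fields; not part of the actual wire protocol."""
    isr_size: int
    replication_factor: int


def should_continue_sending(result: ProduceResult) -> bool:
    """Keep sending only while |ISR| >= ReplicaSize / 2."""
    if result.replication_factor <= 0:
        raise ValueError("replication_factor must be positive")
    # Compare 2*|ISR| >= RF in integers to avoid float rounding for odd RF.
    return 2 * result.isr_size >= result.replication_factor
```

With 3 replicas, an ISR of 2 passes (2 >= 1.5) but an ISR of 1 does not. The client needs both fields to make this call, which is the extra API surface Jun is wary of.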
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14086984#comment-14086984 ] Jay Kreps commented on KAFKA-1555: Hey [~gwenshap], to answer your question: if you specify acks=5 with only 3 replicas, the behavior would be an immediate error about insufficient replicas (the same as what we do today if you do this).
Just for completeness, another variation, instead of specifying min.isr in the request, would be to return the actual ISR size to the client in the response. min.isr is in some sense a threshold applied to the ISR size on the server, so we might as well just have the server say what happened and let the client interpret it appropriately.
Gwen/Jun, here are the pros and cons I see for doing a topic config:
Pros:
1. A bit simpler to implement.
2. Since min.isr is bounded by the replication factor, if this is a topic config we can make it impossible to specify an invalid min.isr, and we can automatically lower min.isr if you change the replication factor to be lower than it.
Cons:
1. I think it is a little odd that acks would be specified at the request level and min.isr at the topic level, since both are about when a write is considered "safe".
2. Operationally, I think it is more convenient for the client to be able to control this in their code rather than having to do it globally.
3. You can imagine rare cases where you have two writers that actually have different durability requirements on the same topic.
Another question: what happens if the ISR is smaller than min.isr? Do we do the write but return an error, or do we refuse to do the write? Of course we can't be sure to refuse the write, because we can do the write on the leader and then have a replica fail and the ISR shrink at the same time. But, when possible, do we refuse to write on the leader if the ISR is smaller than min.isr?
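A minimal sketch of the leader-side checks discussed here, assuming hypothetical exception names: acks larger than the replication factor fails immediately, and an acks=-1 write is refused up front when the ISR is already smaller than min.isr. As Jay notes, the up-front check is only best effort, so it cannot replace a check after the write is committed.

```python
class InsufficientReplicasError(Exception):
    """acks can never be satisfied because it exceeds the replication factor."""


class NotEnoughReplicasError(Exception):
    """The leader refuses the write because |ISR| < min.isr."""


def validate_produce(acks: int, replication_factor: int,
                     isr_size: int, min_isr: int) -> None:
    """Checks run on the leader before appending (hypothetical helper).

    Returns None if the write may proceed, otherwise raises.
    """
    # acks=5 with only 3 replicas can never be satisfied: fail immediately.
    if acks > replication_factor:
        raise InsufficientReplicasError(
            f"acks={acks} exceeds replication factor {replication_factor}")
    # Best effort only: the ISR can still shrink after this check passes and
    # before followers replicate, so the write may end up under-replicated anyway.
    if acks == -1 and isr_size < min_isr:
        raise NotEnoughReplicasError(f"|ISR|={isr_size} < min.isr={min_isr}")
```

Applying min.isr only when acks=-1 is an assumption of this sketch, matching the acks=-1 framing used elsewhere in the thread.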
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14086750#comment-14086750 ] Gwen Shapira commented on KAFKA-1555: Did we agree on implementing min.isr.size as a per-topic config? If so, I'd like to take a stab at the implementation.
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14084280#comment-14084280 ] Gwen Shapira commented on KAFKA-1555: Perfect. The producer can catch the error and decide whether to proceed, log the "at risk" messages somewhere, or do whatever else it needs.
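Gwen's "catch the error and decide" option could look like the following minimal sketch, assuming the producer surfaces the thread's NOT_ENOUGH_REPLICA code per message. The callback name `handle_ack` and the in-memory at-risk list are hypothetical; a real producer would persist these somewhere durable.

```python
import logging
from typing import List

logger = logging.getLogger("producer")


def handle_ack(message_id: str, error_code: str, at_risk: List[str]) -> bool:
    """Hypothetical callback for an acks=-1 send; returns True to proceed.

    On NOT_ENOUGH_REPLICA the message is committed but held by fewer than
    min.isr replicas, so it is recorded as "at risk" instead of failing.
    """
    if error_code == "NO_ERROR":
        return True
    if error_code == "NOT_ENOUGH_REPLICA":
        at_risk.append(message_id)
        logger.warning("message %s committed to fewer than min.isr replicas",
                       message_id)
        return True
    # Any other error: do not proceed; the caller may retry or stop.
    return False
```

Proceeding while recording at-risk IDs is only one of the choices Gwen lists; a stricter producer could return False on NOT_ENOUGH_REPLICA instead.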
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14084265#comment-14084265 ] Jun Rao commented on KAFKA-1555: Gwen, I like what you suggested. We can introduce a new per-topic config "min.insync.replicas" on the broker, which defaults to 1. For producer ack=-1, the broker will first wait until the message is committed to the current ISR, but will only return no_error if |ISR| >= "min.insync.replicas". Otherwise, it will return a NOT_ENOUGH_REPLICA error to indicate that although the message is committed, it's not committed to enough replicas.
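Jun's proposed acks=-1 flow can be sketched as a two-step decision on the broker: first wait for every replica in the current ISR, then compare |ISR| with min.insync.replicas. This is a minimal model, assuming each replica's log end offset (the offset of the next message it will append) is known; the "PENDING" state and the function name are hypothetical.

```python
from typing import Dict, Set


def produce_response(offset: int, isr: Set[str], log_end_offsets: Dict[str, int],
                     min_insync_replicas: int = 1) -> str:
    """Response for an acks=-1 message at `offset` under the proposal."""
    # Step 1: wait until every replica in the *current* ISR has the message.
    # A replica holds offset o once its log end offset is greater than o.
    if any(log_end_offsets[replica] <= offset for replica in isr):
        return "PENDING"
    # Step 2: the message is committed; report whether enough replicas hold it.
    if len(isr) >= min_insync_replicas:
        return "NO_ERROR"
    return "NOT_ENOUGH_REPLICA"


leo = {"A": 11, "B": 11, "C": 10}  # C has not yet replicated offset 10
```

With min_insync_replicas=2, offset 10 is PENDING while C is still in the ISR, returns NO_ERROR once the ISR is {A, B}, and returns NOT_ENOUGH_REPLICA (committed, but only on A) if the ISR shrinks to {A}. With the default of 1, the {A} case returns NO_ERROR, which is today's acks=-1 behavior.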
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14084033#comment-14084033 ] Gwen Shapira commented on KAFKA-1555: If we go with the idea that ack=2 means "at least two replicas in the ISR committed the message", what do we expect to happen if the producer specifies ack=5 but the topic was created with just 3 replicas? I think it makes sense for min.isr to be completely separate from the producer acks setting and to be configured at the topic level: when you create a topic, you specify the number of replicas and also the minimum ISR size. This basically redefines "commit" for the topic from: "A message is considered "committed" when all in sync replicas for that partition have applied it to their log." to: "A message is considered "committed" when at least N in sync replicas for that partition have applied it to their log." That controls durability and is fairly clear. Producers can keep the semantics of no acks, leader-only commit, or commit by all in-sync replicas, without having to care how many in-sync replicas are required.
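The two definitions of "committed" can be written as predicates over each replica's log end offset. This is a minimal sketch under one reading of the new definition: every in-sync replica must still have the message (as before), and the ISR must contain at least N replicas. Under this reading an under-sized ISR means the message is not committed at all, whereas Jun Rao's proposal above treats it as committed and returns NOT_ENOUGH_REPLICA. Function names are hypothetical.

```python
from typing import Dict, Set


def has_message(replica: str, offset: int, leo: Dict[str, int]) -> bool:
    """A replica holds `offset` once its log end offset is past it."""
    return leo[replica] > offset


def committed_old(offset: int, isr: Set[str], leo: Dict[str, int]) -> bool:
    """Current rule: all in-sync replicas have applied the message."""
    return all(has_message(r, offset, leo) for r in isr)


def committed_new(offset: int, isr: Set[str], leo: Dict[str, int], n: int) -> bool:
    """Redefined rule (one reading): the old rule, plus at least n in-sync replicas."""
    return committed_old(offset, isr, leo) and len(isr) >= n


leo = {"A": 6, "B": 6, "C": 5}  # C lags behind offset 5
```

If the ISR shrinks to {A}, the old rule calls offset 5 committed on a single copy (case 3 in the issue description), while the new rule with n=2 does not.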
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14083141#comment-14083141 ] Guozhang Wang commented on KAFKA-1555: Agreed, ack=...,-2,-1,0,1,2,... with the current semantics would be really confusing. Plus, under master-slave replication, the old semantics of "ack = i < num.replicas" do not really imply anything regarding guarantees, and are hence probably seldom used.
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14083075#comment-14083075 ] Jay Kreps commented on KAFKA-1555: I think we may be confusing a few things:
1. The guarantee we want to provide
2. The way we express this in the protocol
3. The way the user expresses their intent in config
-2 obviously makes no sense as a config. It is a bit of a hack as part of the protocol, but maybe you could argue for it. In terms of behavior, I understand -2 to be the equivalent of -1 and also 2. The best way to do this would be to have a separate min.isr parameter. This could be either a topic-level config (with a global default) or a new parameter in the protocol. I don't think we want to try to cram it into an existing field, although I see why that is convenient. One concern I have is that, whether or not we cram these two concepts together in the protocol, having two knobs makes it increasingly nuanced for users to express their intention around durability. Having previously worked with Dynamo-style R+W settings, my experience is that they mostly just confuse people. An alternative might be to just change the behavior of acks=2. We have long felt that 0, 1, and "all in-sync" are really the settings that make sense, and we just allowed 2, 3, etc. for completeness. However, since there is no hard guarantee for 2, it is pretty hard to think of a time as a user when you would actually want 2 acknowledgements (i.e. you are okay potentially losing data but did enough empirical statistics to determine that the probability of loss at 1 was too high). Basically this use case doesn't really exist. If so, we should just change 2 to mean fully committed and |ISR| >= 2.
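Jay's alternative (keep 0, 1 and -1, and redefine 2 as "fully committed and |ISR| >= 2") amounts to the decision function below. It is a minimal sketch with hypothetical inputs, assuming the broker already knows whether the leader has appended the message and whether every current ISR member has it; other acks values are deliberately left out.

```python
def ack_satisfied(acks: int, leader_appended: bool,
                  committed_to_isr: bool, isr_size: int) -> bool:
    """When a produce request could be acknowledged under the proposed acks=2."""
    if acks == 0:
        return True                      # producer does not wait for the broker
    if acks == 1:
        return leader_appended           # leader has written the message
    if acks == -1:
        return committed_to_isr          # every current in-sync replica has it
    if acks == 2:
        # Proposed meaning: fully committed AND at least two replicas in the ISR.
        return committed_to_isr and isr_size >= 2
    raise ValueError(f"acks={acks} is not modeled in this sketch")
```

The difference from acks=-1 shows up only when the ISR has shrunk to the leader: `ack_satisfied(-1, True, True, 1)` is True, while `ack_satisfied(2, True, True, 1)` is False.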
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14082684#comment-14082684 ] saurabh agarwal commented on KAFKA-1555:

Guozhang, the new semantic (ack <= -2) will provide us an option in which a message is not considered committed until it has been acknowledged by more than one replica. That is, there must be more than one copy in the cluster; otherwise the write will not succeed. The switch you mentioned above does not provide that capability, although I agree that it helps to avoid unclean leader election.

> provide strong consistency with reasonable availability
> ---
>
> Key: KAFKA-1555
> URL: https://issues.apache.org/jira/browse/KAFKA-1555
> Project: Kafka
> Issue Type: Improvement
> Components: controller
> Affects Versions: 0.8.1.1
> Reporter: Jiang Wu
> Assignee: Neha Narkhede
>
> In a mission-critical application, we expect a Kafka cluster with 3 brokers to satisfy two requirements:
> 1. When 1 broker is down, no message loss or service blocking happens.
> 2. In worse cases, such as when two brokers are down, service can be blocked, but no message loss happens.
> We found that the current Kafka version (0.8.1.1) cannot meet these requirements because of three of its behaviors:
> 1. When choosing a new leader from 2 followers in the ISR, the one with fewer messages may be chosen as the leader.
> 2. Even when replica.lag.max.messages=0, a follower can stay in the ISR when it has fewer messages than the leader.
> 3. The ISR can contain only 1 broker, so acknowledged messages may be stored on only 1 broker.
> The following is an analytical proof.
> We consider a cluster with 3 brokers and a topic with 3 replicas, and assume that at the beginning all 3 replicas (leader A, followers B and C) are in sync, i.e., they have the same messages and are all in the ISR.
> According to the value of request.required.acks (acks for short), there are the following cases.
> 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirements.
> 2. acks=2. The producer sends a message m. It is acknowledged by A and B. At this time, although C hasn't received m, C is still in the ISR. If A is killed, C can be elected as the new leader, and consumers will miss m.
> 3. acks=-1. B and C restart and are removed from the ISR. The producer sends a message m to A and receives an acknowledgement. A disk failure happens on A before B and C replicate m. Message m is lost.
> In summary, no existing configuration can satisfy the requirements.

-- This message was sent by Atlassian JIRA (v6.2#6252)
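The two failure cases in the quoted description can be replayed in a toy model. This is a minimal sketch under stated assumptions, not Kafka code:
- replicas are plain Python lists;
- the function names are hypothetical;
- the election in the acks=2 case is hard-coded to the unlucky choice that the description says 0.8.1.1 allows.

```python
# Toy replay of the two failure cases quoted above; hypothetical, not Kafka code.

def replay_acks_2() -> bool:
    """acks=2: m is acked once A (leader) and B have it, while lagging C stays in the ISR."""
    logs = {"A": ["m"], "B": ["m"], "C": []}
    isr = {"A", "B", "C"}
    isr.discard("A")                # A is killed
    new_leader = "C"                # allowed in 0.8.1.1: C is still in the ISR
    assert new_leader in isr
    return "m" in logs[new_leader]  # False: consumers reading from C miss m

def replay_acks_minus_1() -> tuple:
    """acks=-1: B and C restarted and left the ISR, so only A has to ack."""
    logs = {"A": [], "B": [], "C": []}
    isr = {"A"}
    logs["A"].append("m")
    acked = all("m" in logs[r] for r in isr)  # True: every ISR member has m
    logs["A"].clear()               # disk failure on A before B and C replicate m
    survives = any("m" in log for log in logs.values())
    return acked, survives          # acknowledged, then lost

print(replay_acks_2())        # False
print(replay_acks_minus_1())  # (True, False)
```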
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14081094#comment-14081094 ] Guozhang Wang commented on KAFKA-1555:

Jiang, Saurabh,

If your concern with ack=-1 is that, when the ISR shrinks to one and that sole remaining replica (the leader) fails, unclean leader election will cause data loss, then you can try this per-topic config to avoid unclean election: https://issues.apache.org/jira/browse/KAFKA-1028

Both this switch and the "ack <= -2" approach effectively choose consistency over availability. The difference is just when the producer gets blocked: with the "ack <= -2" approach it is blocked earlier (when the ISR shrinks to 1), whereas with the switch it is blocked later (when the only replica in the ISR, hence the leader, fails).
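The sketch below makes the comparison concrete. For each ISR size it shows whether a write is still accepted under each approach. Assumptions:
- the first policy is acks=-1 with unclean leader election disabled;
- the second uses |ack| = 2 for the proposed ack <= -2 semantic;
- both function names are made up for illustration.

```python
# Hypothetical model: does each approach still accept a write at a given ISR size?
# isr_size counts the leader; 0 means the last in-sync replica has failed.

def accepts_clean_election_only(isr_size: int) -> bool:
    # acks=-1 with unclean leader election disabled (KAFKA-1028): writes succeed
    # while some in-sync replica can lead; an out-of-sync replica never takes over.
    return isr_size >= 1

def accepts_min_ack(isr_size: int, acks: int = -2) -> bool:
    # Proposed ack <= -2 semantic: success (no error) only if |ISR| >= |ack|.
    return isr_size >= abs(acks)

for size in (3, 2, 1, 0):
    print(size, accepts_clean_election_only(size), accepts_min_ack(size))
# 3 True True
# 2 True True
# 1 True False
# 0 False False
```

In this model the ack <= -2 check starts reporting failure once the ISR is down to 1. The switch only stops writes once no in-sync replica is left.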
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14080970#comment-14080970 ] Gwen Shapira commented on KAFKA-1555:

I think a property such as min.isr.required is clearer for users and less likely to lead to confusion and mistakes. Imagine seeing the configuration "request.required.acks=-4": how likely are you to correctly figure out the expected behavior? -1 is confusing enough, but that's already there :)
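Gwen's point shows up when you write out what a reader would have to decode. The helper below is hypothetical and covers only the values discussed in this thread:
- 0, 1, n > 1 and -1, as request.required.acks is used in the issue description;
- the proposed ack <= -2 meaning, which is not an existing setting.

```python
def describe_acks(acks: int) -> str:
    """Hypothetical decoder for request.required.acks values discussed in this thread."""
    if acks == 0:
        return "do not wait for any acknowledgement"
    if acks == 1:
        return "wait for the leader only"
    if acks > 1:
        return f"wait for {acks} replicas"
    if acks == -1:
        return "wait for every replica in the ISR"
    # acks <= -2: the overloaded meaning proposed in this thread (not an existing setting)
    return (f"wait for every replica in the ISR, and return an error "
            f"unless |ISR| >= {-acks}")

print(describe_acks(-4))
# wait for every replica in the ISR, and return an error unless |ISR| >= 4
```

A separate, named property would carry the second clause in its own name, rather than in the sign and magnitude of a single number.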
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14080956#comment-14080956 ] Jun Rao commented on KAFKA-1555:

Adding the check for ack<=-2 shouldn't be too hard. We probably have to wait until KAFKA-1430 is done first.
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14080210#comment-14080210 ] Joe Stein commented on KAFKA-1555:

+1 to introducing a new semantic when ack <= -2. The semantic will be that a message is only acked with no error if, at the time the message is committed, |ISR| >= |ack|. If a message is committed with |ISR| < |ack|, we will return an UnderReplicatedError.
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14079494#comment-14079494 ] saurabh agarwal commented on KAFKA-1555:

Excellent, thanks. It works for us. We are OK with either introducing a new semantic in the existing ack property (ack<=-2) or introducing a new property "min.isr.required". Both meet the requirement. Please suggest the next step.
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14079373#comment-14079373 ] Jun Rao commented on KAFKA-1555:

Instead of introducing a new property "min.isr.required", I was thinking of just piggybacking on ack. We can introduce a new semantic when ack <= -2: a message is only acked with no error if, at the time the message is committed, |ISR| >= |ack|. If a message is committed with |ISR| < |ack|, we will return an UnderReplicatedError. This way, we don't have to change the wire protocol. Does that match what you expect?
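Here is a minimal sketch of the proposed commit-time check. It assumes the message has already reached every ISR member when the check runs. The Partition class, the function name and the "OK" string are hypothetical; only the UnderReplicatedError name comes from the comment above. The key detail is that the message stays in the log even when the error is returned.

```python
from dataclasses import dataclass, field

@dataclass
class Partition:
    """Hypothetical stand-in for one partition: its ISR and its committed log."""
    isr: set
    log: list = field(default_factory=list)

def commit_and_ack(p: Partition, msg: str, acks: int) -> str:
    # Assume every ISR member already has msg, so it is committed here.
    p.log.append(msg)
    if acks <= -2 and len(p.isr) < abs(acks):
        # Still committed and visible to consumers; the error only warns the producer.
        return "UnderReplicatedError"
    return "OK"

p = Partition(isr={"A"})
print(commit_and_ack(p, "m", acks=-2), p.log)  # UnderReplicatedError ['m']
```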
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14076758#comment-14076758 ] saurabh agarwal commented on KAFKA-1555:

"this seems more like the ack > 1 case in Kafka. Basically, if you have R replicas and you set dfs.replication.min to a value smaller than R, you can get an ack back sooner. However, there is a slight possibility that the acked data is lost with fewer than R-1 failures"

I agree that ack>1 is similar to "dfs.replication.min". It is actually also the same as the proposed "min.isr.required". What I am suggesting is to pair it with ack=-1, which is similar to "dfs.replication". In this case, when the system is running normally, every write will be acked by all replicas (enforced by ack=-1). When replicas go down one by one, the ack>1 part will ensure that there is always more than one replica.
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14076286#comment-14076286 ] Jun Rao commented on KAFKA-1555:

Hmm, I need to understand the semantics and the implementation of dfs.replication.min a bit more. For now, this seems more like the ack > 1 case in Kafka. Basically, if you have R replicas and you set dfs.replication.min to a value smaller than R, you can get an ack back sooner. However, there is a slight possibility that the acked data is lost with fewer than R-1 failures.

A couple of other thoughts on "min.isr.required":
1. Let's say we do support this mode. Immediately after a message is successfully published, some replicas could go down and bring the ISR below "min.isr.required". There is still no guarantee that you have more than one copy of the data at that point. So "min.isr.required" can only be enforced at the time of publishing. Is that what you want?
2. I imagine the implementation could be: when a message is committed, we check the ISR size and return an error if |ISR| is less than "min.isr.required". However, the message is still committed and will be exposed to the consumer. The error just tells the producer that the message may be under-replicated. Does that match what you expect?
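Point 1 can be illustrated with a short timeline: a publish-time check on the ISR size passes, and then a follower fails right afterwards. This is a minimal sketch; the function names are hypothetical.

```python
def min_isr_check(isr: set, min_isr_required: int) -> bool:
    """Hypothetical publish-time check: is the ISR large enough right now?"""
    return len(isr) >= min_isr_required

def timeline() -> tuple:
    isr = {"A", "B"}                  # both in-sync replicas hold message m
    accepted = min_isr_check(isr, min_isr_required=2)
    isr.discard("B")                  # B goes down right after the ack
    # The check already passed and is not re-evaluated for m,
    # so only one live in-sync copy of m remains.
    return accepted, len(isr)

print(timeline())  # (True, 1)
```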
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14075875#comment-14075875 ] saurabh agarwal commented on KAFKA-1555:

There are two configuration parameters: dfs.replication and dfs.replication.min. The behavior you described above relates to the dfs.replication configuration. dfs.replication.min enforces that a minimum number of replicas must be written before the write succeeds; otherwise it fails. Here is an excerpt from Tom White's Hadoop book: "It’s possible, but unlikely, that multiple datanodes fail while a block is being written. As long as dfs.replication.min replicas (which default to one) are written, the write will succeed, and the block will be asynchronously replicated across the cluster until its target replication factor is reached (dfs.replication, which defaults to three)." As you suggest, we can increase the replication factor. That reduces the possibility of data loss, but it does not guarantee that there is more than one copy of the data. "ack=-1" ensures that the producer receives an ack from the replicas in the ISR. What I am suggesting is that, with a new config "min.isr.required", Kafka ensures that the message has been written to a minimum number of replicas (which must be in the ISR) before producer.send succeeds.
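The HDFS behaviour in the quoted passage can be summarised as a toy model. It assumes the defaults the passage gives: dfs.replication.min of one and dfs.replication of three. This illustrates the quote only and is not HDFS code; the function name is hypothetical.

```python
def hdfs_like_write(alive_datanodes: int, replication: int = 3,
                    replication_min: int = 1) -> tuple:
    """Toy model of the quoted HDFS rule; returns (succeeded, written, pending)."""
    written = min(alive_datanodes, replication)
    succeeded = written >= replication_min
    # On success, the missing copies are created asynchronously afterwards.
    pending = replication - written if succeeded else 0
    return succeeded, written, pending

print(hdfs_like_write(1))                     # (True, 1, 2)
print(hdfs_like_write(1, replication_min=2))  # (False, 1, 0)
```

The second call matches the behaviour proposed for min.isr.required. With too few live replicas, the write fails instead of completing with a single copy.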
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14075263#comment-14075263 ] Jun Rao commented on KAFKA-1555:

So, you are suggesting rejecting a message if the number of replicas in the ISR is less than 2? However, immediately after a message is successfully published, some replicas could go down, which could bring the ISR below 2. So I am not sure this is any better than just running with a larger replication factor, say 4. My understanding of dfs.replication.min in HDFS is that, as you are writing data to HDFS, you can actually write data to fewer replicas than that min value. For example, suppose that you are writing 100 bytes to 3 replicas in HDFS with dfs.replication.min=2. If, after the 100 bytes are written to the first replica, the other 2 replicas die, HDFS will complete the write with just 1 replica. However, in the background, HDFS will try to create new replicas to make sure the total number of replicas reaches the min value. This is something that you can do with the Kafka admin tools too. We can potentially automate this somehow.
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14075038#comment-14075038 ] saurabh agarwal commented on KAFKA-1555:

Jun, I agree that ack=-1 works fine for most use cases. Here is another suggestion that might address our use case. Can we add an additional producer property, "min.isr.required" (similar to dfs.replication.min), for durability? "ack=-1" ensures that every replica in the ISR receives the message before the producer gets the ack, and "min.isr.required=2" would ensure that there are at least two replicas in the ISR to publish a message. Otherwise, it would throw an exception such as "Number of the required replicas is not in ISR".

Here is an example where this would be very useful. Take a scenario where the producer is publishing at a very high rate. We bring down two follower replicas. When we bring those replicas back up, it takes a while for them to catch up, because messages are still being published at a high rate. So for a while there are no replicas in the ISR other than the leader. During this time, if the disk on the leader replica fails, no replica will have those messages. It would be good to have more than one copy in the ISR at all times. This would address our use case, where we need strong consistency and high durability with reasonable availability.
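This proposal refuses the write up front, rather than committing it first and only reporting an error afterwards. Below is a minimal sketch of the catch-up scenario, assuming min.isr.required=2 and ack=-1. The exception class and the send function are hypothetical; only the error text comes from the comment above.

```python
class NotEnoughInSyncReplicas(Exception):
    """Hypothetical error raised when the ISR is smaller than min.isr.required."""

def send(log: list, isr: set, msg: str, min_isr_required: int = 2) -> None:
    # Refuse the write before appending anything if the ISR is too small.
    if len(isr) < min_isr_required:
        raise NotEnoughInSyncReplicas("Number of the required replicas is not in ISR")
    # With ack=-1 the producer would then wait for every ISR member to replicate msg.
    log.append(msg)

log = []
try:
    send(log, isr={"leader"}, msg="m")  # followers are still catching up
except NotEnoughInSyncReplicas as err:
    print("rejected:", err)
print(log)  # []
```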
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14074532#comment-14074532 ] Jun Rao commented on KAFKA-1555: In our current design, a message is considered committed once it has been written to all replicas in the ISR. If ack=-1 is used, the producer is acked only after the message is committed. Approach 1 also seems to need coordination among multiple replicas during leader election, so both of my previous comments apply to it as well.
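Jun's commit rule can be sketched as follows, assuming each replica reports a log end offset (the offset just past its last message). The names `CommitRuleSketch`, `committedEndOffset`, and `canAck` are hypothetical; the minimum over the ISR corresponds roughly to what Kafka calls the high watermark. The example also shows why a shrinking ISR matters: once C drops out, the same message counts as committed with fewer copies.

```java
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch of the commit rule: a message is committed once every
 * replica in the ISR has it. Names are illustrative, not Kafka code.
 */
final class CommitRuleSketch {

    /**
     * Returns the offset one past the last committed message: the minimum log
     * end offset over the ISR. Offsets below it are on every ISR replica.
     */
    static long committedEndOffset(Map<String, Long> logEndOffsets, Set<String> isr) {
        long min = Long.MAX_VALUE;
        for (String replica : isr) {
            min = Math.min(min, logEndOffsets.get(replica));
        }
        return min;
    }

    /** With ack=-1, the producer is acked for an offset only once it is committed. */
    static boolean canAck(long offset, Map<String, Long> logEndOffsets, Set<String> isr) {
        return offset < committedEndOffset(logEndOffsets, isr);
    }

    public static void main(String[] args) {
        // Leader A holds offsets 0..2, follower B holds 0..1, follower C holds only 0.
        Map<String, Long> leo = Map.of("A", 3L, "B", 2L, "C", 1L);
        System.out.println(canAck(1, leo, Set.of("A", "B", "C"))); // false: C lacks offset 1
        System.out.println(canAck(1, leo, Set.of("A", "B")));      // true once C leaves the ISR
    }
}
```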
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14074522#comment-14074522 ] Jiang Wu commented on KAFKA-1555: Jun, in your example, what are the replication factor and acks values? When you say m1 is committed, does that mean the producer's send(m1) returned successfully? Could you also comment on approach 1: "1. Set replica.lag.max.messages=0. When a new leader election happens, first update the ISR list. A replica without the latest message will be removed from the ISR and has no chance to become the new leader." Thanks, Jiang
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14074460#comment-14074460 ] Jun Rao commented on KAFKA-1555: Jiang, if you use controlled shutdown, we make sure that there is at least one other replica in the ISR before shutting down the broker. Two comments on your proposal.
1. In some corner cases, the replica with the most messages shouldn't become the leader. Consider the following example. Suppose that we have 2 replicas, A and B. At some point, the messages in the logs look like the following.
offset  A   B
0       m1  m1
1       m2
2       m3
Suppose that m1 is committed and m2 and m3 are not. At this point, A dies and B takes over as the new leader. B then accepts and commits message m4. The logs now look like the following.
offset  A   B
0       m1  m1
1       m2  m4
2       m3
At this point, let's say both A and B go down and come up again. Now you can't pick replica A as the new leader even though it has more messages; otherwise, you will lose the committed message m4. Note that it's OK to lose messages m2 and m3, since they were never committed. The correct way to pick the "longest" log is a bit more involved and would require knowing the leader generation id of each message. For details, see KAFKA-1211.
2. A second thing you need to worry about is coordination. Suppose that we have the same scenario as above, where both A and B die. If B comes up first, we can actually elect B as the new leader without waiting for A. However, if you need to compare log lengths, you have to wait for A to come back, since A could have the longest log. The question then is: if A never comes back, do you block writes forever, or do you risk losing committed messages? The ISR is essentially our way of remembering the candidates for the new leader, so without any coordination among replicas, we can easily figure out who can be the new leader.
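One way to make the "longest log" comparison safe, in the spirit of the leader generation id Jun mentions, is to tag every entry with the generation of the leader that appended it and compare the generation of each log's last entry before comparing lengths. This is the Raft-style "more up-to-date" rule, swapped in here for illustration; it is an assumption, not the design from KAFKA-1211, and `GenerationAwareElection` and `Entry` are hypothetical names. The data reproduces Jun's m1..m4 example.

```java
import java.util.List;

/**
 * Hypothetical sketch: pick a leader by comparing (generation of last entry,
 * log length) instead of log length alone. Raft-style rule, illustrative only.
 */
final class GenerationAwareElection {

    /** A log entry tagged with the (hypothetical) leader generation that appended it. */
    record Entry(String message, int generation) {}

    /** Naive rule: prefer the longer log. */
    static boolean longerLog(List<Entry> a, List<Entry> b) {
        return a.size() > b.size();
    }

    /** Generation-aware rule: a later last generation wins; ties fall back to length. */
    static boolean moreUpToDate(List<Entry> a, List<Entry> b) {
        int genA = a.isEmpty() ? -1 : a.get(a.size() - 1).generation();
        int genB = b.isEmpty() ? -1 : b.get(b.size() - 1).generation();
        if (genA != genB) {
            return genA > genB;
        }
        return a.size() > b.size();
    }

    public static void main(String[] args) {
        // A led generation 0 and appended m1..m3; only m1 was committed.
        List<Entry> a = List.of(new Entry("m1", 0), new Entry("m2", 0), new Entry("m3", 0));
        // B became leader in generation 1 and committed m4 at offset 1.
        List<Entry> b = List.of(new Entry("m1", 0), new Entry("m4", 1));
        System.out.println("naive rule prefers A: " + longerLog(a, b));      // true, which loses m4
        System.out.println("generation rule prefers B: " + moreUpToDate(b, a)); // true, which keeps m4
    }
}
```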
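Jun's second point, on coordination, can be sketched by contrasting the two election styles. This assumes the controller remembers the last recorded ISR and knows which replicas are currently live; `ElectionCoordinationSketch`, `electFromIsr`, and `canCompareLogs` are hypothetical names, not Kafka controller code. In the scenario, A died first and B then led alone, so the last recorded ISR is [B].

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

/**
 * Hypothetical sketch contrasting ISR-based election with log-comparison
 * election. Names are illustrative, not Kafka controller code.
 */
final class ElectionCoordinationSketch {

    /**
     * ISR-based election: any live member of the last recorded ISR may lead,
     * since every ISR member holds all committed messages. No waiting needed.
     */
    static Optional<String> electFromIsr(List<String> lastIsr, Set<String> live) {
        for (String replica : lastIsr) {
            if (live.contains(replica)) {
                return Optional.of(replica);
            }
        }
        return Optional.empty(); // no eligible replica: block writes (or risk loss)
    }

    /**
     * Log-comparison election: the longest log is only known once every
     * replica has reported, so the election blocks while any replica is down.
     */
    static boolean canCompareLogs(List<String> allReplicas, Set<String> live) {
        return live.containsAll(allReplicas);
    }

    public static void main(String[] args) {
        // Both A and B died; B comes back first and A stays down.
        List<String> lastIsr = List.of("B");
        Set<String> live = Set.of("B");
        System.out.println(electFromIsr(lastIsr, live));             // Optional[B]
        System.out.println(canCompareLogs(List.of("A", "B"), live)); // false: must wait for A
    }
}
```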
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14074343#comment-14074343 ] Jiang Wu commented on KAFKA-1555: Jun, we may need to tolerate more than R-1 failures spread over a long time period, for example, when all brokers are restarted in turn within 20 minutes. Our concern with acks=-1 is that the ISR can shrink to a single broker, so a published message may have only one copy. Losing that copy means the message is lost permanently. For ack=2, I'm considering two approaches to prevent a lagging replica from being elected as the new leader: 1. Set replica.lag.max.messages=0. When a new leader election happens, first update the ISR list. A replica without the latest message will be removed from the ISR and has no chance to become the new leader. 2. When a new leader election happens, choose the replica with the most messages as the new leader. Could you comment on these approaches? Regards, Jiang
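Jiang's approach 1 can be sketched as pruning the ISR immediately before the election with a lag tolerance of 0 messages. This assumes the controller can collect the log end offset of every surviving candidate, and it measures "the latest message" against the longest surviving log because the old leader is gone; `PruneThenElectSketch` and `pruneIsr` are hypothetical names, and the offsets are made up. The data mirrors the acks=2 case from the issue description, where m reached A and B but not C.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of approach 1: refresh the ISR right before electing,
 * dropping candidates that lag the longest surviving log by more than
 * maxLagMessages. Names and offsets are illustrative.
 */
final class PruneThenElectSketch {

    static List<String> pruneIsr(List<String> candidates, Map<String, Long> logEndOffsets, long maxLagMessages) {
        // The leader is dead, so "latest" is measured against the longest surviving log.
        long maxLeo = 0;
        for (String r : candidates) {
            maxLeo = Math.max(maxLeo, logEndOffsets.get(r));
        }
        List<String> kept = new ArrayList<>();
        for (String r : candidates) {
            if (maxLeo - logEndOffsets.get(r) <= maxLagMessages) {
                kept.add(r);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Leader A is dead; m sits at hypothetical offset 10 on B but never reached C.
        Map<String, Long> leo = Map.of("B", 11L, "C", 10L);
        List<String> isr = pruneIsr(List.of("B", "C"), leo, 0);
        System.out.println("ISR after pruning: " + isr); // [B], so B leads and m survives
    }
}
```

Because the pruning step compares only log lengths, it inherits the corner case from Jun's m1..m4 example when logs have diverged, and gathering every candidate's offset is the coordination step he raises.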