[jira] [Comment Edited] (KAFKA-1555) provide strong consistency with reasonable availability

2014-09-22 Thread Joe Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14143521#comment-14143521
 ] 

Joe Stein edited comment on KAFKA-1555 at 9/22/14 5:51 PM:
---

ah, my bad. 

<< Any suggestions regarding the problem with retries? 

I think that is an issue beyond this ticket. It also happens in other cases 
(e.g. MaxMessageSize, where retrying 3 times won't change a thing, just like 
this problem), and we don't yet have a solution that "classifies" exceptions... 
So I think we should do some fix for it, but that is not related to this ticket 
IMHO... Unlike MaxMessageSize, though, it is possible that after the first 
failure another replica comes back online and the retry succeeds, so that 
functionality might be desirable (I can see how it would be).
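
To make the "classifies exceptions" idea concrete, here is a minimal 
illustrative sketch. The RetryPolicy helper and where it would hook in are 
hypothetical, not part of Kafka; the two exception classes are the existing 
server-side ones:

{code}
// Illustrative sketch only: a hypothetical helper that "classifies" exceptions
// so a producer retry loop can decide whether resending could ever help.
public class RetryPolicy {
    public static boolean isRetriable(Throwable t) {
        // A replica may come back online before the next attempt, so retrying can succeed.
        if (t instanceof kafka.common.NotEnoughReplicasException) {
            return true;
        }
        // An oversized message will fail identically on every retry.
        if (t instanceof kafka.common.MessageSizeTooLargeException) {
            return false;
        }
        // Default to the current blanket-retry behavior for everything else.
        return true;
    }
}
{code}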


was (Author: joestein):
ah, my bad. 

<< Any suggestions regarding the problem with retries? 

I think that is an issue beyond this ticket that happens in other cases (e.g. 
MaxMessageSize and retry that 3 times won't change a thing like this same 
problem) that we don't have a solution for yet that "classifies" exceptions... 
So I think we should do some fix for it but that is not related to this ticket 
IMHO.

> provide strong consistency with reasonable availability
> ---
>
> Key: KAFKA-1555
> URL: https://issues.apache.org/jira/browse/KAFKA-1555
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Affects Versions: 0.8.1.1
>Reporter: Jiang Wu
>Assignee: Gwen Shapira
> Fix For: 0.8.2
>
> Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch
>
>
> In a mission-critical application, we expect that a Kafka cluster with 3 brokers 
> can satisfy two requirements:
> 1. When 1 broker is down, no message loss or service blocking happens.
> 2. In worse cases, such as when two brokers are down, service can be blocked, but 
> no message loss happens.
> We found that the current Kafka version (0.8.1.1) cannot achieve these requirements 
> due to three of its behaviors:
> 1. When choosing a new leader from 2 followers in the ISR, the one with fewer 
> messages may be chosen as the leader.
> 2. Even when replica.lag.max.messages=0, a follower can stay in the ISR when it 
> has fewer messages than the leader.
> 3. The ISR can contain only 1 broker, so acknowledged messages may be 
> stored on only 1 broker.
> The following is an analytical proof. 
> We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
> that at the beginning all 3 replicas, leader A and followers B and C, are in 
> sync, i.e., they have the same messages and are all in the ISR.
> According to the value of request.required.acks (acks for short), there are 
> the following cases.
> 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirements.
> 2. acks=2. The producer sends a message m. It is acknowledged by A and B. At this 
> time, although C hasn't received m, C is still in the ISR. If A is killed, C can 
> be elected as the new leader, and consumers will miss m.
> 3. acks=-1. B and C restart and are removed from the ISR. The producer sends a 
> message m to A and receives an acknowledgement. A disk failure happens on A 
> before B and C replicate m. Message m is lost.
> In summary, no existing configuration can satisfy the requirements.





[jira] [Comment Edited] (KAFKA-1555) provide strong consistency with reasonable availability

2014-09-22 Thread Joe Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14143701#comment-14143701
 ] 

Joe Stein edited comment on KAFKA-1555 at 9/22/14 7:53 PM:
---

<<  I am more interested in being able to specify why 2 and not N

The value of min.isr is a balance between your tolerance for data loss of 
stored data and your need for broker availability for writes. Your availability 
tolerance (X nodes being unavailable while writes can still be ACKed) is the 
difference between your replication factor (Y) and the number of servers data 
must be saved to (Z) when using ACK=-1 and min.isr = Z: X = Y - Z. If your 
tolerance for node failures is 3 and you require 2 servers to always be 
available to write to (otherwise clients see write failures), then you have to 
set your replication factor to 5 and use ACK=-1 with min.isr = 2. If you 
require more servers to acknowledge each write but want to tolerate the same 
number of node failures, then you must also raise your replication factor.

... something like that...


was (Author: joestein):
<<  I am more interested in being able to specify why 2 and not N

The value of the min.isr is a balance between your tolerance for data loss for 
stored data and need of availability of brokers to write to. Your tolerance of 
availability (X nodes being unavailable but still able to ACK writes) is the 
difference between your replication factor (Y) and the number of servers you 
must save data to (Z) when using ACK=-1 and min.isr = Z.  X=Y-Z. If your 
tolerance for node failures is 3 and you require to always have 2 servers 
running to write to (otherwise failure to write occurs to clients) then you 
have to set your replication factor to 5 and use ACK=-1 with a min.isr = 3.  If 
you require more servers to write to for sucecsfull writes but the same amount 
of node failures then you must also up your replication factor.

... something like that...

> provide strong consistency with reasonable availability
> ---
>
> Key: KAFKA-1555
> URL: https://issues.apache.org/jira/browse/KAFKA-1555
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Affects Versions: 0.8.1.1
>Reporter: Jiang Wu
>Assignee: Gwen Shapira
> Fix For: 0.8.2
>
> Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch
>
>





[jira] [Comment Edited] (KAFKA-1555) provide strong consistency with reasonable availability

2014-09-22 Thread Joe Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14143701#comment-14143701
 ] 

Joe Stein edited comment on KAFKA-1555 at 9/22/14 8:03 PM:
---

<<  I am more interested in being able to specify why 2 and not N

The value of min.isr is a balance between your tolerance for the probability of 
data loss of stored data and your need for broker availability for writes. Your 
availability tolerance (X nodes being unavailable while writes can still be 
ACKed) is the difference between your replication factor (Y) and the number of 
servers data must be saved to (Z) when using ACK=-1 and min.isr = Z: X = Y - Z. 
If your tolerance for node failures is 3 and you require 2 servers to always be 
available to write to (otherwise clients see write failures), then you have to 
set your replication factor to 5 and use ACK=-1 with min.isr = 2. If you 
require more servers to acknowledge each write but want to tolerate the same 
number of node failures, then you must also raise your replication factor.

... something like that...
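
To make the arithmetic above concrete, a small worked example (the numbers are 
chosen purely for illustration, not as a recommendation):

{code}
// Illustrative only: the X = Y - Z relation described above.
public class MinIsrMath {
    public static void main(String[] args) {
        int replicationFactor = 5; // Y: total replicas of the partition
        int minIsr = 2;            // Z: replicas that must be in sync to ACK when ACK=-1
        int tolerated = replicationFactor - minIsr; // X = Y - Z = 3
        System.out.println("Writes are still ACKed with up to " + tolerated
                + " brokers down; every ACKed write is on at least " + minIsr + " brokers.");
    }
}
{code}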


was (Author: joestein):
<<  I am more interested in being able to specify why 2 and not N

The value of the min.isr is a balance between your tolerance for data loss for 
stored data and need of availability of brokers to write to. Your tolerance of 
availability (X nodes being unavailable but still able to ACK writes) is the 
difference between your replication factor (Y) and the number of servers you 
must save data to (Z) when using ACK=-1 and min.isr = Z.  X=Y-Z. If your 
tolerance for node failures is 3 and you require to always have 2 servers 
running to write to (otherwise failure to write occurs to clients) then you 
have to set your replication factor to 5 and use ACK=-1 with a min.isr = 3.  If 
you require more servers to write to for successful writes but the same amount 
of node failures then you must also up your replication factor.

... something like that...

> provide strong consistency with reasonable availability
> ---
>
> Key: KAFKA-1555
> URL: https://issues.apache.org/jira/browse/KAFKA-1555
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Affects Versions: 0.8.1.1
>Reporter: Jiang Wu
>Assignee: Gwen Shapira
> Fix For: 0.8.2
>
> Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch
>
>





[jira] [Comment Edited] (KAFKA-1555) provide strong consistency with reasonable availability

2014-09-23 Thread Joe Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14145096#comment-14145096
 ] 

Joe Stein edited comment on KAFKA-1555 at 9/23/14 5:39 PM:
---

[~gwenshap] yup, in that case the issue was between the chair and the keyboard 
(me)

{code}

root@precise64:/opt/apache/kafka# bin/kafka-console-producer.sh --broker-list 
localhost:9092 --topic testNew --sync --request-required-acks -1
A
[2014-09-23 17:36:37,127] WARN Produce request with correlation id 2 failed due 
to [testNew,1]: kafka.common.NotEnoughReplicasException 
(kafka.producer.async.DefaultEventHandler)
[2014-09-23 17:36:37,248] WARN Produce request with correlation id 5 failed due 
to [testNew,2]: kafka.common.NotEnoughReplicasException 
(kafka.producer.async.DefaultEventHandler)
[2014-09-23 17:36:37,364] WARN Produce request with correlation id 8 failed due 
to [testNew,2]: kafka.common.NotEnoughReplicasException 
(kafka.producer.async.DefaultEventHandler)
[2014-09-23 17:36:37,480] WARN Produce request with correlation id 11 failed 
due to [testNew,1]: kafka.common.NotEnoughReplicasException 
(kafka.producer.async.DefaultEventHandler)
[2014-09-23 17:36:37,591] ERROR Failed to send requests for topics testNew with 
correlation ids in [0,12] (kafka.producer.async.DefaultEventHandler)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 
tries.
at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
at kafka.producer.Producer.send(Producer.scala:76)
at kafka.producer.OldProducer.send(BaseProducer.scala:62)
at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:95)
at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)

{code}

It is working, awesome!
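
For comparison, a sketch of how the same NotEnoughReplicas failure would 
surface through the new Java producer that ships alongside 0.8.2. This is 
illustrative only and not part of the patch; the broker address and topic name 
are placeholders:

{code}
// Sketch only: acks=-1 against a topic whose min.insync.replicas cannot be met.
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MinIsrProbe {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "-1"); // same semantics as --request-required-acks -1 above
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            producer.send(new ProducerRecord<String, String>("testNew", "A")).get();
            System.out.println("ACKed by the minimum number of in-sync replicas");
        } catch (ExecutionException e) {
            // With too few in-sync replicas, the cause is the NotEnoughReplicas error.
            System.err.println("Send failed: " + e.getCause());
        } finally {
            producer.close();
        }
    }
}
{code}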


was (Author: joestein):
[~gwenshap] yup, in that case the issue was between the chair and the keyboard 
(me)

{code}

root@precise64:/opt/apache/kafka# bin/kafka-console-producer.sh --broker-list 
localhost:9092 --topic testNew --sync --request-required-acks -1
A
[2014-09-23 17:36:37,127] WARN Produce request with correlation id 2 failed due 
to [testNew,1]: kafka.common.NotEnoughReplicasException 
(kafka.producer.async.DefaultEventHandler)
[2014-09-23 17:36:37,248] WARN Produce request with correlation id 5 failed due 
to [testNew,2]: kafka.common.NotEnoughReplicasException 
(kafka.producer.async.DefaultEventHandler)
[2014-09-23 17:36:37,364] WARN Produce request with correlation id 8 failed due 
to [testNew,2]: kafka.common.NotEnoughReplicasException 
(kafka.producer.async.DefaultEventHandler)
[2014-09-23 17:36:37,480] WARN Produce request with correlation id 11 failed 
due to [testNew,1]: kafka.common.NotEnoughReplicasException 
(kafka.producer.async.DefaultEventHandler)
[2014-09-23 17:36:37,591] ERROR Failed to send requests for topics testNew with 
correlation ids in [0,12] (kafka.producer.async.DefaultEventHandler)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 
tries.
at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
at kafka.producer.Producer.send(Producer.scala:76)
at kafka.producer.OldProducer.send(BaseProducer.scala:62)
at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:95)
at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)

{code}

awesome!

> provide strong consistency with reasonable availability
> ---
>
> Key: KAFKA-1555
> URL: https://issues.apache.org/jira/browse/KAFKA-1555
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Affects Versions: 0.8.1.1
>Reporter: Jiang Wu
>Assignee: Gwen Shapira
> Fix For: 0.8.2
>
> Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch, 
> KAFKA-1555.2.patch, KAFKA-1555.3.patch
>
>

[jira] [Comment Edited] (KAFKA-1555) provide strong consistency with reasonable availability

2014-09-24 Thread Sriram Subramanian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14146751#comment-14146751
 ] 

Sriram Subramanian edited comment on KAFKA-1555 at 9/24/14 9:32 PM:


Jun,

I am not arguing that we should not have the feature. I am arguing about the 
best way to expose it. I think the ack being one number and min.isr being 
another number is very confusing. The ack value really does not indicate 
whether the system is opting for availability or consistency today, and 
min.isr only takes effect with ack=-1.

What is the difference between ack=2 and ack=-1 with min.isr=2? These 
differences are so subtle that it becomes hard to explain what the system does.

We should either change how ack is implemented today or move these options to 
the API so that the caller knows what they are opting for. If this is an 
interim solution, I would like to see a JIRA filed to revisit this. It is 
usually hard to change things later once users get used to how a system 
behaves.


was (Author: sriramsub):
Jun,

I am not arguing that we should not have the feature. I am arguing about what 
is the best way to expose that feature. I think ack being a number along with 
min isr being another number is very confusing. The ack really does not 
indicate if the system is opting for availability or consistency today. The 
min_isr also works only for ack=-1. Cases where ack = 2 and min_isr = 2 are 
very confusing to reason about. In this case, we would still end up writing 
only to the ISR and return success. If ISR = 1, it just make system not behave 
in any predictable way. We should either change how ack is implemented today or 
move these options to the API so that the caller knows what they are opting 
for. If this is an interim solution, I would like to see a JIRA filed to 
revisit this. It is usually hard to change things later if the users get used 
to how a system behaves.

> provide strong consistency with reasonable availability
> ---
>
> Key: KAFKA-1555
> URL: https://issues.apache.org/jira/browse/KAFKA-1555
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Affects Versions: 0.8.1.1
>Reporter: Jiang Wu
>Assignee: Gwen Shapira
> Fix For: 0.8.2
>
> Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch, 
> KAFKA-1555.2.patch, KAFKA-1555.3.patch
>
>





[jira] [Comment Edited] (KAFKA-1555) provide strong consistency with reasonable availability

2014-10-24 Thread Kyle Banker (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14183382#comment-14183382
 ] 

Kyle Banker edited comment on KAFKA-1555 at 10/24/14 8:03 PM:
--

The new durability docs look great so far. Here are a couple more proposed 
edits to the patch:

EDIT #1:
min.insync.replicas: When a producer sets request.required.acks to -1, 
min.insync.replicas specifies the minimum number of replicas that must 
acknowledge a write for the write to be considered successful. If this minimum 
cannot be met, then the producer will raise an exception (either 
NotEnoughReplicas or NotEnoughReplicasAfterAppend).

When used together, min.insync.replicas and request.required.acks allow you to 
enforce greater durability guarantees. A typical scenario would be to create a 
topic with a replication factor of 3, set min.insync.replicas to 2, and produce 
with request.required.acks of -1. This will ensure that the producer raises an 
exception if a majority of replicas do not receive a write.

EDIT #2 (for request.required.acks):
  -1. The producer gets an acknowledgement after all in-sync replicas have 
received the data. This option provides the greatest level of durability. 
However, it does not completely eliminate the risk of message loss because the 
number of in-sync replicas may, in rare cases, shrink to 1. If you want to 
ensure that some minimum number of replicas (typically a majority) receive a 
write, then you must set the topic-level min.insync.replicas setting. Please 
read the Replication section of the design documentation for a more in-depth 
discussion.
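
A hedged sketch of the "typical scenario" from EDIT #1 above (replication 
factor 3, min.insync.replicas of 2, request.required.acks of -1), using the 
0.8 producer's Java API. The topic name and broker list are placeholders and 
nothing here is prescribed by the patch:

{code}
// Sketch only. Assumes the topic was created along the lines of:
//   bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic important-events \
//     --partitions 1 --replication-factor 3 --config min.insync.replicas=2
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class DurableSend {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092,broker3:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "-1"); // wait for all in-sync replicas
        props.put("producer.type", "sync");

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        try {
            // The send raises an exception after retries (as in the console-producer
            // log earlier in this thread) rather than silently under-replicating
            // when min.insync.replicas cannot be met.
            producer.send(new KeyedMessage<String, String>("important-events", "payload"));
        } finally {
            producer.close();
        }
    }
}
{code}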


was (Author: kbanker):
The new durability docs looks great so far. Here are a couple more proposed 
edits to the patch:

EDIT #1:
min.insync.replicas: When a producer sets request.required.acks to -1, 
min.insync.replicas specifies the minimum number of replicas that must 
acknowledge a write for the write to be considered successful. If this minimum 
cannot be met, then the producer will raise an exception (either 
NotEnoughReplicas or NotEnoughReplicasAfterAppend).

When used together, min.insync.replicas and request.required.acks allow you to 
enforce greater durability guarantees. A typical scenario would be to create a 
topic with a replication factor of 3, set min.insync.replicas to 2, and produce 
with request.required.acks of -1. This will ensure that the producer raises an 
exception if a majority of replicas do not receive a write.

EDIT #2 (for request.required.acks):
  -1. The producer gets an acknowledgement after all in-sync replicas have 
received the data. This option provides the greatest level of durability. 
However, it does not completely eliminate the risk of message loss because the 
number of in sync replicas may, in rare cases, shrink to 1. If you want to 
ensure that some minimum number of replicas (typically a majority) receive a 
write, the you must set the topic-level min.insync.replicas setting. Please 
read the Replication section of the design documentation for a more in-depth 
discussion.

> provide strong consistency with reasonable availability
> ---
>
> Key: KAFKA-1555
> URL: https://issues.apache.org/jira/browse/KAFKA-1555
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Affects Versions: 0.8.1.1
>Reporter: Jiang Wu
>Assignee: Gwen Shapira
> Fix For: 0.8.2
>
> Attachments: KAFKA-1555-DOCS.0.patch, KAFKA-1555-DOCS.1.patch, 
> KAFKA-1555.0.patch, KAFKA-1555.1.patch, KAFKA-1555.2.patch, 
> KAFKA-1555.3.patch, KAFKA-1555.4.patch, KAFKA-1555.5.patch, 
> KAFKA-1555.5.patch, KAFKA-1555.6.patch, KAFKA-1555.8.patch, KAFKA-1555.9.patch
>
>

[jira] [Comment Edited] (KAFKA-1555) provide strong consistency with reasonable availability

2014-07-24 Thread Joe Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14074122#comment-14074122
 ] 

Joe Stein edited comment on KAFKA-1555 at 7/25/14 6:11 AM:
---

The simplification and guarantee provided by ack=-1 work properly if the 
client implements things in a way that works best for them.

Kafka guarantees durability across R-1 failures (with replication factor R) 
without message loss if you use ack=-1 ... so... that is the foundation we 
build upon.

When we (the client application f.k.a. the producer) do not want "message 
loss", then set up at least 3 data centers (since a Zookeeper ensemble needs a 
majority of data centers to work, which requires at least three). Then you 
write to two topics, each with ack=-1. The broker leaders for the partitions of 
each of these topics that you are writing to MUST be in different data centers. 
After synchronously writing to them, "join" their results (I say "join" since 
your specific implementation may use another function name like subscribe or 
onComplete or something); then you get a durable write from the client 
perspective, without losing data once it has had a successful "transaction".

If that doesn't work for you, then you must accept "data loss" when at least 
one data center fails (which has nothing to do with Kafka).

This can be (and should be) further extended to make sure that at least two 
racks in each data center get the data.

If you are not concerned with being able to sustain the loss of 1 data center 
plus the loss of 1 rack in another available data center (at the same time), 
then this is not a solution for you.

Now, all of what I just said is manual (making sure replicas and partitions are 
in different data centers and racks) and static, but it works with Kafka tools 
out of the box and some (lots of) software engineering effort (scripting, with 
a couple of late nights burning the midnight oil).

As far as producing to multiple topics goes, there are lots of ways to do this 
on the client side in parallel without much (if any) latency cost (with a 
little more software engineering). You can use Akka or anything else that gives 
you a future after sending off multiple events, and then subscribe to them 
onComplete before deciding (returning to your caller or fulfilling the promise) 
that the "message" has been "written".

Hopefully this makes sense. I appreciate that not all (or even most) use cases 
need this multi-data-center plus multi-rack kind of sustainability, but it 
works with Kafka if you go by what Kafka guarantees, without trying to change 
it unnecessarily.

If there are defects we should fix them, but going up and down this thread I am 
getting a bit lost in what we should be doing (if anything) to the current code 
now.
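
A sketch of the "write to two topics in different data centers, then join the 
results" pattern described above, using the new Java producer. The broker 
lists and topic names are placeholders, and error handling is reduced to a 
minimum for brevity; this is an illustration of the pattern, not a prescribed 
implementation:

{code}
// Sketch only: send to both data centers in parallel and only treat the write
// as durable once both sends have been ACKed ("join" the two futures).
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class DualDcWriter {
    private static KafkaProducer<String, String> producerFor(String brokers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokers);
        props.put("acks", "-1"); // wait for all in-sync replicas in that cluster
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<>(props);
    }

    public static void main(String[] args) throws Exception {
        KafkaProducer<String, String> dc1 = producerFor("dc1-broker1:9092");
        KafkaProducer<String, String> dc2 = producerFor("dc2-broker1:9092");
        try {
            // Fire both sends before waiting, so the ACKs overlap in time.
            Future<RecordMetadata> f1 = dc1.send(new ProducerRecord<String, String>("events-dc1", "payload"));
            Future<RecordMetadata> f2 = dc2.send(new ProducerRecord<String, String>("events-dc2", "payload"));
            // "Join": the write is only considered durable once both clusters ACK.
            f1.get();
            f2.get();
            System.out.println("Write acknowledged by both data centers");
        } finally {
            dc1.close();
            dc2.close();
        }
    }
}
{code}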




was (Author: joestein):
The simplification and guarantee provided with ack=-1 works properly if the 
client implements it in a way that works best for them.

Kafka guarantees durability on R-1 failures (with replication factor R) without 
message loss if you use ack=-1 ... so...  that is the foundation we build upon.

When we (the client application f.k.a. the producer) do not want "message loss" 
then setup at least 3 data centers (since you need a majority of data centers 
for a Zookeeper ensemble to work which requires at least three. Then you write 
to two topics each with ack=-1. The broker leaders for the partitions of each 
of these topics that you are writing to """MUST""" be in different data 
centers.  After sync writing to them them "join" their result (I say "join" 
since your specific implementation may use another function name like subscribe 
or onComplete or something) then you get... a durable write from the client 
perspective without loosing data after it has had a successful "transaction".

If that doesn't work for you then you must accept "data loss" as a failure of 
at least one data center (which has nothing todo with Kafka).  

This can be (should be) further extended to make sure that at least two racks 
in each data center get the data. 

If you are not concerned with being able to sustain a loss of 1 data center + a 
loss of 1 rack in another available data center (at the same time) then this is 
not a solution for you.

Now, all of what I just said is manual (to make sure replicas and partitions 
are in different data centers and racks) and static but it works with Kafka 
tools out of the box and some (lots) of software engineering power (scripting 
effort with a couple of late nights burning the midnight oil).

As far as producing this to multiple topics there are lots of ways to-do this 
on the client side running in parallel without much (if any) latency cost (with 
a little bit more software engineering).  You can use Akka or anything else 
where you can get a future after sending off multiple events and then 
subscribin