[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-07-24 Thread Joe Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14074122#comment-14074122
 ] 

Joe Stein commented on KAFKA-1555:
--

The simplification and guarantee provided with ack=-1 work properly if the 
client implements them in the way that works best for it.

Kafka guarantees durability on R-1 failures (with replication factor R) without 
message loss if you use ack=-1 ... so...  that is the foundation we build upon.

When we (the client application, f.k.a. the producer) do not want "message loss", 
set up at least 3 data centers (you need a majority of data centers for a 
Zookeeper ensemble to work, which requires at least three). Then you write 
to two topics, each with ack=-1. The broker leaders for the partitions of each 
of these topics that you are writing to """MUST""" be in different data 
centers.  After sync writing to them, "join" their results (I say "join" 
since your specific implementation may use another function name like subscribe 
or onComplete or something); then you get... a durable write from the client 
perspective without losing data after it has had a successful "transaction".

If that doesn't work for you then you must accept "data loss" as a failure of 
at least one data center (which has nothing to do with Kafka).  

This can be (should be) further extended to make sure that at least two racks 
in each data center get the data. 

If you are not concerned with being able to sustain a loss of 1 data center + a 
loss of 1 rack in another available data center (at the same time) then this is 
not a solution for you.

Now, all of what I just said is manual (to make sure replicas and partitions 
are in different data centers and racks) and static but it works with Kafka 
tools out of the box and some (lots) of software engineering power (scripting 
effort with a couple of late nights burning the midnight oil).

As far as producing this to multiple topics goes, there are lots of ways to do 
this on the client side running in parallel without much (if any) latency cost 
(with a little bit more software engineering).  You can use Akka or anything 
else where you can get a future after sending off multiple events and then 
subscribe to them onComplete before deciding (returning to your caller or 
fulfilling the promise) that the "message" has been "written".  
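
The parallel write and "join" described above could look roughly like the 
following. This is only a sketch: send_sync is a hypothetical stand-in for a 
blocking producer send with ack=-1, and the topic names events-dc1 and 
events-dc2 are made up for illustration. A real client would plug in its own 
producer call.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def send_sync(topic, message):
    """Hypothetical stand-in for a blocking producer send with ack=-1.

    A real client would return only after the broker acknowledges the
    write and would raise an exception on failure.
    """
    return (topic, message)


def write_to_both(message, topics=("events-dc1", "events-dc2"), send=send_sync):
    """Send to every topic in parallel; succeed only if all sends succeed."""
    with ThreadPoolExecutor(max_workers=len(topics)) as pool:
        futures = [pool.submit(send, topic, message) for topic in topics]
        wait(futures)  # the "join": block until every send has finished
    # result() re-raises the exception of any send that failed
    return [future.result() for future in futures]
```

The caller treats the message as "written" only when write_to_both returns 
without raising, that is, when both topics have acknowledged it.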

Hopefully this makes sense and I appreciate that not all (or even most) use 
cases need this multi data center + multi rack type of sustainability but it works 
with Kafka if you go by what Kafka guarantees without trying to change it 
unnecessarily.

If there are defects we should fix them but going up and down this thread I am 
getting a bit lost in what we should be doing (if anything) to the current code 
now.



> provide strong consistency with reasonable availability
> ---
>
> Key: KAFKA-1555
> URL: https://issues.apache.org/jira/browse/KAFKA-1555
> Project: Kafka
> Issue Type: Improvement
> Components: controller
> Affects Versions: 0.8.1.1
> Reporter: Jiang Wu
> Assignee: Neha Narkhede
>
> In a mission critical application, we expect a kafka cluster with 3 brokers 
> can satisfy two requirements:
> 1. When 1 broker is down, no message loss or service blocking happens.
> 2. In worse cases, such as when two brokers are down, service can be blocked, 
> but no message loss happens.
> We found that current kafka version (0.8.1.1) cannot achieve the requirements 
> due to its three behaviors:
> 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
> messages may be chosen as the leader.
> 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
> has fewer messages than the leader.
> 3. ISR can contain only 1 broker, therefore acknowledged messages may be 
> stored in only 1 broker.
> The following is an analytical proof. 
> We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
> that at the beginning, all 3 replicas, leader A, followers B and C, are in 
> sync, i.e., they have the same messages and are all in ISR.
> According to the value of request.required.acks (acks for short), there are 
> the following cases.
> 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
> 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
> time, although C hasn't received m, C is still in ISR. If A is killed, C can 
> be elected as the new leader, and consumers will miss m.
> 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
> message m to A, and receives an acknowledgement. Disk failure happens in A 
> before B and C replicate m. Message m is lost.
> In summary, any existing configuration cannot satisfy the requirements.
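
Case 2 of the quoted description (acks=2) can be traced with a small 
simulation. This is only a sketch: the replica names A, B, C and the message m 
come from the description, while the function name, the extra message m0 and 
the dictionary-based log model are assumptions made for illustration, not 
Kafka code.

```python
def lost_after_failover(logs, acknowledged, new_leader):
    """Return acknowledged messages missing from the new leader's log."""
    return [msg for msg in acknowledged if msg not in logs[new_leader]]


# All three replicas start in sync and in ISR.
logs = {"A": ["m0"], "B": ["m0"], "C": ["m0"]}
isr = {"A", "B", "C"}

# acks=2: the producer is acknowledged once A and B hold m.
logs["A"].append("m")
logs["B"].append("m")
acknowledged = ["m0", "m"]

# A is killed. C has not fetched m yet but is still in ISR, so it is eligible.
isr.discard("A")
assert "C" in isr
lost = lost_after_failover(logs, acknowledged, new_leader="C")
print(lost)  # ['m']
```

Electing B instead would lose nothing, which is why the choice of new leader 
matters in this case.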



--
This message was sent by Atlassian JIRA

[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-07-24 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14074070#comment-14074070
 ] 

Jun Rao commented on KAFKA-1555:


Sudarshan,

Our replication guarantees are the following.

1. If you use ack=-1 with a replication factor R, you can tolerate R-1 failures 
without message loss.

2. If you use ack>=1 and [...]

> provide strong consistency with reasonable availability


[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-07-24 Thread Sudarshan Kadambi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14073326#comment-14073326
 ] 

Sudarshan Kadambi commented on KAFKA-1555:
--

Neha and Jun:
I want to make sure we have agreement on the issue (both, that there is an 
issue and what the issue is :) ). We plan to use a replication factor of 3 and 
acks=2. We'd like this to mean that we are tolerant to the loss of 1 machine, 
without loss of published messages or the producer being blocked. 

Let's focus on the following scenario: let's say L, F1 and F2 are the leader 
and 2 followers in the ISR. With acks=2, let's say L and F1 have committed all 
published messages and F2 is up to replica.lag.max.messages behind. When L goes 
down, F2 can be made the new leader instead of F1, even though F1 is up to date 
with the leader. We need to be able to take into account how caught up a given 
broker in the ISR is when electing a new leader. This is also unclean leader 
election, but of a different type than what we've been discussing. 
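
The election rule being asked for can be sketched as follows. This 
illustrates the request, not what 0.8.1.1 does; the function name and the 
offsets 105 and 100 are hypothetical.

```python
def elect_most_caught_up(isr, log_end_offsets):
    """Pick the ISR member with the largest log end offset."""
    if not isr:
        raise ValueError("ISR is empty, no clean election is possible")
    return max(isr, key=lambda replica: log_end_offsets[replica])


# L has gone down; F1 holds every message L acknowledged, F2 is behind.
log_end_offsets = {"F1": 105, "F2": 100}
print(elect_most_caught_up(["F2", "F1"], log_end_offsets))  # F1
```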



[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-07-23 Thread Robert Withers (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14072790#comment-14072790
 ] 

Robert Withers commented on KAFKA-1555:
---

Jun, I see what you mean and perhaps there is no quantitative difference; I 
really do not know without prototyping it.  My instincts tell me that the 
qualitative difference may be meaningful.  

Let's consider the situation with 5 replicas and acks set to 3 versus 2 
replicas and acks set to -1 as well as 3 shadows.  Perhaps I am on the algebra 
kick a bit much but the second arrangement means that we qualitatively have 2 
well defined algebraic groups, the ISR for sure is consistent while the shadows 
may be.  Implementing the algebra as first class may have advantages.  Take a 
write chained ocap for the leader with 2 types of monitor ocaps: acking and 
forwarding.  Now the producer group constraint can be modeled: conservative or 
opportunistic and the group constraint type can emit the right kind of ocap.  
The forwarding ocap could be used to recast that mirror capability.  Now we can 
model the system algebraically, in first class objects: Metaprogramming of a 
sort.

I think that we are talking about a 4 dimensional meta matrix, now:  the front 
consumer plane with translations and rotations for consumption and rebalance, 
the 3rd dimension for ISR leader election and the 4th dimension for optimistic 
shadowing and mirroring.

I have no evidence that any of this is beneficial or not.   That's y'all's 
decision to make.

Thank you for hearing me out.



[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-07-23 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14072756#comment-14072756
 ] 

Jun Rao commented on KAFKA-1555:


Rob,

In order for those shadow replicas to be ready to be flipped into ISR, they 
have to be closely in sync with the leader. If that's the case, even if you 
count them in acking the producer, it won't delay the produce request much. So, 
I still don't see a clear benefit of this approach vs just having a larger 
replication factor. I am also not sure how easy the implementation will be.




[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-07-23 Thread Robert Withers (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14072700#comment-14072700
 ] 

Robert Withers commented on KAFKA-1555:
---

Hi Jun,

Yes, that's what I am thinking. It allows maintaining a pool of offline, but 
current and consistent replica shadows, ready to be flipped into ISR.  Being 
out of ISR keeps them from being counted in the quorum, yet they are ready to 
go, so there is no impact on the producers.

Looking at it through algebra sunglasses means we would establish a secondary 
space of replication but with a different dimensional projection into the 
parent meta space, which is the current ISR replication space, itself projected 
into consumers' meta space as the leader partition.  I am thinking it adds 
another layer of depth, to shore the defenses.

- Rob



Re: [jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-07-23 Thread Robert Withers
Hi Jun,

Yes, that's what I am thinking. It allows maintaining a pool of offline, but 
current and consistent replica shadows, ready to be flipped into ISR.  Being 
out of ISR keeps them from being counted in the quorum, yet they are ready to 
go, so there is no impact on the producers.

Looking at it through algebra sunglasses means we would establish a secondary 
space of replication but with a different dimensional projection into the 
parent meta space, which is the current ISR replication space, itself projected 
into consumers' meta space as the leader partition.  I am thinking it adds 
another layer of depth, to shore the defenses.

- Rob

> On Jul 23, 2014, at 7:46 PM, "Jun Rao (JIRA)"  wrote:
> 
> 
>[ 
> https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14072690#comment-14072690
>  ] 
> 
> Jun Rao commented on KAFKA-1555:
> 
> 
> Rob,
> 
> Is that any different from just running with a higher replication factor?
> 


[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-07-23 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14072690#comment-14072690
 ] 

Jun Rao commented on KAFKA-1555:


Rob,

Is that any different from just running with a higher replication factor?



[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-07-23 Thread Robert Withers (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14071886#comment-14071886
 ] 

Robert Withers commented on KAFKA-1555:
---

In military satellite communications, when the statically assigned bandwidth is 
exceeded, a dynamic block is available from which to grab some leased 
bandwidth.  If we apply this idea to data production into Kafka, though 
different due to data replication, could we have replicas to the replicas: 
replica shadows?

Say we have 10 brokers with replication 2.  So partition 1 has a leader on 
broker 1 and a follower in ISR on both broker 2 and 3.  If we have replica 
shadows on brokers 4, 5 and 6 not in ISR but receiving msg production 
opportunistically, then we could have the option to dynamically assign a new 
follower into ISR if an ISR follower fails.

- Rob





[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-07-23 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14071792#comment-14071792
 ] 

Jun Rao commented on KAFKA-1555:


In case 3.1, when C restarts, the protocol is that C can only join ISR if it 
has received all messages up to the current high watermark.

For example, let's assume that M is 10. Let's say A, B, C all have messages at 
offset 100 and all those messages are committed (therefore high watermark is at 
100). Then C dies. After that, we commit 5 more messages with both A and B 
(high watermark is at 105). Now, C is restarted. C is actually not allowed to 
rejoin ISR until its log end offset has passed 105. This means that C must 
first fetch the 5 newly committed messages before being added to ISR.
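
The rejoin rule in this example can be written as a one-line check. This is a 
sketch of the rule as stated here, not broker code: the function name is 
hypothetical, and it assumes that reaching the high watermark (rather than 
strictly exceeding it) is enough.

```python
def can_rejoin_isr(log_end_offset, high_watermark):
    """True once the follower has fetched everything up to the high watermark."""
    return log_end_offset >= high_watermark


high_watermark = 105      # A and B committed 5 more messages while C was down
c_log_end_offset = 100    # where C stopped before it died

print(can_rejoin_isr(c_log_end_offset, high_watermark))      # False
print(can_rejoin_isr(c_log_end_offset + 5, high_watermark))  # True
```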
