Author: jkreps
Date: Fri Aug 30 05:33:32 2013
New Revision: 1518892

URL: http://svn.apache.org/r1518892
Log:
Document replication in a little more detail. Cover unclean leader election.


Modified:
    kafka/site/08/design.html
    kafka/site/08/introduction.html

Modified: kafka/site/08/design.html
URL: 
http://svn.apache.org/viewvc/kafka/site/08/design.html?rev=1518892&r1=1518891&r2=1518892&view=diff
==============================================================================
--- kafka/site/08/design.html (original)
+++ kafka/site/08/design.html Fri Aug 30 05:33:32 2013
@@ -145,9 +145,9 @@ Now that we understand a little about ho
 
 It's worth noting that this breaks down into two problems: the durability 
guarantees for publishing a message and the guarantees when consuming a message.
 <p>
-Many systems claim to provide "exactly once" delivery semantics, but it is 
important to read the fine print, most of these claims are misleading (i.e. 
they don't translate to the case where consumers or producers can fail or cases 
where there are multiple consumer processes).
+Many systems claim to provide "exactly once" delivery semantics, but it is 
important to read the fine print: most of these claims are misleading (i.e. 
they don't translate to the case where consumers or producers can fail, or 
cases where there are multiple consumer processes, or cases where data written 
to disk can be lost).
 <p>
-Kafka's semantics are straight-forward. When publishing a message we have a 
notion of the message being "committed" to the log. Once a published message is 
committed it will not be lost as long as one broker remains "alive". The 
definition of alive, which will be described in more detail later, translates 
roughly to "not crashed" and able to keep up with the leader. If a producer 
attempts to publish a message and experiences a network error it cannot be sure 
if this error happened before or after the message was committed. This is 
similar to the semantics of inserting into a database table with an 
autogenerated key.
+Kafka's semantics are straightforward. When publishing a message we have a 
notion of the message being "committed" to the log. Once a published message is 
committed it will not be lost as long as one broker that replicates the 
partition to which this message was written remains "alive". The definition of 
alive, as well as a description of which types of failures we attempt to 
handle, will be given in more detail in the next section. For now let's assume 
a perfect, lossless broker and try to understand the guarantees to the producer 
and consumer. If a producer attempts to publish a message and experiences a 
network error it cannot be sure if this error happened before or after the 
message was committed. This is similar to the semantics of inserting into a 
database table with an autogenerated key.
 <p>
 These are not the strongest possible semantics for publishers. Although we 
cannot be sure of what happened in the case of a network error, it is possible 
to allow the producer to generate a sort of "primary key" that makes retrying 
the produce request idempotent. This feature is not trivial for a replicated 
system because of course it must work even (or especially) in the case of a 
server failure. With this feature it would suffice for the producer to retry 
until it receives acknowledgement of a successfully committed message, at which 
point we would guarantee the message had been published exactly once. We hope 
to add this in a future Kafka version.
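+<p>
+To make this concrete, the sketch below shows the retry-until-acknowledged 
loop described above, written against the 0.8 Java producer API. It is an 
illustration only: the broker addresses, topic, and class name are invented. 
As noted, such retries give at-least-once delivery today; the hypothetical 
producer-generated "primary key" is what would make them idempotent:
+<pre>
+import java.util.Properties;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+
+public class RetryingSend {
+  public static void main(String[] args) {
+    Properties props = new Properties();
+    props.put("metadata.broker.list", "broker1:9092,broker2:9092"); // illustrative hosts
+    props.put("serializer.class", "kafka.serializer.StringEncoder");
+    Producer&lt;String, String&gt; producer =
+        new Producer&lt;String, String&gt;(new ProducerConfig(props));
+    boolean acked = false;
+    while (!acked) {
+      try {
+        // The default (sync) producer returns only after the broker responds.
+        producer.send(new KeyedMessage&lt;String, String&gt;("events", "key", "value"));
+        acked = true;
+      } catch (Exception e) {
+        // Ambiguous failure: the message may or may not have been committed,
+        // so retrying can duplicate it. This is the at-least-once case.
+      }
+    }
+    producer.close();
+  }
+}
+</pre>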
 <p>
@@ -164,40 +164,69 @@ So effectively Kafka guarantees at-least
 
 <h3><a id="replication">4.7 Replication</a></h3>
 <p>
-Kafka replicates the log for each topic's partitions across the number of 
servers configured with each topic. This allows automatic failover when a 
server in the cluster fails so messages remain available in the presence of 
failures.
+Kafka replicates the log for each topic's partitions across a configurable 
number of servers (you can set this replication factor on a topic-by-topic 
basis). This allows automatic failover to these replicas when a server in the 
cluster fails, so messages remain available in the presence of failures.
 <p>
-Other messaging systems provide some replication-related features but in our 
(biased) opinion this appears to be a tacked-on thing not heavily used and with 
large downsides: slaves are inactive, throughput is heavily impacted, it 
requires fiddly manual configuration, etc. Kafka is meant to be used with 
replication by default&mdash;in fact we implement un-replicated topics as 
replicated topics where the replication factor is one.
+Other messaging systems provide some replication-related features, but, in our 
(totally biased) opinion, this appears to be a tacked-on thing, not heavily 
used, and with large downsides: slaves are inactive, throughput is heavily 
impacted, it requires fiddly manual configuration, etc. Kafka is meant to be 
used with replication by default&mdash;in fact we implement un-replicated 
topics as replicated topics where the replication factor is one.
 <p>
-The unit of replication is the topic partition. For each partition Kafka has a 
single leader and zero or more followers. The number of replicas is 
configurable at the topic level at topic creation time. All reads and writes go 
to the leader of the partition; each node is the leader for a share of 
partitions and a follower for others. The logs on the followers are 
identical&mdash;all have the same offsets and messages in the same order 
(though, of course, at any given time the leader may have a few as-yet 
unreplicated messages at the end of its log).
+The unit of replication is the topic partition. Under non-failure conditions 
each partition in Kafka has a single leader and zero or more followers. We call 
the total number of replicas, including the leader, the replication factor. All 
reads and writes go to the leader of the partition; each node is the leader for 
a share of partitions and a follower for others. The logs on the followers are 
identical to the leader's log&mdash;all have the same offsets and messages in 
the same order (though, of course, at any given time the leader may have a few 
as-yet unreplicated messages at the end of its log).
 <p>
-Followers consume messages from the leader just as a normal Kafka consumer 
would and applying them to their own log. Having the followers pull from the 
leader has the nice property of allowing the follower to naturally batch 
together log entries they are applying to their log. The leader keeps track of 
which slaves are "in sync"&mdash;meaning not too far behind the leader's own 
log. If a consumer dies or falls behind, the leader will remove it from the 
list of in sync replicas.
+Followers consume messages from the leader just as a normal Kafka consumer 
would and apply them to their own log. Having the followers pull from the 
leader has the nice property of allowing the follower to naturally batch 
together log entries they are applying to their log.
 <p>
-A message is considered "committed" when all in sync replicas for that 
partition have applied it to their log. The leader only gives out committed 
messages to the consumer. This means that the consumer need not worry about 
potentially seeing a message that could be lost if the leader fails. Producers, 
on the other hand, have the option of either waiting for the message to be 
committed or not, depending on their preference for latency and durability. 
+As with most distributed systems, automatically handling failures requires 
having a precise definition of what it means for a node to be "alive". For 
Kafka, node liveness has two conditions:
+<ol>
+	<li>A node must be able to maintain its session with Zookeeper (via 
Zookeeper's heartbeat mechanism)
+	<li>If it is a slave it must replicate the writes happening on the 
leader and not fall "too far" behind
+</ol>
+We refer to nodes satisfying these two conditions as being "in sync" to avoid 
the vagueness of "alive" or "failed". The leader keeps track of the set of "in 
sync" nodes. If a follower dies or falls behind, the leader will remove it from 
the list of in sync replicas. The definition of how far behind is too far is 
controlled by the replica.lag.max.messages configuration.
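+<p>
+As an illustration only (this is not Kafka's code, and the type and helper 
names are invented), the two conditions combine into a single in-sync test 
along these lines, with the allowed lag taken from replica.lag.max.messages:
+<pre>
+// Hypothetical sketch of the leader's liveness check for one follower.
+boolean isInSync(Replica follower, long leaderLogEndOffset, long maxLagMessages) {
+  boolean sessionAlive = follower.hasLiveZookeeperSession();          // condition 1
+  boolean caughtUp =
+      leaderLogEndOffset - follower.logEndOffset() &lt;= maxLagMessages; // condition 2
+  return sessionAlive &amp;&amp; caughtUp;
+}
+</pre>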
+<p>
+In distributed systems terminology we only attempt to handle a "fail/recover" 
model of failures where nodes suddenly cease working and then later recover 
(perhaps without knowing that they have died). Kafka does not handle so-called 
"Byzantine" failures in which nodes produce arbitrary or malicious responses 
(perhaps due to bugs or foul play).
+<p>
+A message is considered "committed" when all in sync replicas for that 
partition have applied it to their log. Only committed messages are ever given 
out to the consumer. This means that the consumer need not worry about 
potentially seeing a message that could be lost if the leader fails. Producers, 
on the other hand, have the option of either waiting for the message to be 
committed or not, depending on their preference for latency and durability. 
This preference is controlled by the request.required.acks setting the producer 
uses.
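+<p>
+For example, extending the illustrative producer sketch from the semantics 
section above, a producer that prefers durability over latency can block until 
commit by setting request.required.acks to -1 (the values in the comments are 
the documented meanings; the rest of the snippet is illustrative):
+<pre>
+Properties props = new Properties();
+props.put("metadata.broker.list", "broker1:9092,broker2:9092"); // illustrative hosts
+props.put("serializer.class", "kafka.serializer.StringEncoder");
+// 0  = do not wait for any acknowledgement from the broker
+// 1  = wait for the leader to write the message to its log
+// -1 = wait until the message is committed (all in-sync replicas have it)
+props.put("request.required.acks", "-1");
+Producer&lt;String, String&gt; producer =
+    new Producer&lt;String, String&gt;(new ProducerConfig(props));
+</pre>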
 <p>
-The guarantee that Kafka offers is that a committed message will not be lost 
as long as a single in sync replica survives.
+The guarantee that Kafka offers is that a committed message will not be lost 
as long as a single in sync replica remains.
+<p>
+Kafka will remain available in the presence of node failures after a short 
fail-over period, but may not remain available in the presence of network 
partitions.
 
 <h4>Replicated Logs: Quorums, ISRs, and State Machines (Oh my!)</h4>
 
At its heart a Kafka partition is a replicated log. The replicated log is one 
of the most basic primitives in distributed data systems, and there are many 
approaches to implementation. A replicated log can be used by other systems as 
a primitive for implementing other distributed systems in the <a 
href="http://en.wikipedia.org/wiki/State_machine_replication">state-machine 
style</a>.
 <p>
-A replicated log models the process of coming into consensus on the order of a 
series of values (entries 0, 1, 2, ...). There are many ways to implement this, 
but the simplest and fastest is with a leader who chooses the ordering of 
values provided to it. As long as the leader remains alive and all followers 
need only copy the values and ordering it chooses.
+A replicated log models the process of coming into consensus on the order of a 
series of values (generally numbering the log entries 0, 1, 2, ...). There are 
many ways to implement this, but the simplest and fastest is with a leader who 
chooses the ordering of values provided to it. As long as the leader remains 
alive, all followers need only copy the values and ordering it chooses.
+<p>
+Of course if leaders didn't fail we wouldn't need followers! When the leader 
does die we need to choose a new leader from among the followers. But followers 
themselves may fall behind or crash so we must ensure we choose an up-to-date 
follower. The fundamental guarantee a log replication algorithm must provide is 
that if we tell the client a message is committed, and the leader fails, the 
new leader we elect must also have that message. This yields a tradeoff: if the 
leader waits for more followers to acknowledge a message before declaring it 
committed then there will be more potentially electable leaders.
 <p>
-Of course if leaders didn't fail we wouldn't need followers! When the leader 
does die we need to choose a new leader from among the followers. But followers 
themselves may fall behind or crash so we must ensure we choose an up-to-date 
follower. The fundamental guarantee a log replication algorithm must provide is 
that if we tell the client a message is committed, and the leader fails, the 
new leader we elect must also have that message. This yields a tradeoff: if the 
leader waits for more followers to replicate a message before declaring it 
committed then there will be more potentially electable leaders.
+If you choose the number of acknowledgements required and the number of logs 
that must be compared to elect a leader such that there is guaranteed to be an 
overlap, then this is called a quorum.
 <p>
-A common approach to this tradeoff is to use a majority (quorum) for both the 
commit decision and the leader election. This is not what Kafka does, but let's 
explore it anyway to understand the tradeoffs. Let's say we want to tolerate 
<i>f</i> failures. If 2<i>f</i>+1 servers must replicate a message prior to a 
commit being declared by the leader, and if we elect a new leader by electing 
the follower with the most complete log from at least 2<i>f</i>+1 replicas, 
then, with no more than <i>f</i> failures there must be at least one server in 
common between those that committed the message and the servers that were 
available to take over as leader and hence the message will be preserved. There 
are many remaining details that each algorithm must handle (such as ensuring 
log consistency during leader failure or changing the set of servers in the 
replica set) but we will ignore these for now.
+A common approach to this tradeoff is to use a majority vote for both the 
commit decision and the leader election. This is not what Kafka does, but let's 
explore it anyway to understand the tradeoffs. Let's say we want to tolerate 
<i>f</i> failures and have 2<i>f</i>+1 servers. If <i>f</i>+1 of these servers 
must replicate a message prior to a commit being declared by the leader, and if 
we elect a new leader by choosing the follower with the most complete log from 
among at least <i>f</i>+1 replicas, then, with no more than <i>f</i> failures, 
there must be at least one server in common between those that committed the 
message and the servers that were available to take over as leader, and hence 
the message will be preserved. There are many remaining details that each 
algorithm must handle (such as precisely defining what makes a log more 
complete, ensuring log consistency during leader failure, or changing the set 
of servers in the replica set) but we will ignore these for now.
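+<p>
+A quick arithmetic check of the overlap argument for <i>f</i> = 1 
(illustrative only):
+<pre>
+int f = 1;                    // failures we want to tolerate
+int servers = 2 * f + 1;      // 3 servers
+int writeQuorum = f + 1;      // 2 replicas must have a message for it to commit
+int electionQuorum = f + 1;   // 2 logs are compared when electing a new leader
+// (f+1) + (f+1) = 2f+2 &gt; 2f+1, so any write quorum and any election quorum
+// share at least one server, and that server has every committed message.
+assert writeQuorum + electionQuorum &gt; servers;
+</pre>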
 <p>
-This quorum approach has a very nice property: the latency is dependent on 
only the fastest servers. That is, if the replication factor is three, the 
latency is determined by the faster of the two slaves.
+This majority vote approach has a very nice property: the latency is dependent 
on only the fastest servers. That is, if the replication factor is three, the 
latency is determined by the faster slave, not the slower one.
 <p>
 There is a rich variety of algorithms in this family, including Zookeeper's <a 
href="http://www.stanford.edu/class/cs347/reading/zab.pdf">Zab</a>, <a 
href="https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf">Raft</a>,
 and <a href="http://pmg.csail.mit.edu/papers/vr-revisited.pdf">Viewstamped 
Replication</a>. The most similar academic publication we are aware of to 
Kafka's actual implementation is <a 
href="http://research.microsoft.com/apps/pubs/default.aspx?id=66814">PacificA</a>
 from Microsoft.
 <p>
-The downside of the quorum is that the amount of replication to tolerate 
failures is a bit high. To tolerate one failure requires five copies of the 
data, and to tolerate two failures requires five copies of the data. In our 
experience a single failure is not enough, but doing every write five times, 
with 5x the disk space requirements and 1/5th the throughput, is just not 
practical for large volume data problems. This is likely why quorum algorithms 
more commonly appear for shared cluster configuration such as in Zookeeper but 
are less common for primary data storage. For example in HDFS the namenode's 
high-availability feature is built on quorum-based journal but quorum 
journalling is not used for the data itself.
+The downside of majority vote is that it doesn't take many failures to leave 
you with no electable leaders. To tolerate one failure requires three copies of 
the data, and to tolerate two failures requires five copies of the data. In our 
experience having only enough redundancy to tolerate a single failure is not 
enough for a practical system, but doing every write five times, with 5x the 
disk space requirements and 1/5th the throughput, is not very practical for 
large volume data problems. This is likely why quorum algorithms more commonly 
appear for shared cluster configuration such as Zookeeper but are less common 
for primary data storage. For example in HDFS the namenode's high-availability 
feature is built on a <a 
href="http://blog.cloudera.com/blog/2012/10/quorum-based-journaling-in-cdh4-1">majority-vote-based
 journal</a> but this more expensive approach is not used for the data itself.
+<p>
+Kafka takes a slightly different approach to choosing its quorum set. Instead 
of majority vote Kafka dynamically maintains a set of in-sync replicas (ISR) 
that are caught up to the leader. Only members of this set are eligible for 
election as leader. A write to a Kafka partition is not considered committed 
until <i>all</i> in-sync replicas have received the write. This ISR set is 
persisted to Zookeeper whenever it changes. Because of this, any replica in the 
ISR is eligible to be elected leader. This is an important factor for Kafka's 
usage model where there are many partitions and ensuring leadership balance is 
important. With this ISR model and <i>N</i> replicas a Kafka topic can tolerate 
<i>N</i>-1 failures without losing committed messages.
+<p>
+For most use cases we hope to handle, we think this tradeoff is a reasonable 
one. In practice, to tolerate <i>f</i> failures both the majority vote and the 
ISR approach will wait for the same number of replicas to acknowledge before 
committing a message (e.g. to survive one failure a majority quorum needs three 
replicas and one acknowledgement and the ISR approach requires two replicas and 
one acknowledgement). The ability to commit without the slowest servers is an 
advantage of the majority vote approach, but we think it is ameliorated by 
allowing the client to choose whether they block on the message commit or not, 
and the additional throughput and disk space due to the lower required 
replication factor is worth it.
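+<p>
+The same comparison in concrete numbers (illustrative arithmetic, not code 
from Kafka):
+<pre>
+int f = 1;                          // failures to tolerate, as in the example
+int majorityReplicas = 2 * f + 1;   // majority vote: 3 copies of the data
+int isrReplicas = f + 1;            // ISR: 2 copies of the data
+// Both wait for the leader plus f acknowledgements (f+1 replicas in total)
+// before a message is considered committed; the ISR approach simply keeps
+// fewer additional copies of the data around.
+</pre>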
+<p>
+Another important design distinction is that Kafka does not require that 
crashed nodes recover with all their data intact. It is not uncommon for 
replication algorithms in this space to depend on the existence of "stable 
storage" that cannot be lost in any failure-recovery scenario without potential 
consistency violations. There are two primary problems with this assumption. 
First, disk errors are the most common problem we observe in real operation of 
persistent data systems and they often do not leave data intact. Second, even 
if this were not a problem, we do not want to require the use of fsync on every 
write for our consistency guarantees as this can reduce performance by two to 
three orders of magnitude. Our protocol for allowing a replica to rejoin the 
ISR ensures that before rejoining it must fully re-sync again, even if it lost 
unflushed data in its crash.
+
+<h4>Unclean leader election: What if they all die?</h4>
+
+Note that Kafka's guarantee with respect to data loss is predicated on at 
least one replica remaining in sync. If all the nodes replicating a partition 
die, this guarantee no longer holds.
+<p>
+However, a practical system needs to do something reasonable when all the 
replicas die. If you are unlucky enough to have this happen, it is important to 
consider what will happen. There are two behaviors that could be implemented:
+<ol>
+       <li>Wait for a broker in the last in-sync set to come back to life and 
choose this broker as the leader (hopefully it still has all its data).
+       <li>Choose the first broker to come back to life as the leader.
+</ol>
 <p>
-Kafka takes a different approach from quorum replication. Instead Kafka 
dynamically maintains a set of in-sync replicas that are caught-up to the 
leader. Only members of this set are eligible for election as leader. A write 
to a Kafka partition is not considered committed until all in-sync replicas 
have received the write. This ISR set is persisted to zookeeper whenever it 
changes. Because of this any replica in the in-sync set is eligible to be 
elected leader. This is an important factor in Kafka where there are many 
partitions and ensuring leadership balance is important. With this ISR model 
<i>N</i> replicas a Kafka topic can tolerate <i>N</i>-1 failures without losing 
committed messages.
+This is a simple tradeoff between availability and consistency. If we wait for 
a broker in the ISR then we will remain unavailable as long as that broker is 
down. If this broker was destroyed or its data was lost then we are permanently 
down. If, on the other hand, a non-in-sync broker comes back to life and we 
allow it to become leader, then its log becomes the source of truth even though 
it is not guaranteed to have every committed message. In our current release we 
choose the second strategy and favor choosing a potentially inconsistent broker 
when all other machines are dead. In the future we would like to make this 
configurable to better support use cases where downtime is preferable to 
inconsistency.
 <p>
-In practice for most use cases we hope to handle we think this tradeoff is a 
reasonable one. In practice to tolerate <i>f</i> failures both the quorum and 
ISR approach will wait for the same number of replicas to acknowledge before 
committing a message (e.g. to survive one failure a quorum needs three replicas 
and one acknowledgement and the ISR approach requires two replicas and one 
acknowledgement). The ability to commit without the slowest servers is an 
advantage of the quorum approach but we think it is ameliorated by allowing the 
client to choose whether they block on the message commit or not, and the 
additional throughput and disk space due to the lower required replication 
factor is worth it.
+This dilemma is not specific to Kafka; it exists in any quorum-based scheme. 
For example, in a majority voting scheme, if a majority of servers suffer a 
permanent failure then you must either choose to lose 100% of your data or 
violate consistency by taking what remains on an existing server as your new 
source of truth.
 
 <h4>Replica Management</h4>
 
-The above discussion on a replicated log covers only a single partition. 
However a Kafka cluster will manage hundreds or thousands of these. We attempt 
to balance partitions within a cluster in a round-robin fashion to avoid 
clustering all partitions for high-volume topics on a small number of nodes. 
Likewise we try to balance leadership so that each node is the leader for a 
proportional share of its partitions.
+The above discussion on replicated logs really covers only a single log, i.e. 
one topic partition. However a Kafka cluster will manage hundreds or thousands 
of these. We attempt to balance partitions within a cluster in a round-robin 
fashion to avoid clustering all partitions for high-volume topics on a small 
number of nodes. Likewise we try to balance leadership so that each node is the 
leader for a proportional share of its partitions.
 <p>
 It is also important to optimize the leadership election process as that is 
the critical window of unavailability. A naive implementation of leader 
election would end up running an election per partition for all partitions a 
node hosted when that node failed. Instead we elect a single "controller" that 
is responsible for leadership assignment decisions. This controller serves an 
analogous role to the role of leaders themselves&mdash;we avoid making a 
sequence of leadership decisions by choosing a designated process to make all 
these decisions and then handling faults in this process. The result is that we 
are able to batch together many of the required leadership change notifications 
which makes the election process far cheaper and faster for a large number of 
partitions.

Modified: kafka/site/08/introduction.html
URL: 
http://svn.apache.org/viewvc/kafka/site/08/introduction.html?rev=1518892&r1=1518891&r2=1518892&view=diff
==============================================================================
--- kafka/site/08/introduction.html (original)
+++ kafka/site/08/introduction.html Fri Aug 30 05:33:32 2013
@@ -11,7 +11,7 @@ First let's review some basic messaging 
        <li>Kafka is run as a cluster comprised of one or more servers each of 
which is called a <i>broker</i>.
 </ul>
 
-So, at a high level, producers are send messages over the network to the Kafka 
cluster which in turn serves them up to consumers like this:
+So, at a high level, producers send messages over the network to the Kafka 
cluster which in turn serves them up to consumers like this:
 <div style="text-align: center; width: 100%">
   <img src="../images/producer_consumer.png">
 </div>
@@ -73,9 +73,10 @@ Not that partitioning means Kafka only p
 
 <h4>Guarantees</h4>
 
-Kafka gives the following guarantees
+At a high level, Kafka gives the following guarantees:
 <ul>
  <li>Messages sent by a producer to a particular topic partition will be 
appended in the order they are sent. That is, if a message M1 is sent by the 
same producer as a message M2, and M1 is sent first, then M1 will have a lower 
offset than M2 and appear earlier in the log.
-  <li>A consumer instance sees messages in the order they are stored in the log
+  <li>A consumer instance sees messages in the order they are stored in the 
log.
   <li>For a topic with replication factor N, we will tolerate up to N-1 server 
failures without losing any messages committed to the log.
-</ul>
\ No newline at end of file
+</ul>
+More details on these guarantees are given in the design section of the 
documentation.
\ No newline at end of file

