Hi all

So, having gone through a few extra failure scenarios, it appears it is
still possible for logs to diverge if unclean leader election
(unclean.leader.election.enable) is enabled. The protocol could be evolved
further to protect against this, but doing so adds significant complexity
and potentially impacts other primitives like log compaction. As a result,
the most pragmatic solution is to *limit the guarantees this KIP provides
to clusters where unclean leader election is disabled*.

If anyone has any strong feelings on this, or useful insights, that would
be awesome. Otherwise I'll update the KIP to reflect this stance (along
with the example below).

All the best
B

*Divergent Logs with Leader Epochs & Unclean Leader Election*
It is still possible to corrupt the log, even with leader epochs, if
min.insync.replicas=1 and unclean.leader.election.enable=true. Consider two
brokers A and B, a single topic with a single partition, replication factor
2, min.insync.replicas=1.

Intuitively the issue can be seen as:
-> The first two writes create a divergent log at offset 0 on completely
isolated brokers.
-> The second two writes “cover up” that first divergent write so the
LeaderEpoch request doesn’t see it.

Scenario:
1. [LeaderEpoch0] Write a message to A (offset A:0). Stop broker A. Bring
up broker B, which becomes leader.
2. [LeaderEpoch1] Write a message to B (offset B:0). Stop broker B. Bring
up broker A, which becomes leader.
3. [LeaderEpoch2] Write a message to A (offset A:1). Stop broker A. Bring
up broker B, which becomes leader.
4. [LeaderEpoch3] Write a message to B (offset B:1).
5. Bring up broker A. It sends an Epoch Request for epoch 2 to broker B. B
has only epochs 1 and 3, not 2, so it replies with the start offset of
epoch 3 (which is 1). A therefore truncates to offset 1 and keeps its
message at offset 0, which differs from B's message at offset 0: the logs
have diverged.

The underlying problem here is that, whilst B can tell something is wrong,
it can't tell where in the log the divergence started.
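
To make this concrete, here's a minimal sketch of the leader-side lookup
(class and method names are mine, purely illustrative -- not the actual
broker code), modelling broker B's epoch cache from the scenario:

import java.util.TreeMap;

// Illustrative sketch only -- not the real broker code.
public class EpochCacheSketch {

    // epoch -> start offset of that epoch in the log
    private final TreeMap<Integer, Long> epochStartOffsets = new TreeMap<>();

    public EpochCacheSketch() {
        epochStartOffsets.put(1, 0L); // B's epoch 1 started at offset 0
        epochStartOffsets.put(3, 1L); // B's epoch 3 started at offset 1
    }

    // Answer "where does requestedEpoch end?" with the start offset of the
    // first larger epoch we know about.
    public long endOffsetFor(int requestedEpoch, long logEndOffset) {
        Integer nextEpoch = epochStartOffsets.higherKey(requestedEpoch);
        // B can see epoch 2 is missing (something is wrong), but the best
        // answer it can give is offset 1 -- nothing here reveals that the
        // divergence actually began back at offset 0.
        return nextEpoch != null ? epochStartOffsets.get(nextEpoch) : logEndOffset;
    }

    public static void main(String[] args) {
        // A asks about its latest epoch (2) and is told to truncate to 1,
        // leaving the divergent message at offset 0 in place.
        System.out.println(new EpochCacheSketch().endOffsetFor(2, 2L)); // prints 1
    }
}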

One solution is to detect the break by comparing the complete epoch
lineage between brokers, then truncate either to (a) zero or (b) the point
of divergence, then refetch (option (b) is sketched below). However,
compacted topics make both of these options hard, as arbitrary epoch &
offset information can be 'lost' from the log. This information could be
retained and managed in the LeaderEpoch file instead, but the whole
solution is becoming quite complex. Hence it seems sensible to forgo this
guarantee for the unclean leader election case, or at least push it to a
subsequent KIP.
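
For reference, a minimal sketch of option (b), assuming complete
[epoch -> startOffset] lineages were available on both sides (exactly the
assumption compaction breaks); names are illustrative, not real broker code:

import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Illustrative sketch only: find the offset at which two epoch lineages
// first disagree. Assumes both brokers can supply their complete
// [epoch -> startOffset] history, which compaction does not guarantee.
public class LineageCompareSketch {

    static long divergencePoint(SortedMap<Integer, Long> follower,
                                SortedMap<Integer, Long> leader,
                                long followerLogEnd) {
        for (Map.Entry<Integer, Long> entry : follower.entrySet()) {
            Long leaderStart = leader.get(entry.getKey());
            if (leaderStart == null || !leaderStart.equals(entry.getValue())) {
                return entry.getValue(); // first epoch the histories disagree on
            }
        }
        return followerLogEnd; // histories agree; nothing to truncate
    }

    public static void main(String[] args) {
        SortedMap<Integer, Long> a = new TreeMap<>(Map.of(0, 0L, 2, 1L)); // broker A
        SortedMap<Integer, Long> b = new TreeMap<>(Map.of(1, 0L, 3, 1L)); // broker B
        // A's epoch 0 is unknown to B, so A must truncate to offset 0 and refetch.
        System.out.println(divergencePoint(a, b, 2L)); // prints 0
    }
}

Producing those complete maps is precisely what compaction makes hard.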


On Wed, Dec 14, 2016 at 6:45 PM Jun Rao <j...@confluent.io> wrote:

Hi, Onur,

The reason for keeping track of the CZXID of the broker registration path
is the following. There is one corner case bug (KAFKA-1120) that Ben
mentioned where the controller could miss a ZK watcher event if the broker
deregisters and registers quickly. Always triggering a leader election (and
thus increasing the leader epoch) on a broker registration event may work,
but we have to think through the controller failover logic. When the
controller initializes, it simply reads all current broker registrations
from ZK. The controller doesn't know whether any broker registration has
changed since the previous controller failed. Just blindly forcing leader
election on all partitions during controller failover probably adds too
much overhead.

So, the idea is to have the controller track the broker -> CZXID mapping in
memory. Every time the controller changes the leader for a partition, the
controller stores the CZXID of the leader together with the leader broker
id (and leader epoch, controller epoch, etc.) in memory and in
/brokers/topics/[topic]/partitions/[partitionId]/state
(this is missing in the KIP wiki). Now, if the controller gets a broker
registration event, or when there is a controller failover, the controller
just needs to force a leader election if the CZXID of the broker
registration doesn't match the CZXID associated with the leader in
/brokers/topics/[topic]/partitions/[partitionId]/state.
This way, we will only do leader election when it's truly necessary.
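
Roughly, the check amounts to the following sketch (field and method names
are for illustration only, not the actual controller code):

// Illustrative sketch of the CZXID check -- not the real controller code.
public class CzxidCheckSketch {

    // Stand-in for what's cached in memory and stored in
    // /brokers/topics/[topic]/partitions/[partitionId]/state
    static final class PartitionState {
        final int leaderBrokerId;
        final long leaderCzxid; // CZXID of the leader's registration znode

        PartitionState(int leaderBrokerId, long leaderCzxid) {
            this.leaderBrokerId = leaderBrokerId;
            this.leaderCzxid = leaderCzxid;
        }
    }

    // Evaluated on a broker registration event, and for every partition
    // during controller failover. A re-registration always creates a new
    // znode and hence a new CZXID, so a mismatch means the leader bounced
    // and we must force a leader election (which bumps the leader epoch).
    static boolean needsLeaderElection(PartitionState state,
                                       int registeredBrokerId,
                                       long registrationCzxid) {
        return state.leaderBrokerId == registeredBrokerId
                && state.leaderCzxid != registrationCzxid;
    }
}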

The reason why this change is related to this KIP is that it also addresses
the issue of keeping the replicas identical during correlated failures. If
all replicas are down and the leader replica is the first to be restarted,
then by forcing an increase of the leader epoch even though the leader
remains on the same replica, we can distinguish the data written after the
leader replica restarted from the data written by the same leader replica
before it restarted. This allows us to keep all replicas identical even in
the correlated failure case.
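
A concrete example with made-up offsets:

// Illustrative timeline only.
// epoch 5: leader R and follower F both have offsets 0-9; F additionally
//          has 10-11 (epoch 5) that R lost when all replicas crashed.
// R restarts first and remains the leader.
//   Without an epoch bump: R writes fresh data at offsets 10-11, still at
//   epoch 5. F's stale 10-11 are indistinguishable by (offset, epoch),
//   so the divergence goes undetected.
//   With the bump to epoch 6: F asks R where epoch 5 ends, hears offset
//   10, truncates its stale 10-11, refetches the epoch 6 data, and the
//   replicas end up identical.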

Thanks,

Jun

On Sun, Dec 11, 2016 at 3:54 PM, Onur Karaman <onurkaraman.apa...@gmail.com>
wrote:

> Pretty happy to see a KIP tackling this problem! One comment below.
>
> The "Extending LeaderEpoch to include Returning Leaders" states:
> "To protect against this eventuality the controller will maintain a cached
> mapping of [broker -> Zookeeper CZXID] (CZXID is a unique and monotonic
> 64-bit number) for the broker’s registration in Zookeeper
> (/brokers/ids/[brokerId]). If the controller receives a Broker Registration
> where the CZXID has changed it will increment the Leader Epoch and
> propagate that value to the broker via the Leader and ISR Request (in the
> normal way), then update the cached CZXID for that broker."
>
> In general I think kafka underutilizes zookeeper's various flavors of zxids
> but this time it's not clear to me what the motivation is for maintaining
> the broker to czxid mapping. It seems that the following check is
> redundant: "If the controller receives a Broker Registration where the
> CZXID has changed". By definition, the czxid of the /brokers/ids/[brokerId]
> znode will always change upon successful broker registration
> (https://zookeeper.apache.org/doc/r3.4.8/zookeeperProgrammers.html#sc_zkStatStructure).
> Why maintain the mapping at all? Why not just always update leader epochs
> and propagate every time the controller receives the broker registration zk
> event?
>
> On Sun, Dec 11, 2016 at 2:30 PM, Neha Narkhede <n...@confluent.io> wrote:
>
> > Good to see this KIP being proposed. Back when I added the epoch to the
> > replication protocol, we discussed adding it to the log due to the failure
> > scenarios listed in the KIP but I failed to convince people that it was
> > worth the effort needed to upgrade the cluster (especially after we asked
> > people to go through a painful backwards incompatible upgrade for 0.8 :-))
> > The lack of including the leader epoch/generation in the log has also been
> > one of the biggest critiques of Kafka's replication protocol by the
> > distributed systems community.
> >
> > I'm in favor of this work though I think we shouldn't end up with 2 notions
> > of representing a leader's generation. When we added the epoch, we wanted
> > to add it to the log but we didn't. Now that we are adding the generation
> > id to the log, I think we should revisit calling it the epoch at all. Have
> > you thought about a way to evolve the epoch to the generation id throughout
> > and what it will take?
> >
> > On Sun, Dec 11, 2016 at 4:31 AM Ben Stopford <b...@confluent.io> wrote:
> >
> > > Hi All
> > >
> > > Please find the below KIP which describes a proposed solution to a couple
> > > of issues that have been observed with the replication protocol.
> > >
> > > In short, the proposal replaces the use of the High Watermark, for
> > > follower log truncation, with an alternate Generation Marker. This
> > > uniquely defines which leader each message was acknowledged by.
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Generation+rather+than+High+Watermark+for+Truncation
> > >
> > > All comments and suggestions greatly appreciated.
> > >
> > > Ben Stopford
> > > Confluent, http://www.confluent.io
> > >
> > --
> > Thanks,
> > Neha
> >
>
