Hello Jad,

The reason why the new leader needs to truncate to HW is to that only
messages that are appended to all the ISR replicas and hence whose offset
is within the HW are treated as "committed". Blindly follow the largest LEO
may cause inconsistency between replicas and between servers and clients.
Consider these two scenarios:

First case: inconsistency between replicas

1) currently replica 1 is the current leader

replica 1: m1 m2 m3
replica 2: m1 m2

2) replica 1 fails, and replica 2 becomes the new leader and accept
messages 4 and 5:

replica 1: m1 m2 m3
replica 2: m1 m2 m4 m5

3) replica 1 resumes, and does not truncate to HW, then it will still
maintain m3, which is actually never "committed". Say leader moves to
replica 1 again, we can ended up with:

replica 1: m1 m2 m3 m6
replica 2: m1 m2 m4 m5

Second case: inconsistency between server and clients:

1) producer send message m3 with ack=-1:

replica 1: m1 m2 m3
replica 2: m1 m2 m3
replica 3: m1 m2

2) the response is held until all replicas also gets m3, say at this time
current leader replica 1 fails and replica 3 re-elects. If replica 3 gets
up to the largest LEO it will also get m3.

replica 2: m1 m2 m3
replica 3: m1 m2 m3

3) But m3 is not actually "committed" by the time replica 1 fails; when
producer gets the error at the time replica 1 fails, it will think that m3
was not successfully sent, so retry sending m3:

replica 2: m1 m2 m3 m3
replica 3: m1 m2 m3 m3

So in a word, only messages below the HW are safe to be maintained upon
leader migration. The issue here though, is that follower's HW update is
slightly late the leader's HW update, and hence the truncation may cause
data loss. We will try to fix this issue in KAFKA-1211.

Guozhang


On Fri, Jul 25, 2014 at 4:12 PM, Jad Naous <jad.na...@appdynamics.com>
wrote:

> Hi Guozhang,
>
> Yes, I think they are related.
>
> It seems odd to me that there should be any truncation at all since that is
> always an opportunity for data loss. It seems like we would want to avoid
> that at all costs, assuming we uphold the invariant that messages committed
> to an offset on any replica will always be identical. There seems to be two
> choices to make sure we don't lose data during leader election:
>
> 1- after leader election, requests going to the new leader should block
> until the leader is caught up to the highest LEO in the set of replicas (or
> until the HW reaches the highest LEO among replicas). I think this is less
> "normal" since it would mean that the new leader has to fetch messages from
> a non-leader. Maybe you can have the notion of a write-leader and a
> read-leader. After leader election, the write-leader becomes whatever the
> preferred leader today is, and the read-leader becomes the replica with the
> highest LEO. If the write-leader is not also the read-leader, then writes
> are blocked. All replicas and consumers fetch from the read-leader, and
> when the write-leader is caught up to the read-leader, the write-leader
> also becomes the read-leader and writes are unblocked.
>
> 2- After leader election, block actual leadership switching and block
> responding to new requests until the HW reaches the LEO on the old leader,
> then switch leadership, failing the blocked requests (or signaling a retry
> for a new leader), which should then retry the writes on the new leader,
> and things proceed as normal.
>
> Would either of these work?
>
> Thanks,
> Jad.
>
>
>
>
> On Fri, Jul 25, 2014 at 3:37 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Jad,
> >
> > Yes. In this case I think you are actually hitting KAFKA-1211. The
> summary
> > of the issue is that, it takes one more fetch request round trip for the
> > follower replica to advance the HW after the leader has advanced HW. So
> for
> > your case, the whole process is like this:
> >
> > 1. leader LEO at 10, follower LEO at 8. Both leader and follower knows
> the
> > LEO is at 8.
> > 2. Follower fetch data on Leader starting at 8, leader records its LEO as
> > 8.
> > 3. Follower gets 9 and 10 and append to its local log.
> > 4. Follower fetch data on Leader starting at 10, leader records its LEO
> as
> > 10; now leader knows follower has caught up, it advances its HW to 10 and
> > adds the follower to ISR (but follower does not know that yet! It still
> > think the HW is 8).
> > 5. Leader's fetch response gets back to follower, and now the follower
> > knows that HW has been updated to 10.
> >
> > And let's say there is a leader election between step 4) and 5), for your
> > case it is due to preferred leader election, but it could also be that
> > current leader fails, etc. Then on becoming the new leader the follower
> > will truncate its data to 8, which is the HW it knows. Hence the data
> loss.
> >
> > The proposed solution in KAFKA-1211 will tackle this issue.
> >
> > Guozhang
> >
> >
> > On Fri, Jul 25, 2014 at 2:48 PM, Jad Naous <jad.na...@appdynamics.com>
> > wrote:
> >
> > > Hi Guozhang,
> > >
> > > Yes, broker 1 is in the ISR (in fact, I wait for broker 1 to be in the
> > ISR
> > > before triggering election). However, I think there is something still
> > > amiss. I still see data loss. Here are some relevant log lines from
> > broker
> > > 0 (different test run). The log on broker 0 is getting truncated,
> losing
> > > some messages.
> > >
> > > [2014-07-25 10:40:02,134]  [DEBUG]  [kafka-request-handler-5]
> > > [kafka.cluster.Partition]  Partition [EventServiceUpsertTopic,19] on
> > broker
> > > 0: Old hw for partition [EventServiceUpsertTopic,19] is 8082. New hw is
> > > 8082. All leo's are 8111,8082
> > >
> > > [2014-07-25 10:40:02,134]  [DEBUG]  [kafka-request-handler-5]
> > > [kafka.server.KafkaApis]  [KafkaApi-0] Produce to local log in 1 ms
> > >
> > > [2014-07-25 10:40:02,134]  [DEBUG]  [main-SendThread(localhost:49893)]
> > > [org.apache.zookeeper.ClientCnxn]  Reading reply
> > > sessionid:0x476e9a9e9a0001, packet:: clientPath:null serverPath:null
> > > finished:false header:: 729,5  replyHeader:: 729,4294968217,0
>  request::
> > >
> > >
> >
> '/brokers/topics/EventServiceUpsertTopic/partitions/5/state,#7b22636f6e74726f6c6c65725f65706f6368223a312c226c6561646572223a312c2276657273696f6e223a312c226c65616465725f65706f6368223a332c22697372223a5b302c315d7d,3
> > > response::
> > >
> > >
> >
> s{4294967416,4294968217,1406309966419,1406310002132,4,0,0,0,74,0,4294967416}
> > >
> > > [2014-07-25 10:40:02,134]  [DEBUG]  [kafka-processor-49917-2]
> > > [kafka.request.logger]  Completed request:Name: ProducerRequest;
> Version:
> > > 0; CorrelationId: 16248; ClientId: ; RequiredAcks: -1; AckTimeoutMs:
> > 10000
> > > ms from client /127.0.0.1:50168
> > >
> > >
> >
> ;totalTime:1,requestQueueTime:0,localTime:1,remoteTime:0,responseQueueTime:0,sendTime:0
> > >
> > > [2014-07-25 10:40:02,134]  [DEBUG]
> > >
> [ZkClient-EventThread-13-localhost:49893,localhost:49896,localhost:49899]
> > > [kafka.utils.ZkUtils$]  Conditional update of path
> > > /brokers/topics/EventServiceUpsertTopic/partitions/5/state with value
> > >
> >
> {"controller_epoch":1,"leader":1,"version":1,"leader_epoch":3,"isr":[0,1]}
> > > and expected version 3 succeeded, returning the new version: 4
> > >
> > > [2014-07-25 10:40:02,136]  [DEBUG]
> > >
> [ZkClient-EventThread-13-localhost:49893,localhost:49896,localhost:49899]
> > > [k.controller.PartitionStateMachine]  [Partition state machine on
> > > Controller 0]: After leader election, leader cache is updated to
> > > Map([EventServiceUpsertTopic,18] ->
> > > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
> > > [EventServiceUpsertTopic,4] ->
> > > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
> > > [EventServiceUpsertTopic,7] ->
> > > (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
> > > [EventServiceUpsertTopic,17] ->
> > > (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
> > > [EventServiceUpsertTopic,16] ->
> > > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
> > > [EventServiceUpsertTopic,5] ->
> > > (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
> > > [EventServiceUpsertTopic,10] ->
> > > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
> > > [EventServiceUpsertTopic,1] ->
> > > (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
> > > [EventServiceUpsertTopic,13] ->
> > > (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
> > > [EventServiceUpsertTopic,9] ->
> > > (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
> > > [EventServiceUpsertTopic,8] ->
> > > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
> > > [EventServiceUpsertTopic,11] ->
> > > (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
> > > [EventServiceUpsertTopic,6] ->
> > > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
> > > [EventServiceUpsertTopic,12] ->
> > > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
> > > [EventServiceUpsertTopic,2] ->
> > > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
> > > [EventServiceUpsertTopic,3] ->
> > > (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
> > > [EventServiceUpsertTopic,14] ->
> > > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1), [
> > > EventServiceUpsertTopic,19] ->
> > > (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
> > > [EventServiceUpsertTopic,0] ->
> > > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
> > > [EventServiceUpsertTopic,15] ->
> > > (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1))
> > >
> > > [2014-07-25 10:40:02,136]  [DEBUG]  [kafka-request-handler-1]
> > > [kafka.cluster.Partition]  Partition [EventServiceUpsertTopic,19] on
> > broker
> > > 0: Old hw for partition [EventServiceUpsertTopic,19] is 8082. New hw is
> > > 8082. All leo's are 8112,8082
> > >
> > > [2014-07-25 10:40:02,136]  [DEBUG]  [kafka-request-handler-1]
> > > [kafka.server.KafkaApis]  [KafkaApi-0] Produce to local log in 0 ms
> > >
> > > [2014-07-25 10:40:02,137]  [DEBUG]  [kafka-processor-49917-2]
> > > [kafka.request.logger]  Completed request:Name: ProducerRequest;
> Version:
> > > 0; CorrelationId: 16250; ClientId: ; RequiredAcks: -1; AckTimeoutMs:
> > 10000
> > > ms from client /127.0.0.1:50168
> > >
> > >
> >
> ;totalTime:1,requestQueueTime:0,localTime:1,remoteTime:0,responseQueueTime:0,sendTime:0
> > >
> > > [2014-07-25 10:40:02,140]  [INFO ]
> > >
> [ZkClient-EventThread-13-localhost:49893,localhost:49896,localhost:49899]
> > > [kafka.controller.KafkaController]  [Controller 0]: Partition [
> > > EventServiceUpsertTopic,19] completed preferred replica leader
> election.
> > > New leader is 1
> > >
> > > [2014-07-25 10:40:02,140]  [INFO ]
> > >
> [ZkClient-EventThread-13-localhost:49893,localhost:49896,localhost:49899]
> > > [kafka.controller.KafkaController]  [Controller 0]: Partition
> > > [EventServiceUpsertTopic,5] completed preferred replica leader
> election.
> > > New leader is 1
> > >
> > > [2014-07-25 10:40:02,142]  [DEBUG]  [main-SendThread(localhost:49893)]
> > > [org.apache.zookeeper.ClientCnxn]  Got notification
> > > sessionid:0x476e9a9e9a0001
> > >
> > > [2014-07-25 10:40:02,142]  [DEBUG]  [main-SendThread(localhost:49893)]
> > > [org.apache.zookeeper.ClientCnxn]  Got WatchedEvent state:SyncConnected
> > > type:NodeDeleted path:/admin/preferred_replica_election for sessionid
> > > 0x476e9a9e9a0001
> > >
> > > [2014-07-25 10:40:02,143]  [DEBUG]  [main-SendThread(localhost:49893)]
> > > [org.apache.zookeeper.ClientCnxn]  Reading reply
> > > sessionid:0x476e9a9e9a0001, packet:: clientPath:null serverPath:null
> > > finished:false header:: 730,2  replyHeader:: 730,4294968218,0
>  request::
> > > '/admin/preferred_replica_election,-1  response:: null
> > >
> > > [2014-07-25 10:40:02,145]  [INFO ]  [kafka-request-handler-0]
> > > [kafka.server.ReplicaFetcherManager]  [ReplicaFetcherManager on broker
> 0]
> > > Removed fetcher for partitions
> > >
> > >
> >
> [EventServiceUpsertTopic,13],[EventServiceUpsertTopic,11],[EventServiceUpsertTopic,17],[EventServiceUpsertTopic,7],[EventServiceUpsertTopic,9],[EventServiceUpsertTopic,1],[EventServiceUpsertTopic,15],[
> > > EventServiceUpsertTopic,19
> > > ],[EventServiceUpsertTopic,3],[EventServiceUpsertTopic,5]
> > >
> > > [2014-07-25 10:40:02,153]  [INFO ]  [kafka-request-handler-0]
> > > [kafka.log.Log]  Truncating log EventServiceUpsertTopic-19 to offset
> > 8082.
> > >
> > > [2014-07-25 10:40:02,159]  [INFO ]  [kafka-request-handler-0]
> > > [kafka.log.Log]  Truncating log EventServiceUpsertTopic-5 to offset 0.
> > >
> > >
> > > I have removed some irrelevant lines. As you can see, the log for
> > > EventServiceUpsertTopic-19 is being truncated, losing messages that
> have
> > > not yet been replicated to broker 1. I'm not sure exactly what the
> issue
> > > is. Maybe new requests should be completely blocked after leader
> election
> > > until the new leader catches up to the messages that have been acked
> and
> > it
> > > has not yet received.
> > >
> > > Thanks,
> > >
> > > Jad.
> > >
> > >
> > > On Fri, Jul 25, 2014 at 2:14 PM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> > >
> > > > Hello Jad,
> > > >
> > > > I double-checked the source code again, and found that actually the
> > > > preferred leader elector does consider whether the selected replica
> is
> > in
> > > > ISR or not. This means that by the time the election is triggered,
> > > broker 1
> > > > is added to ISR by broker 0. Could you check before step 3, is there
> > any
> > > > log entries on broker 0 adding broker 1 to ISR or updating HW to 975?
> > > >
> > > >
> > > > On Thu, Jul 24, 2014 at 4:21 PM, Jad Naous <
> jad.na...@appdynamics.com>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > I have a test that continuously sends messages to one broker,
> brings
> > up
> > > > > another broker, and adds it as a replica for all partitions, with
> it
> > > > being
> > > > > the preferred replica for some. I have
> > > auto.leader.rebalance.enable=true,
> > > > > so replica election gets triggered. Data is being pumped to the old
> > > > broker
> > > > > all the while. It seems that some data gets lost while switching
> over
> > > to
> > > > > the new leader. Is this a bug, or do I have something
> misconfigured?
> > I
> > > > also
> > > > > have request.required.acks=-1 on the producer.
> > > > >
> > > > > Here's what I think is happening:
> > > > >
> > > > > 1. Producer writes message to broker 0,
> [EventServiceUpsertTopic,13],
> > > w/
> > > > > broker 0 currently leader, with ISR=(0), so write returns
> > successfully,
> > > > > even when acks = -1. Correlation id 35836
> > > > >
> > > > > Producer log:
> > > > >
> > > > > [2014-07-24 14:44:26,991]  [DEBUG]  [dw-97 - PATCH
> > > > >
> > > > >
> > > >
> > >
> >
> /v1/events/type_for_test_bringupNewBroker_shouldRebalance_shouldNotLoseData/event?_idPath=idField&_mergeFields=field1]
> > > > > [kafka.producer.BrokerPartitionInfo]  Partition
> > > > > [EventServiceUpsertTopic,13] has leader 0
> > > > >
> > > > > [2014-07-24 14:44:26,993]  [DEBUG]  [dw-97 - PATCH
> > > > >
> > > > >
> > > >
> > >
> >
> /v1/events/type_for_test_bringupNewBroker_shouldRebalance_shouldNotLoseData/event?_idPath=idField&_mergeFields=field1]
> > > > > [k.producer.async.DefaultEventHandler]  Producer sent messages with
> > > > > correlation id 35836 for topics [EventServiceUpsertTopic,13] to
> > broker
> > > 0
> > > > on
> > > > > localhost:56821
> > > > > 2. Broker 1 is still catching up
> > > > >
> > > > > Broker 0 Log:
> > > > >
> > > > > [2014-07-24 14:44:26,992]  [DEBUG]  [kafka-request-handler-3]
> > > > > [kafka.cluster.Partition]  Partition [EventServiceUpsertTopic,13]
> on
> > > > broker
> > > > > 0: Old hw for partition [EventServiceUpsertTopic,13] is 971. New hw
> > is
> > > > 971.
> > > > > All leo's are 975,971
> > > > >
> > > > > [2014-07-24 14:44:26,992]  [DEBUG]  [kafka-request-handler-3]
> > > > > [kafka.server.KafkaApis]  [KafkaApi-0] Produce to local log in 0 ms
> > > > >
> > > > > [2014-07-24 14:44:26,992]  [DEBUG]  [kafka-processor-56821-0]
> > > > > [kafka.request.logger]  Completed request:Name: ProducerRequest;
> > > Version:
> > > > > 0; CorrelationId: 35836; ClientId: ; RequiredAcks: -1;
> AckTimeoutMs:
> > > > 10000
> > > > > ms from client /127.0.0.1:57086
> > > > >
> > > > >
> > > >
> > >
> >
> ;totalTime:0,requestQueueTime:0,localTime:0,remoteTime:0,responseQueueTime:0,sendTime:0
> > > > > 3. Leader election is triggered by the scheduler:
> > > > >
> > > > > Broker 0 Log:
> > > > >
> > > > > [2014-07-24 14:44:26,991]  [INFO ]  [kafka-scheduler-0]
> > > > > [k.c.PreferredReplicaPartitionLeaderSelector]
> > > > > [PreferredReplicaPartitionLeaderSelector]: Current leader 0 for
> > > > partition [
> > > > > EventServiceUpsertTopic,13] is not the preferred replica.
> Trigerring
> > > > > preferred replica leader election
> > > > >
> > > > > [2014-07-24 14:44:26,993]  [DEBUG]  [kafka-scheduler-0]
> > > > > [kafka.utils.ZkUtils$]  Conditional update of path
> > > > > /brokers/topics/EventServiceUpsertTopic/partitions/13/state with
> > value
> > > > >
> > > >
> > >
> >
> {"controller_epoch":1,"leader":1,"version":1,"leader_epoch":3,"isr":[0,1]}
> > > > > and expected version 3 succeeded, returning the new version: 4
> > > > >
> > > > > [2014-07-24 14:44:26,994]  [DEBUG]  [kafka-scheduler-0]
> > > > > [k.controller.PartitionStateMachine]  [Partition state machine on
> > > > > Controller 0]: After leader election, leader cache is updated to
> > > > >
> > > >
> > >
> >
> Map(<Snipped>(Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),<EndSnip>)
> > > > >
> > > > > [2014-07-24 14:44:26,994]  [INFO ]  [kafka-scheduler-0]
> > > > > [kafka.controller.KafkaController]  [Controller 0]: Partition [
> > > > > EventServiceUpsertTopic,13] completed preferred replica leader
> > > election.
> > > > > New leader is 1
> > > > > 4. Broker 1 is still behind, but it sets the high water mark to
> > 971!!!
> > > > >
> > > > > Broker 1 Log:
> > > > >
> > > > > [2014-07-24 14:44:26,999]  [INFO ]  [kafka-request-handler-6]
> > > > > [kafka.server.ReplicaFetcherManager]  [ReplicaFetcherManager on
> > broker
> > > 1]
> > > > > Removed fetcher for partitions [EventServiceUpsertTopic,13]
> > > > >
> > > > > [2014-07-24 14:44:27,000]  [DEBUG]  [kafka-request-handler-6]
> > > > > [kafka.cluster.Partition]  Partition [EventServiceUpsertTopic,13]
> on
> > > > broker
> > > > > 1: Old hw for partition [EventServiceUpsertTopic,13] is 970. New hw
> > is
> > > > -1.
> > > > > All leo's are -1,971
> > > > >
> > > > > [2014-07-24 14:44:27,098]  [DEBUG]  [kafka-request-handler-3]
> > > > > [kafka.server.KafkaApis]  [KafkaApi-1] Maybe update partition HW
> due
> > to
> > > > > fetch request: Name: FetchRequest; Version: 0; CorrelationId: 1;
> > > > ClientId:
> > > > > ReplicaFetcherThread-0-1; ReplicaId: 0; MaxWait: 500 ms; MinBytes:
> 1
> > > > bytes;
> > > > > RequestInfo: [EventServiceUpsertTopic,13] ->
> > > > > PartitionFetchInfo(971,1048576), <Snipped>
> > > > >
> > > > > [2014-07-24 14:44:27,098]  [DEBUG]  [kafka-request-handler-3]
> > > > > [kafka.cluster.Partition]  Partition [EventServiceUpsertTopic,13]
> on
> > > > broker
> > > > > 1: Recording follower 0 position 971 for partition [
> > > > > EventServiceUpsertTopic,13].
> > > > >
> > > > > [2014-07-24 14:44:27,100]  [DEBUG]  [kafka-request-handler-3]
> > > > > [kafka.cluster.Partition]  Partition [EventServiceUpsertTopic,13]
> on
> > > > broker
> > > > > 1: Highwatermark for partition [EventServiceUpsertTopic,13] updated
> > to
> > > > 971
> > > > > 5. Consumer is none the wiser. All data that was in offsets 972-975
> > > > doesn't
> > > > > show up!
> > > > >
> > > > > I tried this with 2 initial replicas, and adding a 3rd which is
> > > supposed
> > > > to
> > > > > be the leader for some new partitions, and this problem also
> happens
> > > > there.
> > > > > The log on the old leader gets truncated to the offset on the new
> > > leader.
> > > > > What's the solution? Can I make a new broker leader for partitions
> > that
> > > > are
> > > > > currently active without losing data?
> > > > >
> > > > > Thanks,
> > > > > Jad.
> > > > >
> > > > > --
> > > > >  *Jad Naous* | Engineering | AppDynamics
> > > > >  <http://www.appdynamics.com>
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> > >
> > >
> > > --
> > >  *Jad Naous* | Engineering | AppDynamics
> > >  <http://www.appdynamics.com>
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
>
>
> --
>  *Jad Naous* | Engineering | AppDynamics
>  <http://www.appdynamics.com>
>



-- 
-- Guozhang

Reply via email to