Thanks for the great explanation! So I guess for now it's not safe to trigger leader election to rebalance leadership? Or under what conditions can I trigger a re-election without losing messages? It seems like this issue can always happen.
On Fri, Jul 25, 2014 at 5:09 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> Hello Jad,
>
> The reason why the new leader needs to truncate to the HW is that only
> messages that have been appended to all the ISR replicas, and hence whose
> offsets are within the HW, are treated as "committed". Blindly following
> the largest LEO may cause inconsistency between replicas, and between
> servers and clients. Consider these two scenarios:
>
> First case: inconsistency between replicas.
>
> 1) Currently replica 1 is the leader:
>
> replica 1: m1 m2 m3
> replica 2: m1 m2
>
> 2) Replica 1 fails, and replica 2 becomes the new leader and accepts
> messages m4 and m5:
>
> replica 1: m1 m2 m3
> replica 2: m1 m2 m4 m5
>
> 3) Replica 1 resumes. If it does not truncate to the HW, it will still
> maintain m3, which was never actually "committed". Say the leader moves
> to replica 1 again; we can end up with:
>
> replica 1: m1 m2 m3 m6
> replica 2: m1 m2 m4 m5
>
> Second case: inconsistency between server and clients.
>
> 1) The producer sends message m3 with ack=-1:
>
> replica 1: m1 m2 m3
> replica 2: m1 m2 m3
> replica 3: m1 m2
>
> 2) The response is held until all replicas also get m3. Say at this point
> the current leader, replica 1, fails and replica 3 is elected. If replica 3
> catches up to the largest LEO, it will also get m3:
>
> replica 2: m1 m2 m3
> replica 3: m1 m2 m3
>
> 3) But m3 is not actually "committed" by the time replica 1 fails; when
> the producer gets the error as replica 1 fails, it will think that m3 was
> not successfully sent, and so it retries sending m3:
>
> replica 2: m1 m2 m3 m3
> replica 3: m1 m2 m3 m3
>
> So, in a word, only messages below the HW are safe to keep upon leader
> migration. The issue here, though, is that the follower's HW update lags
> slightly behind the leader's HW update, and hence the truncation may cause
> data loss. We will try to fix this issue in KAFKA-1211.
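The first scenario is worth replaying concretely. Below is a toy simulation of the bookkeeping (a sketch only; `high_watermark` and the dict-of-logs model are illustrative, not Kafka's actual replica code):

```python
# Toy replay of the first scenario above (illustrative, not Kafka code).
# HW (high watermark) = highest offset replicated to every ISR replica;
# only messages below the HW count as "committed".

def high_watermark(replicas):
    # With all replicas in the ISR, the HW is the minimum LEO (log length).
    return min(len(log) for log in replicas.values())

# 1) replica 1 is leader; m3 is appended but not yet replicated to replica 2.
replicas = {1: ["m1", "m2", "m3"], 2: ["m1", "m2"]}
hw = high_watermark(replicas)          # 2 -> only m1, m2 are committed

# 2) replica 1 fails; replica 2 becomes leader and accepts m4, m5.
replicas[2] += ["m4", "m5"]

# 3) replica 1 resumes. WITHOUT truncating to the HW it keeps the
#    never-committed m3, and on becoming leader again and accepting m6
#    the two replicas diverge permanently:
no_truncate = replicas[1] + ["m6"]     # ['m1', 'm2', 'm3', 'm6']

#    Truncating to the HW instead drops m3, and re-fetching from the new
#    leader keeps the logs consistent:
caught_up = replicas[1][:hw] + replicas[2][hw:]

print(no_truncate)   # ['m1', 'm2', 'm3', 'm6']  -- diverged from replica 2
print(caught_up)     # ['m1', 'm2', 'm4', 'm5']  -- identical to replica 2
```

The point the sketch makes is that m3 was never safe to keep: no client ever had it acknowledged, so dropping it costs nothing, while keeping it forks the history.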
> Guozhang
>
> On Fri, Jul 25, 2014 at 4:12 PM, Jad Naous <jad.na...@appdynamics.com> wrote:
>
> > Hi Guozhang,
> >
> > Yes, I think they are related.
> >
> > It seems odd to me that there should be any truncation at all, since
> > that is always an opportunity for data loss. It seems like we would want
> > to avoid that at all costs, assuming we uphold the invariant that
> > messages committed at an offset on any replica will always be identical.
> > There seem to be two choices for making sure we don't lose data during
> > leader election:
> >
> > 1- After leader election, requests going to the new leader should block
> > until the leader has caught up to the highest LEO in the set of replicas
> > (or until the HW reaches the highest LEO among the replicas). I think
> > this is less "normal" since it would mean that the new leader has to
> > fetch messages from a non-leader. Maybe you could have the notion of a
> > write-leader and a read-leader. After leader election, the write-leader
> > becomes whatever the preferred leader is today, and the read-leader
> > becomes the replica with the highest LEO. If the write-leader is not
> > also the read-leader, then writes are blocked. All replicas and
> > consumers fetch from the read-leader, and when the write-leader has
> > caught up to the read-leader, the write-leader also becomes the
> > read-leader and writes are unblocked.
> >
> > 2- After leader election, hold the actual leadership switch and block
> > responses to new requests until the HW reaches the LEO on the old
> > leader, then switch leadership, failing the blocked requests (or
> > signaling a retry against the new leader); producers should then retry
> > the writes on the new leader, and things proceed as normal.
> >
> > Would either of these work?
> >
> > Thanks,
> > Jad.
> >
> > On Fri, Jul 25, 2014 at 3:37 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Hi Jad,
> > >
> > > Yes.
In this case I think you are actually hitting KAFKA-1211. The summary
> > > of the issue is that it takes one more fetch-request round trip for
> > > the follower replica to advance its HW after the leader has advanced
> > > the HW. So for your case, the whole process is like this:
> > >
> > > 1. Leader LEO is at 10, follower LEO is at 8. Both leader and follower
> > > know the HW is at 8.
> > > 2. The follower fetches data from the leader starting at 8; the leader
> > > records the follower's LEO as 8.
> > > 3. The follower gets messages 9 and 10 and appends them to its local
> > > log.
> > > 4. The follower fetches data from the leader starting at 10; the
> > > leader records the follower's LEO as 10. Now the leader knows the
> > > follower has caught up: it advances its HW to 10 and adds the follower
> > > to the ISR (but the follower does not know that yet! It still thinks
> > > the HW is 8).
> > > 5. The leader's fetch response gets back to the follower, and now the
> > > follower knows that the HW has been updated to 10.
> > >
> > > Now let's say there is a leader election between steps 4) and 5); in
> > > your case it is due to preferred leader election, but it could also be
> > > that the current leader fails, etc. Then, on becoming the new leader,
> > > the follower will truncate its data to 8, which is the HW it knows.
> > > Hence the data loss.
> > >
> > > The proposed solution in KAFKA-1211 will tackle this issue.
> > >
> > > Guozhang
> > >
> > > On Fri, Jul 25, 2014 at 2:48 PM, Jad Naous <jad.na...@appdynamics.com> wrote:
> > >
> > > > Hi Guozhang,
> > > >
> > > > Yes, broker 1 is in the ISR (in fact, I wait for broker 1 to be in
> > > > the ISR before triggering election). However, I think there is
> > > > something still amiss. I still see data loss. Here are some relevant
> > > > log lines from broker 0 (from a different test run). The log on
> > > > broker 0 is getting truncated, losing some messages.
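The one-fetch-round-trip lag in steps 1-5 can also be replayed as a toy model (a sketch, not Kafka code) to show why an election landing between steps 4 and 5 drops acked data:

```python
# Step-by-step toy replay of the KAFKA-1211 race described above.
# The follower only learns the new HW one fetch round trip after the
# leader advances it.

leader_log   = list(range(10))   # offsets 0..9, so leader LEO = 10
follower_log = leader_log[:8]    # follower LEO = 8
leader_hw = follower_hw = 8      # step 1: both believe HW = 8

# Steps 2-3: follower fetches from offset 8 and appends the missing messages.
follower_log += leader_log[8:]   # follower LEO is now 10 as well

# Step 4: the next fetch (from offset 10) tells the leader the follower has
# caught up, so the leader advances its HW. The follower does NOT know yet.
leader_hw = 10                   # follower_hw is still 8

# Leader election fires between steps 4 and 5: the follower becomes the new
# leader and truncates to the only HW it knows.
follower_log = follower_log[:follower_hw]

# The two highest offsets, already acked to the producer (they were within
# the old leader's HW), are gone.
print(len(follower_log))
```

Under this model the loss window is exactly one fetch round trip wide, which matches the observation elsewhere in the thread that the election has to land in a narrow interval to lose data.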
> > > > > > > > [2014-07-25 10:40:02,134] [DEBUG] [kafka-request-handler-5] > > > > [kafka.cluster.Partition] Partition [EventServiceUpsertTopic,19] on > > > broker > > > > 0: Old hw for partition [EventServiceUpsertTopic,19] is 8082. New hw > is > > > > 8082. All leo's are 8111,8082 > > > > > > > > [2014-07-25 10:40:02,134] [DEBUG] [kafka-request-handler-5] > > > > [kafka.server.KafkaApis] [KafkaApi-0] Produce to local log in 1 ms > > > > > > > > [2014-07-25 10:40:02,134] [DEBUG] > [main-SendThread(localhost:49893)] > > > > [org.apache.zookeeper.ClientCnxn] Reading reply > > > > sessionid:0x476e9a9e9a0001, packet:: clientPath:null serverPath:null > > > > finished:false header:: 729,5 replyHeader:: 729,4294968217,0 > > request:: > > > > > > > > > > > > > > '/brokers/topics/EventServiceUpsertTopic/partitions/5/state,#7b22636f6e74726f6c6c65725f65706f6368223a312c226c6561646572223a312c2276657273696f6e223a312c226c65616465725f65706f6368223a332c22697372223a5b302c315d7d,3 > > > > response:: > > > > > > > > > > > > > > s{4294967416,4294968217,1406309966419,1406310002132,4,0,0,0,74,0,4294967416} > > > > > > > > [2014-07-25 10:40:02,134] [DEBUG] [kafka-processor-49917-2] > > > > [kafka.request.logger] Completed request:Name: ProducerRequest; > > Version: > > > > 0; CorrelationId: 16248; ClientId: ; RequiredAcks: -1; AckTimeoutMs: > > > 10000 > > > > ms from client /127.0.0.1:50168 > > > > > > > > > > > > > > ;totalTime:1,requestQueueTime:0,localTime:1,remoteTime:0,responseQueueTime:0,sendTime:0 > > > > > > > > [2014-07-25 10:40:02,134] [DEBUG] > > > > > > [ZkClient-EventThread-13-localhost:49893,localhost:49896,localhost:49899] > > > > [kafka.utils.ZkUtils$] Conditional update of path > > > > /brokers/topics/EventServiceUpsertTopic/partitions/5/state with value > > > > > > > > > > {"controller_epoch":1,"leader":1,"version":1,"leader_epoch":3,"isr":[0,1]} > > > > and expected version 3 succeeded, returning the new version: 4 > > > > > > > > [2014-07-25 10:40:02,136] [DEBUG] > 
> > > > > [ZkClient-EventThread-13-localhost:49893,localhost:49896,localhost:49899] > > > > [k.controller.PartitionStateMachine] [Partition state machine on > > > > Controller 0]: After leader election, leader cache is updated to > > > > Map([EventServiceUpsertTopic,18] -> > > > > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1), > > > > [EventServiceUpsertTopic,4] -> > > > > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1), > > > > [EventServiceUpsertTopic,7] -> > > > > (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1), > > > > [EventServiceUpsertTopic,17] -> > > > > (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1), > > > > [EventServiceUpsertTopic,16] -> > > > > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1), > > > > [EventServiceUpsertTopic,5] -> > > > > (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1), > > > > [EventServiceUpsertTopic,10] -> > > > > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1), > > > > [EventServiceUpsertTopic,1] -> > > > > (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1), > > > > [EventServiceUpsertTopic,13] -> > > > > (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1), > > > > [EventServiceUpsertTopic,9] -> > > > > (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1), > > > > [EventServiceUpsertTopic,8] -> > > > > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1), > > > > [EventServiceUpsertTopic,11] -> > > > > (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1), > > > > [EventServiceUpsertTopic,6] -> > > > > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1), > > > > [EventServiceUpsertTopic,12] -> > > > > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1), > > > > [EventServiceUpsertTopic,2] -> > > > > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1), > > > > [EventServiceUpsertTopic,3] -> > > > > (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1), > > > > [EventServiceUpsertTopic,14] -> > > > > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1), [ > > > > EventServiceUpsertTopic,19] -> > > > > 
(Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1), > > > > [EventServiceUpsertTopic,0] -> > > > > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1), > > > > [EventServiceUpsertTopic,15] -> > > > > (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1)) > > > > > > > > [2014-07-25 10:40:02,136] [DEBUG] [kafka-request-handler-1] > > > > [kafka.cluster.Partition] Partition [EventServiceUpsertTopic,19] on > > > broker > > > > 0: Old hw for partition [EventServiceUpsertTopic,19] is 8082. New hw > is > > > > 8082. All leo's are 8112,8082 > > > > > > > > [2014-07-25 10:40:02,136] [DEBUG] [kafka-request-handler-1] > > > > [kafka.server.KafkaApis] [KafkaApi-0] Produce to local log in 0 ms > > > > > > > > [2014-07-25 10:40:02,137] [DEBUG] [kafka-processor-49917-2] > > > > [kafka.request.logger] Completed request:Name: ProducerRequest; > > Version: > > > > 0; CorrelationId: 16250; ClientId: ; RequiredAcks: -1; AckTimeoutMs: > > > 10000 > > > > ms from client /127.0.0.1:50168 > > > > > > > > > > > > > > ;totalTime:1,requestQueueTime:0,localTime:1,remoteTime:0,responseQueueTime:0,sendTime:0 > > > > > > > > [2014-07-25 10:40:02,140] [INFO ] > > > > > > [ZkClient-EventThread-13-localhost:49893,localhost:49896,localhost:49899] > > > > [kafka.controller.KafkaController] [Controller 0]: Partition [ > > > > EventServiceUpsertTopic,19] completed preferred replica leader > > election. > > > > New leader is 1 > > > > > > > > [2014-07-25 10:40:02,140] [INFO ] > > > > > > [ZkClient-EventThread-13-localhost:49893,localhost:49896,localhost:49899] > > > > [kafka.controller.KafkaController] [Controller 0]: Partition > > > > [EventServiceUpsertTopic,5] completed preferred replica leader > > election. 
> > > > New leader is 1 > > > > > > > > [2014-07-25 10:40:02,142] [DEBUG] > [main-SendThread(localhost:49893)] > > > > [org.apache.zookeeper.ClientCnxn] Got notification > > > > sessionid:0x476e9a9e9a0001 > > > > > > > > [2014-07-25 10:40:02,142] [DEBUG] > [main-SendThread(localhost:49893)] > > > > [org.apache.zookeeper.ClientCnxn] Got WatchedEvent > state:SyncConnected > > > > type:NodeDeleted path:/admin/preferred_replica_election for sessionid > > > > 0x476e9a9e9a0001 > > > > > > > > [2014-07-25 10:40:02,143] [DEBUG] > [main-SendThread(localhost:49893)] > > > > [org.apache.zookeeper.ClientCnxn] Reading reply > > > > sessionid:0x476e9a9e9a0001, packet:: clientPath:null serverPath:null > > > > finished:false header:: 730,2 replyHeader:: 730,4294968218,0 > > request:: > > > > '/admin/preferred_replica_election,-1 response:: null > > > > > > > > [2014-07-25 10:40:02,145] [INFO ] [kafka-request-handler-0] > > > > [kafka.server.ReplicaFetcherManager] [ReplicaFetcherManager on > broker > > 0] > > > > Removed fetcher for partitions > > > > > > > > > > > > > > [EventServiceUpsertTopic,13],[EventServiceUpsertTopic,11],[EventServiceUpsertTopic,17],[EventServiceUpsertTopic,7],[EventServiceUpsertTopic,9],[EventServiceUpsertTopic,1],[EventServiceUpsertTopic,15],[ > > > > EventServiceUpsertTopic,19 > > > > ],[EventServiceUpsertTopic,3],[EventServiceUpsertTopic,5] > > > > > > > > [2014-07-25 10:40:02,153] [INFO ] [kafka-request-handler-0] > > > > [kafka.log.Log] Truncating log EventServiceUpsertTopic-19 to offset > > > 8082. > > > > > > > > [2014-07-25 10:40:02,159] [INFO ] [kafka-request-handler-0] > > > > [kafka.log.Log] Truncating log EventServiceUpsertTopic-5 to offset > 0. > > > > > > > > > > > > I have removed some irrelevant lines. As you can see, the log for > > > > EventServiceUpsertTopic-19 is being truncated, losing messages that > > have > > > > not yet been replicated to broker 1. I'm not sure exactly what the > > issue > > > > is. 
Maybe new requests should be completely blocked after leader election
> > > > until the new leader catches up to the messages that have been
> > > > acked but that it has not yet received.
> > > >
> > > > Thanks,
> > > > Jad.
> > > >
> > > > On Fri, Jul 25, 2014 at 2:14 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > >
> > > > > Hello Jad,
> > > > >
> > > > > I double-checked the source code again, and found that the
> > > > > preferred leader elector does actually consider whether the
> > > > > selected replica is in the ISR or not. This means that by the time
> > > > > the election is triggered, broker 1 has been added to the ISR by
> > > > > broker 0. Could you check whether, before step 3, there are any
> > > > > log entries on broker 0 adding broker 1 to the ISR or updating the
> > > > > HW to 975?
> > > > >
> > > > > On Thu, Jul 24, 2014 at 4:21 PM, Jad Naous <jad.na...@appdynamics.com> wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > I have a test that continuously sends messages to one broker,
> > > > > > brings up another broker, and adds it as a replica for all
> > > > > > partitions, with it being the preferred replica for some. I have
> > > > > > auto.leader.rebalance.enable=true, so replica election gets
> > > > > > triggered. Data is being pumped to the old broker all the while.
> > > > > > It seems that some data gets lost while switching over to the
> > > > > > new leader. Is this a bug, or do I have something misconfigured?
> > > > > > I also have request.required.acks=-1 on the producer.
> > > > > >
> > > > > > Here's what I think is happening:
> > > > > >
> > > > > > 1. Producer writes message to broker 0,
> > > > > > [EventServiceUpsertTopic,13], w/ broker 0 currently leader, with
> > > > > > ISR=(0), so the write returns successfully, even when acks = -1.
Correlation id 35836 > > > > > > > > > > > > Producer log: > > > > > > > > > > > > [2014-07-24 14:44:26,991] [DEBUG] [dw-97 - PATCH > > > > > > > > > > > > > > > > > > > > > > > > > > > /v1/events/type_for_test_bringupNewBroker_shouldRebalance_shouldNotLoseData/event?_idPath=idField&_mergeFields=field1] > > > > > > [kafka.producer.BrokerPartitionInfo] Partition > > > > > > [EventServiceUpsertTopic,13] has leader 0 > > > > > > > > > > > > [2014-07-24 14:44:26,993] [DEBUG] [dw-97 - PATCH > > > > > > > > > > > > > > > > > > > > > > > > > > > /v1/events/type_for_test_bringupNewBroker_shouldRebalance_shouldNotLoseData/event?_idPath=idField&_mergeFields=field1] > > > > > > [k.producer.async.DefaultEventHandler] Producer sent messages > with > > > > > > correlation id 35836 for topics [EventServiceUpsertTopic,13] to > > > broker > > > > 0 > > > > > on > > > > > > localhost:56821 > > > > > > 2. Broker 1 is still catching up > > > > > > > > > > > > Broker 0 Log: > > > > > > > > > > > > [2014-07-24 14:44:26,992] [DEBUG] [kafka-request-handler-3] > > > > > > [kafka.cluster.Partition] Partition [EventServiceUpsertTopic,13] > > on > > > > > broker > > > > > > 0: Old hw for partition [EventServiceUpsertTopic,13] is 971. New > hw > > > is > > > > > 971. > > > > > > All leo's are 975,971 > > > > > > > > > > > > [2014-07-24 14:44:26,992] [DEBUG] [kafka-request-handler-3] > > > > > > [kafka.server.KafkaApis] [KafkaApi-0] Produce to local log in 0 > ms > > > > > > > > > > > > [2014-07-24 14:44:26,992] [DEBUG] [kafka-processor-56821-0] > > > > > > [kafka.request.logger] Completed request:Name: ProducerRequest; > > > > Version: > > > > > > 0; CorrelationId: 35836; ClientId: ; RequiredAcks: -1; > > AckTimeoutMs: > > > > > 10000 > > > > > > ms from client /127.0.0.1:57086 > > > > > > > > > > > > > > > > > > > > > > > > > > > ;totalTime:0,requestQueueTime:0,localTime:0,remoteTime:0,responseQueueTime:0,sendTime:0 > > > > > > 3. 
Leader election is triggered by the scheduler: > > > > > > > > > > > > Broker 0 Log: > > > > > > > > > > > > [2014-07-24 14:44:26,991] [INFO ] [kafka-scheduler-0] > > > > > > [k.c.PreferredReplicaPartitionLeaderSelector] > > > > > > [PreferredReplicaPartitionLeaderSelector]: Current leader 0 for > > > > > partition [ > > > > > > EventServiceUpsertTopic,13] is not the preferred replica. > > Trigerring > > > > > > preferred replica leader election > > > > > > > > > > > > [2014-07-24 14:44:26,993] [DEBUG] [kafka-scheduler-0] > > > > > > [kafka.utils.ZkUtils$] Conditional update of path > > > > > > /brokers/topics/EventServiceUpsertTopic/partitions/13/state with > > > value > > > > > > > > > > > > > > > > > > > > > {"controller_epoch":1,"leader":1,"version":1,"leader_epoch":3,"isr":[0,1]} > > > > > > and expected version 3 succeeded, returning the new version: 4 > > > > > > > > > > > > [2014-07-24 14:44:26,994] [DEBUG] [kafka-scheduler-0] > > > > > > [k.controller.PartitionStateMachine] [Partition state machine on > > > > > > Controller 0]: After leader election, leader cache is updated to > > > > > > > > > > > > > > > > > > > > > Map(<Snipped>(Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),<EndSnip>) > > > > > > > > > > > > [2014-07-24 14:44:26,994] [INFO ] [kafka-scheduler-0] > > > > > > [kafka.controller.KafkaController] [Controller 0]: Partition [ > > > > > > EventServiceUpsertTopic,13] completed preferred replica leader > > > > election. > > > > > > New leader is 1 > > > > > > 4. Broker 1 is still behind, but it sets the high water mark to > > > 971!!! 
> > > > > > > > > > > > Broker 1 Log: > > > > > > > > > > > > [2014-07-24 14:44:26,999] [INFO ] [kafka-request-handler-6] > > > > > > [kafka.server.ReplicaFetcherManager] [ReplicaFetcherManager on > > > broker > > > > 1] > > > > > > Removed fetcher for partitions [EventServiceUpsertTopic,13] > > > > > > > > > > > > [2014-07-24 14:44:27,000] [DEBUG] [kafka-request-handler-6] > > > > > > [kafka.cluster.Partition] Partition [EventServiceUpsertTopic,13] > > on > > > > > broker > > > > > > 1: Old hw for partition [EventServiceUpsertTopic,13] is 970. New > hw > > > is > > > > > -1. > > > > > > All leo's are -1,971 > > > > > > > > > > > > [2014-07-24 14:44:27,098] [DEBUG] [kafka-request-handler-3] > > > > > > [kafka.server.KafkaApis] [KafkaApi-1] Maybe update partition HW > > due > > > to > > > > > > fetch request: Name: FetchRequest; Version: 0; CorrelationId: 1; > > > > > ClientId: > > > > > > ReplicaFetcherThread-0-1; ReplicaId: 0; MaxWait: 500 ms; > MinBytes: > > 1 > > > > > bytes; > > > > > > RequestInfo: [EventServiceUpsertTopic,13] -> > > > > > > PartitionFetchInfo(971,1048576), <Snipped> > > > > > > > > > > > > [2014-07-24 14:44:27,098] [DEBUG] [kafka-request-handler-3] > > > > > > [kafka.cluster.Partition] Partition [EventServiceUpsertTopic,13] > > on > > > > > broker > > > > > > 1: Recording follower 0 position 971 for partition [ > > > > > > EventServiceUpsertTopic,13]. > > > > > > > > > > > > [2014-07-24 14:44:27,100] [DEBUG] [kafka-request-handler-3] > > > > > > [kafka.cluster.Partition] Partition [EventServiceUpsertTopic,13] > > on > > > > > broker > > > > > > 1: Highwatermark for partition [EventServiceUpsertTopic,13] > updated > > > to > > > > > 971 > > > > > > 5. Consumer is none the wiser. All data that was in offsets > 972-975 > > > > > doesn't > > > > > > show up! 
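For what it's worth, the catch-up guard proposed earlier in the thread (hold the leadership switch until the new leader's HW reaches the old leader's LEO) lines up with the numbers in these logs. A minimal sketch, with hypothetical names that are not part of any Kafka API:

```python
# Sketch of the proposed catch-up guard (hypothetical names, not Kafka's
# API): only allow the leadership switch once the new leader's HW has
# reached the old leader's LEO, so that every acked offset survives.

def safe_to_switch(old_leader_leo, new_leader_hw):
    return new_leader_hw >= old_leader_leo

# Numbers from the logs above: broker 0 (old leader) had acked writes up to
# LEO 975, while broker 1 (new leader) had only reached HW 971 at election.
old_leader_leo, new_leader_hw = 975, 971

lost_if_switched = []
if not safe_to_switch(old_leader_leo, new_leader_hw):
    # Switching now would truncate everything above the new leader's HW.
    lost_if_switched = list(range(new_leader_hw + 1, old_leader_leo + 1))

print(lost_if_switched)   # the offsets 972-975 reported missing above
```

The guard would have refused (or delayed) this particular switch until broker 1's HW reached 975, at which point nothing acked could be truncated.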
> > > > > > I tried this with 2 initial replicas, adding a 3rd that is
> > > > > > supposed to be the leader for some new partitions, and this
> > > > > > problem also happens there. The log on the old leader gets
> > > > > > truncated to the offset on the new leader. What's the solution?
> > > > > > Can I make a new broker the leader for partitions that are
> > > > > > currently active without losing data?
> > > > > >
> > > > > > Thanks,
> > > > > > Jad.
> > > > > >
> > > > > > --
> > > > > > *Jad Naous* | Engineering | AppDynamics
> > > > > > <http://www.appdynamics.com>
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > >
> > > > --
> > > > *Jad Naous* | Engineering | AppDynamics
> > > > <http://www.appdynamics.com>
> > >
> > > --
> > > -- Guozhang
> >
> > --
> > *Jad Naous* | Engineering | AppDynamics
> > <http://www.appdynamics.com>
>
> --
> -- Guozhang

--
*Jad Naous* | Engineering | AppDynamics
<http://www.appdynamics.com>