The HW (high watermark) marks the offset up to which messages have been
committed to all replicas. Only the leader updates it: when it receives
fetch requests from the follower replicas, it moves the HW to the minimum
of the starting offsets of those fetch requests. With ack=-1 on the
producer (request.required.acks=-1), the leader will only return the
response once it knows the HW has been updated to be larger than the
produce end offset.
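
To make that concrete, here is a rough sketch in Python. It is not Kafka's
actual code: the class and method names are made up for illustration, the HW
is assumed to be an exclusive offset (the next offset not yet committed), and
the "produce end offset" is assumed to be the offset of the last message in
the produce request.

```python
class LeaderPartition:
    """Toy model of a partition leader (hypothetical names, not Kafka's API)."""

    def __init__(self, follower_ids):
        self.log = []   # the leader's local log
        self.hw = 0     # high watermark, treated as an exclusive offset
        # Starting offset of the latest fetch request seen from each follower.
        self.fetch_offsets = {f: 0 for f in follower_ids}

    def append(self, messages):
        """Append to the local log; return the offset of the last message."""
        self.log.extend(messages)
        return len(self.log) - 1

    def on_fetch(self, follower_id, start_offset):
        """Record a follower fetch, then move HW to the minimum start offset."""
        self.fetch_offsets[follower_id] = start_offset
        candidate = min([len(self.log)] + list(self.fetch_offsets.values()))
        self.hw = max(self.hw, candidate)  # never move the HW backwards

    def can_ack(self, produce_end_offset):
        """acks=-1: respond only once the HW is larger than the end offset."""
        return self.hw > produce_end_offset


leader = LeaderPartition(follower_ids=[1, 2])
end = leader.append(["m0", "m1", "m2"])
leader.on_fetch(1, 3)          # follower 1 has fetched everything
blocked = leader.can_ack(end)  # follower 2 has not, so the ack is held back
leader.on_fetch(2, 3)          # follower 2 catches up, HW moves to 3
released = leader.can_ack(end)
```

In this model the produce response is held until the slowest follower has
sent a fetch request starting past the last produced offset.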

Guozhang


On Fri, Jul 25, 2014 at 9:36 AM, Jad Naous <jad.na...@appdynamics.com>
wrote:

> Hi Guozhang,
>
> I apologize for my misunderstanding; I would really like to understand this
> thoroughly. When/how is the HW set, and how does that interact with acks
> being sent to the producer? Is it that the HW marks the offset of the
> messages for which acks have been sent, and so a replica only becomes
> in-sync if it has caught up with all the messages that have been acked?
>
> Thanks,
> Jad.
>
>
>
> On Fri, Jul 25, 2014 at 8:19 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Jad,
> >
> > A follower replica can join the ISR only when it has caught up to the HW,
> > which in this case would be the end of the leader replica. So in that
> > scenario there should still be no data loss.
> >
> >
> > On Thu, Jul 24, 2014 at 7:48 PM, Jad Naous <jad.na...@appdynamics.com>
> > wrote:
> >
> > > Actually, is the following scenario possible?
> > > - We start off with only 1 replica (the leader)
> > > - the producer continuously sends messages
> > > - a new replica (the preferred one) comes online
> > > - it becomes an ISR just after an ack is sent to the producer
> > > > - the new replica gets elected as the new leader, but it's not fully
> > > > caught up to the old leader and then we lose the last message...
> > >
> > >
> > >
> > > On Thu, Jul 24, 2014 at 6:29 PM, Jad Naous <jad.na...@appdynamics.com>
> > > wrote:
> > >
> > > > > Ah yes. OK, thanks! So it seems like we should only manually trigger
> > > > > re-election after seeing that all replicas are in the ISR. Is there a
> > > > > bug to follow this up?
> > > >
> > > > Thanks,
> > > > Jad.
> > > >
> > > >
> > > > > On Thu, Jul 24, 2014 at 6:27 PM, Guozhang Wang <wangg...@gmail.com>
> > > > > wrote:
> > > > >
> > > > >> With ack=-1, the leader responds only after all messages produced to
> > > > >> it have been acked by all replicas. So that will not cause data loss.
> > > >>
> > > >>
> > > > >> On Thu, Jul 24, 2014 at 6:07 PM, Jad Naous <jad.na...@appdynamics.com>
> > > > >> wrote:
> > > >>
> > > >> > Hi Guozhang,
> > > >> >
> > > > >> > Isn't it also possible to lose messages even if the preferred
> > > > >> > leader is in the ISR, when the current leader is ahead by a few
> > > > >> > messages, but the preferred leader still has not caught up?
> > > >> >
> > > >> > Thanks,
> > > >> > Jad.
> > > >> >
> > > >> >
> > > >> >
> > > > >> > On Thu, Jul 24, 2014 at 4:59 PM, Guozhang Wang <wangg...@gmail.com>
> > > > >> > wrote:
> > > >> >
> > > >> > > Hi Jad,
> > > >> > >
> > > > >> > > Thanks for bringing this up. It seems to be a valid issue: in the
> > > > >> > > current auto leader rebalancer thread's logic, if the imbalance
> > > > >> > > ratio threshold is violated, it will trigger the preferred leader
> > > > >> > > election whether or not the preferred leader is in the ISR.
> > > >> > >
> > > >> > > Guozhang
> > > >> > >
> > > >> > >
> > > > >> > > On Thu, Jul 24, 2014 at 4:21 PM, Jad Naous <jad.na...@appdynamics.com>
> > > > >> > > wrote:
> > > >> > >
> > > >> > > > Hi,
> > > >> > > >
> > > > >> > > > I have a test that continuously sends messages to one broker,
> > > > >> > > > brings up another broker, and adds it as a replica for all
> > > > >> > > > partitions, with it being the preferred replica for some. I have
> > > > >> > > > auto.leader.rebalance.enable=true, so replica election gets
> > > > >> > > > triggered. Data is being pumped to the old broker all the while.
> > > > >> > > > It seems that some data gets lost while switching over to the new
> > > > >> > > > leader. Is this a bug, or do I have something misconfigured? I
> > > > >> > > > also have request.required.acks=-1 on the producer.
> > > >> > > >
> > > >> > > > Here's what I think is happening:
> > > >> > > >
> > > > >> > > > 1. Producer writes message to broker 0,
> > > > >> > > > [EventServiceUpsertTopic,13], w/ broker 0 currently leader, with
> > > > >> > > > ISR=(0), so write returns successfully, even when acks = -1.
> > > > >> > > > Correlation id 35836
> > > >> > > >
> > > >> > > > Producer log:
> > > >> > > >
> > > >> > > > [2014-07-24 14:44:26,991]  [DEBUG]  [dw-97 - PATCH
> > > >> > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> /v1/events/type_for_test_bringupNewBroker_shouldRebalance_shouldNotLoseData/event?_idPath=idField&_mergeFields=field1]
> > > >> > > > [kafka.producer.BrokerPartitionInfo]  Partition
> > > >> > > > [EventServiceUpsertTopic,13] has leader 0
> > > >> > > >
> > > >> > > > [2014-07-24 14:44:26,993]  [DEBUG]  [dw-97 - PATCH
> > > >> > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> /v1/events/type_for_test_bringupNewBroker_shouldRebalance_shouldNotLoseData/event?_idPath=idField&_mergeFields=field1]
> > > >> > > > [k.producer.async.DefaultEventHandler]  Producer sent messages
> > > with
> > > >> > > > correlation id 35836 for topics [EventServiceUpsertTopic,13]
> to
> > > >> broker
> > > >> > 0
> > > >> > > on
> > > >> > > > localhost:56821
> > > >> > > > 2. Broker 1 is still catching up
> > > >> > > >
> > > >> > > > Broker 0 Log:
> > > >> > > >
> > > >> > > > [2014-07-24 14:44:26,992]  [DEBUG]  [kafka-request-handler-3]
> > > >> > > > [kafka.cluster.Partition]  Partition
> > [EventServiceUpsertTopic,13]
> > > on
> > > >> > > broker
> > > >> > > > 0: Old hw for partition [EventServiceUpsertTopic,13] is 971.
> New
> > > hw
> > > >> is
> > > >> > > 971.
> > > >> > > > All leo's are 975,971
> > > >> > > >
> > > >> > > > [2014-07-24 14:44:26,992]  [DEBUG]  [kafka-request-handler-3]
> > > >> > > > [kafka.server.KafkaApis]  [KafkaApi-0] Produce to local log
> in 0
> > > ms
> > > >> > > >
> > > >> > > > [2014-07-24 14:44:26,992]  [DEBUG]  [kafka-processor-56821-0]
> > > >> > > > [kafka.request.logger]  Completed request:Name:
> ProducerRequest;
> > > >> > Version:
> > > >> > > > 0; CorrelationId: 35836; ClientId: ; RequiredAcks: -1;
> > > AckTimeoutMs:
> > > >> > > 10000
> > > >> > > > ms from client /127.0.0.1:57086
> > > >> > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> ;totalTime:0,requestQueueTime:0,localTime:0,remoteTime:0,responseQueueTime:0,sendTime:0
> > > >> > > > 3. Leader election is triggered by the scheduler:
> > > >> > > >
> > > >> > > > Broker 0 Log:
> > > >> > > >
> > > >> > > > [2014-07-24 14:44:26,991]  [INFO ]  [kafka-scheduler-0]
> > > >> > > > [k.c.PreferredReplicaPartitionLeaderSelector]
> > > >> > > > [PreferredReplicaPartitionLeaderSelector]: Current leader 0
> for
> > > >> > > partition [
> > > >> > > > EventServiceUpsertTopic,13] is not the preferred replica.
> > > Trigerring
> > > >> > > > preferred replica leader election
> > > >> > > >
> > > >> > > > [2014-07-24 14:44:26,993]  [DEBUG]  [kafka-scheduler-0]
> > > >> > > > [kafka.utils.ZkUtils$]  Conditional update of path
> > > >> > > > /brokers/topics/EventServiceUpsertTopic/partitions/13/state
> with
> > > >> value
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> {"controller_epoch":1,"leader":1,"version":1,"leader_epoch":3,"isr":[0,1]}
> > > >> > > > and expected version 3 succeeded, returning the new version: 4
> > > >> > > >
> > > >> > > > [2014-07-24 14:44:26,994]  [DEBUG]  [kafka-scheduler-0]
> > > >> > > > [k.controller.PartitionStateMachine]  [Partition state machine
> > on
> > > >> > > > Controller 0]: After leader election, leader cache is updated
> to
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> Map(<Snipped>(Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),<EndSnip>)
> > > >> > > >
> > > >> > > > [2014-07-24 14:44:26,994]  [INFO ]  [kafka-scheduler-0]
> > > >> > > > [kafka.controller.KafkaController]  [Controller 0]: Partition
> [
> > > >> > > > EventServiceUpsertTopic,13] completed preferred replica leader
> > > >> > election.
> > > >> > > > New leader is 1
> > > >> > > > 4. Broker 1 is still behind, but it sets the high water mark
> to
> > > >> 971!!!
> > > >> > > >
> > > >> > > > Broker 1 Log:
> > > >> > > >
> > > >> > > > [2014-07-24 14:44:26,999]  [INFO ]  [kafka-request-handler-6]
> > > >> > > > [kafka.server.ReplicaFetcherManager]  [ReplicaFetcherManager
> on
> > > >> broker
> > > >> > 1]
> > > >> > > > Removed fetcher for partitions [EventServiceUpsertTopic,13]
> > > >> > > >
> > > >> > > > [2014-07-24 14:44:27,000]  [DEBUG]  [kafka-request-handler-6]
> > > >> > > > [kafka.cluster.Partition]  Partition
> > [EventServiceUpsertTopic,13]
> > > on
> > > >> > > broker
> > > >> > > > 1: Old hw for partition [EventServiceUpsertTopic,13] is 970.
> New
> > > hw
> > > >> is
> > > >> > > -1.
> > > >> > > > All leo's are -1,971
> > > >> > > >
> > > >> > > > [2014-07-24 14:44:27,098]  [DEBUG]  [kafka-request-handler-3]
> > > >> > > > [kafka.server.KafkaApis]  [KafkaApi-1] Maybe update partition
> HW
> > > >> due to
> > > >> > > > fetch request: Name: FetchRequest; Version: 0; CorrelationId:
> 1;
> > > >> > > ClientId:
> > > >> > > > ReplicaFetcherThread-0-1; ReplicaId: 0; MaxWait: 500 ms;
> > > MinBytes: 1
> > > >> > > bytes;
> > > >> > > > RequestInfo: [EventServiceUpsertTopic,13] ->
> > > >> > > > PartitionFetchInfo(971,1048576), <Snipped>
> > > >> > > >
> > > >> > > > [2014-07-24 14:44:27,098]  [DEBUG]  [kafka-request-handler-3]
> > > >> > > > [kafka.cluster.Partition]  Partition
> > [EventServiceUpsertTopic,13]
> > > on
> > > >> > > broker
> > > >> > > > 1: Recording follower 0 position 971 for partition [
> > > >> > > > EventServiceUpsertTopic,13].
> > > >> > > >
> > > >> > > > [2014-07-24 14:44:27,100]  [DEBUG]  [kafka-request-handler-3]
> > > >> > > > [kafka.cluster.Partition]  Partition
> > [EventServiceUpsertTopic,13]
> > > on
> > > >> > > broker
> > > >> > > > 1: Highwatermark for partition [EventServiceUpsertTopic,13]
> > > updated
> > > >> to
> > > >> > > 971
> > > >> > > > 5. Consumer is none the wiser. All data that was in offsets
> > > 972-975
> > > >> > > doesn't
> > > >> > > > show up!
> > > >> > > >
> > > > >> > > > I tried this with 2 initial replicas, and adding a 3rd which is
> > > > >> > > > supposed to be the leader for some new partitions, and this
> > > > >> > > > problem also happens there. The log on the old leader gets
> > > > >> > > > truncated to the offset on the new leader. What's the solution?
> > > > >> > > > Can I make a new broker leader for partitions that are currently
> > > > >> > > > active without losing data?
> > > >> > > >
> > > >> > > > Thanks,
> > > >> > > > Jad.
> > > >> > > >
> > > >> > > > --
> > > >> > > >  *Jad Naous* | Engineering | AppDynamics
> > > >> > > >  <http://www.appdynamics.com>
> > > >> > > >
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> > > --
> > > >> > > -- Guozhang
> > > >> > >
> > > >> >
> > > >> >
> > > >> >
> > > >> > --
> > > >> >  *Jad Naous* | Engineering | AppDynamics
> > > >> >  <http://www.appdynamics.com>
> > > >> >
> > > >>
> > > >>
> > > >>
> > > >> --
> > > >> -- Guozhang
> > > >>
> > > >
> > > >
> > > >
> > > > --
> > > >  *Jad Naous* | Engineering | AppDynamics
> > > >  <http://www.appdynamics.com>
> > > >
> > >
> > >
> > >
> > > --
> > >  *Jad Naous* | Engineering | AppDynamics
> > >  <http://www.appdynamics.com>
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
>
>
> --
>  *Jad Naous* | Engineering | AppDynamics
>  <http://www.appdynamics.com>
>



-- 
-- Guozhang
