Yes.

On Mon, Jul 28, 2014 at 5:11 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Just to confirm, are you running version 0.8.1.1?
>
> Guozhang
>
>
> On Mon, Jul 28, 2014 at 4:23 PM, Jad Naous <jad.na...@appdynamics.com>
> wrote:
>
> > Done! Thanks!
> >
> >
> > On Mon, Jul 28, 2014 at 4:16 PM, Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> > > Thanks Jad,
> > >
> > > The mailing list may have blocked the attachments. I have filed a JIRA
> > > for your issue; could you upload the logs there?
> > >
> > > https://issues.apache.org/jira/browse/KAFKA-1561
> > >
> > > Guozhang
> > >
> > >
> > > On Mon, Jul 28, 2014 at 1:55 PM, Jad Naous <jad.na...@appdynamics.com>
> > > wrote:
> > >
> > > > > The logs are quite large. I've sifted through them, and I'm attaching
> > > > > the logs for the relevant parts where the lost message goes through
> > > > > the system.
> > > > > Here's what the test does:
> > > > >
> > > > > 0) Start two brokers, one producer, one consumer. Topic has 20
> > > > > partitions, using default partitioning scheme (which seems to send
> > > > > data to only a couple of partitions when the keys are null, but that
> > > > > doesn't matter for this test).
> > > > 1) Start a data generator sending data through Kafka continuously
> > > > 2) Start a new broker
> > > > > 3) Reassign partitions: {"version": 1, "partitions":[
> > > > >     {"topic":"EventServiceUpsertTopic","partition":0,"replicas":[0, 1, 2]},
> > > > >     {"topic":"EventServiceUpsertTopic","partition":1,"replicas":[1, 2, 0]},
> > > > >     {"topic":"EventServiceUpsertTopic","partition":2,"replicas":[2, 0, 1]},
> > > > >     {"topic":"EventServiceUpsertTopic","partition":3,"replicas":[0, 1, 2]},
> > > > >     {"topic":"EventServiceUpsertTopic","partition":4,"replicas":[1, 2, 0]},
> > > > >     {"topic":"EventServiceUpsertTopic","partition":5,"replicas":[2, 0, 1]},
> > > > >     {"topic":"EventServiceUpsertTopic","partition":6,"replicas":[0, 1, 2]},
> > > > >     {"topic":"EventServiceUpsertTopic","partition":7,"replicas":[1, 2, 0]},
> > > > >     {"topic":"EventServiceUpsertTopic","partition":8,"replicas":[2, 0, 1]},
> > > > >     {"topic":"EventServiceUpsertTopic","partition":9,"replicas":[0, 1, 2]},
> > > > >     {"topic":"EventServiceUpsertTopic","partition":10,"replicas":[1, 2, 0]},
> > > > >     {"topic":"EventServiceUpsertTopic","partition":11,"replicas":[2, 0, 1]},
> > > > >     {"topic":"EventServiceUpsertTopic","partition":12,"replicas":[0, 1, 2]},
> > > > >     {"topic":"EventServiceUpsertTopic","partition":13,"replicas":[1, 2, 0]},
> > > > >     {"topic":"EventServiceUpsertTopic","partition":14,"replicas":[2, 0, 1]},
> > > > >     {"topic":"EventServiceUpsertTopic","partition":15,"replicas":[0, 1, 2]},
> > > > >     {"topic":"EventServiceUpsertTopic","partition":16,"replicas":[1, 2, 0]},
> > > > >     {"topic":"EventServiceUpsertTopic","partition":17,"replicas":[2, 0, 1]},
> > > > >     {"topic":"EventServiceUpsertTopic","partition":18,"replicas":[0, 1, 2]},
> > > > >     {"topic":"EventServiceUpsertTopic","partition":19,"replicas":[1, 2, 0]}]}
> > > > 4) Wait until reassignment is complete (i.e. until
> > > > ZkUtils.getPartitionsBeingReassigned() returns empty map)
> > > > > 5) Wait until all replicas are caught up (i.e. until
> > > > > ZkUtils.getLeaderAndIsrForPartition() returns all brokers in the ISR
> > > > > for each partition)
> > > > > 6) Trigger leader re-election by calling the
> > > > > PreferredReplicaLeaderElectionCommand
> > > > > 7) Wait until all the leaders are the preferred leaders for partitions
> > > > > according to the replica reassignment from step 3
> > > > 8) Stop the data generator
> > > > 9) Check that all the data was consumed
> > > >
> > > > > You can see from the producer.log that the data {"field1": ["10"],
> > > > > "idField": "id-5-59"} was sent to broker0 successfully, but the
> > > > > consumer never sees it.
> > > >
> > > > Thanks,
> > > > Jad.
> > > >
> > > >
> > > >
> > > > On Mon, Jul 28, 2014 at 10:33 AM, Guozhang Wang <wangg...@gmail.com>
> > > > wrote:
> > > >
> > > > >> Also, for the initial two-replica case, did you see any error/warning
> > > > >> logs on your producers?
> > > >>
> > > >> Guozhang
> > > >>
> > > >>
> > > >> On Mon, Jul 28, 2014 at 10:32 AM, Guozhang Wang <wangg...@gmail.com
> >
> > > >> wrote:
> > > >>
> > > >> > Hi Jad,
> > > >> >
> > > > >> > Just to clarify, you also see data loss when you create the topic
> > > > >> > with replication factor 2, with two replicas running, after an
> > > > >> > automatic leader election is triggered? If that is the case, could
> > > > >> > you attach the logs of all involved brokers here?
> > > > >> >
> > > > >> > For your second question, KAFKA-1211 is designed to handle that case.
> > > >> >
> > > >> > Guozhang
> > > >> >
> > > >> >
> > > >> > On Mon, Jul 28, 2014 at 10:18 AM, Jad Naous <
> > > jad.na...@appdynamics.com>
> > > >> > wrote:
> > > >> >
> > > >> >> Guozhang,
> > > >> >>
> > > > >> >> I have actually also seen this happen when there are two replicas
> > > > >> >> initially. So this problem is not limited to 1 replica. The issue is
> > > > >> >> the truncation after leader election, which will also happen on the
> > > > >> >> second replica.
> > > >> >>
> > > >> >> Coming back to your objections:
> > > >> >>
> > > >> >>
> > > >> >> First case: inconsistency between replicas
> > > >> >> > 1) currently replica 1 is the current leader
> > > >> >> > replica 1: m1 m2 m3
> > > >> >> > replica 2: m1 m2
> > > > >> >> > 2) replica 1 fails, and replica 2 becomes the new leader and accepts
> > > > >> >> > messages 4 and 5:
> > > > >> >> > replica 1: m1 m2 m3
> > > > >> >> > replica 2: m1 m2 m4 m5
> > > > >> >> > 3) if replica 1 resumes and does not truncate to HW, it will still
> > > > >> >> > maintain m3, which was actually never "committed". Say the leader
> > > > >> >> > moves to replica 1 again; we can end up with:
> > > > >> >> > replica 1: m1 m2 m3 m6
> > > > >> >> > replica 2: m1 m2 m4 m5
> > > >> >>
> > > >> >>
> > > > >> >> I see. So when a replica resumes, it has to truncate to the last HW
> > > > >> >> it saw before it died.
> > > >> >>
> > > >> >>
> > > >> >> >
> > > >> >> > Second case: inconsistency between server and clients:
> > > >> >> > 1) producer send message m3 with ack=-1:
> > > >> >> > replica 1: m1 m2 m3
> > > >> >> > replica 2: m1 m2 m3
> > > >> >> > replica 3: m1 m2
> > > > >> >> > 2) the response is held until all replicas also get m3; say at this
> > > > >> >> > time current leader replica 1 fails and replica 3 is elected. If
> > > > >> >> > replica 3 gets up to the largest LEO it will also get m3.
> > > > >> >> > replica 2: m1 m2 m3
> > > > >> >> > replica 3: m1 m2 m3
> > > > >> >> > 3) But m3 is not actually "committed" by the time replica 1 fails;
> > > > >> >> > when the producer gets the error at the time replica 1 fails, it
> > > > >> >> > will think that m3 was not successfully sent, so it retries m3:
> > > >> >> > replica 2: m1 m2 m3 m3
> > > >> >> > replica 3: m1 m2 m3 m3
> > > >> >>
> > > >> >>
> > > > >> >> So on failure, all nodes need to truncate to the HW. But if there's
> > > > >> >> no failure, then truncating would lose data unnecessarily. Maybe
> > > > >> >> those two scenarios need to be handled differently?
> > > >> >>
> > > >> >> Jad.
> > > >> >>
> > > >> >>
> > > >> >> On Mon, Jul 28, 2014 at 9:58 AM, Guozhang Wang <
> wangg...@gmail.com
> > >
> > > >> >> wrote:
> > > >> >>
> > > >> >> > Jun, Jad,
> > > >> >> >
> > > > >> >> > I think in this case data loss can still happen, since the
> > > > >> >> > replication factor was previously one, and in handling the produce
> > > > >> >> > requests, if the server decides that all the produced partitions
> > > > >> >> > have a replication factor of 1 it will also directly send back the
> > > > >> >> > response instead of putting the request into purgatory, even if
> > > > >> >> > currently the number of replicas is 2 (for details look at
> > > > >> >> > ReplicaManager.getReplicationFactorForPartition and search for the
> > > > >> >> > usages of Partition.replicationFactor).
> > > > >> >> >
> > > > >> >> > I now agree that this is not related to KAFKA-1211 but a different
> > > > >> >> > small bug. We probably need to file another JIRA for this. But I
> > > > >> >> > think after this one is fixed (which should be much easier than
> > > > >> >> > KAFKA-1211), Jad's scenario should not cause data loss anymore.
> > > >> >> >
> > > >> >> > Guozhang
> > > >> >> >
> > > >> >> >
> > > >> >> > On Sun, Jul 27, 2014 at 6:11 PM, Jad Naous <
> > > >> jad.na...@appdynamics.com>
> > > >> >> > wrote:
> > > >> >> >
> > > > >> >> > > So in summary, is it true to say that currently triggering leader
> > > > >> >> > > reelection is not a safe operation? I have been able to reproduce
> > > > >> >> > > that message loss pretty reliably in tests. If that is the case,
> > > > >> >> > > isn't that an important operation in a large cluster where nodes
> > > > >> >> > > go up and down?
> > > > >> >> > > On Jul 25, 2014 10:00 PM, "Jun Rao" <jun...@gmail.com> wrote:
> > > >> >> > >
> > > > >> >> > > > Actually, I don't think KAFKA-1211 will happen with just 2
> > > > >> >> > > > replicas. When a replica becomes a leader, it never truncates its
> > > > >> >> > > > log. Only when a replica becomes a follower does it truncate its
> > > > >> >> > > > log to HW. So in this particular case, the new leader will not
> > > > >> >> > > > truncate data to offset 8.
> > > >> >> > > >
> > > >> >> > > > Thanks,
> > > >> >> > > >
> > > >> >> > > > Jun
> > > >> >> > > >
> > > >> >> > > >
> > > >> >> > > > On Fri, Jul 25, 2014 at 3:37 PM, Guozhang Wang <
> > > >> wangg...@gmail.com>
> > > >> >> > > wrote:
> > > >> >> > > >
> > > >> >> > > > > Hi Jad,
> > > >> >> > > > >
> > > > >> >> > > > > Yes. In this case I think you are actually hitting KAFKA-1211.
> > > > >> >> > > > > The summary of the issue is that it takes one more fetch request
> > > > >> >> > > > > round trip for the follower replica to advance the HW after the
> > > > >> >> > > > > leader has advanced HW. So for your case, the whole process is
> > > > >> >> > > > > like this:
> > > > >> >> > > > >
> > > > >> >> > > > > 1. Leader LEO at 10, follower LEO at 8. Both leader and follower
> > > > >> >> > > > > know the HW is at 8.
> > > > >> >> > > > > 2. Follower fetches data from leader starting at 8; leader
> > > > >> >> > > > > records the follower's LEO as 8.
> > > > >> >> > > > > 3. Follower gets 9 and 10 and appends them to its local log.
> > > > >> >> > > > > 4. Follower fetches data from leader starting at 10; leader
> > > > >> >> > > > > records the follower's LEO as 10. Now the leader knows the
> > > > >> >> > > > > follower has caught up, so it advances its HW to 10 and adds the
> > > > >> >> > > > > follower to ISR (but the follower does not know that yet! It
> > > > >> >> > > > > still thinks the HW is 8).
> > > > >> >> > > > > 5. Leader's fetch response gets back to follower, and now the
> > > > >> >> > > > > follower knows that HW has been updated to 10.
> > > >> >> > > > >
> > > > >> >> > > > > And let's say there is a leader election between steps 4) and
> > > > >> >> > > > > 5). For your case it is due to preferred leader election, but it
> > > > >> >> > > > > could also be that the current leader fails, etc. Then on
> > > > >> >> > > > > becoming the new leader the follower will truncate its data to
> > > > >> >> > > > > 8, which is the HW it knows. Hence the data loss.
> > > > >> >> > > > >
> > > > >> >> > > > > The proposed solution in KAFKA-1211 will tackle this issue.
> > > >> >> > > > >
> > > >> >> > > > > Guozhang
> > > >> >> > > > >
> > > >> >> > > > >
> > > >> >> > > > > On Fri, Jul 25, 2014 at 2:48 PM, Jad Naous <
> > > >> >> > jad.na...@appdynamics.com>
> > > >> >> > > > > wrote:
> > > >> >> > > > >
> > > >> >> > > > > > Hi Guozhang,
> > > >> >> > > > > >
> > > > >> >> > > > > > Yes, broker 1 is in the ISR (in fact, I wait for broker 1 to be
> > > > >> >> > > > > > in the ISR before triggering election). However, I think there
> > > > >> >> > > > > > is something still amiss. I still see data loss. Here are some
> > > > >> >> > > > > > relevant log lines from broker 0 (different test run). The log
> > > > >> >> > > > > > on broker 0 is getting truncated, losing some messages.
> > > >> >> > > > > >
> > > >> >> > > > > > [2014-07-25 10:40:02,134]  [DEBUG]
> > >  [kafka-request-handler-5]
> > > >> >> > > > > > [kafka.cluster.Partition]  Partition
> > > >> >> [EventServiceUpsertTopic,19]
> > > >> >> > on
> > > >> >> > > > > broker
> > > >> >> > > > > > 0: Old hw for partition [EventServiceUpsertTopic,19] is
> > > 8082.
> > > >> >> New
> > > >> >> > hw
> > > >> >> > > is
> > > >> >> > > > > > 8082. All leo's are 8111,8082
> > > >> >> > > > > >
> > > >> >> > > > > > [2014-07-25 10:40:02,134]  [DEBUG]
> > >  [kafka-request-handler-5]
> > > >> >> > > > > > [kafka.server.KafkaApis]  [KafkaApi-0] Produce to local
> > log
> > > >> in
> > > >> >> 1 ms
> > > >> >> > > > > >
> > > >> >> > > > > > [2014-07-25 10:40:02,134]  [DEBUG]
> > > >> >> > >  [main-SendThread(localhost:49893)]
> > > >> >> > > > > > [org.apache.zookeeper.ClientCnxn]  Reading reply
> > > >> >> > > > > > sessionid:0x476e9a9e9a0001, packet:: clientPath:null
> > > >> >> > serverPath:null
> > > >> >> > > > > > finished:false header:: 729,5  replyHeader::
> > > 729,4294968217,0
> > > >> >> > > >  request::
> > > >> >> > > > > >
> > > >> >> > > > > >
> > > >> >> > > > >
> > > >> >> > > >
> > > >> >> > >
> > > >> >> >
> > > >> >>
> > > >>
> > >
> >
> '/brokers/topics/EventServiceUpsertTopic/partitions/5/state,#7b22636f6e74726f6c6c65725f65706f6368223a312c226c6561646572223a312c2276657273696f6e223a312c226c65616465725f65706f6368223a332c22697372223a5b302c315d7d,3
> > > >> >> > > > > > response::
> > > >> >> > > > > >
> > > >> >> > > > > >
> > > >> >> > > > >
> > > >> >> > > >
> > > >> >> > >
> > > >> >> >
> > > >> >>
> > > >>
> > >
> >
> s{4294967416,4294968217,1406309966419,1406310002132,4,0,0,0,74,0,4294967416}
> > > >> >> > > > > >
> > > >> >> > > > > > [2014-07-25 10:40:02,134]  [DEBUG]
> > >  [kafka-processor-49917-2]
> > > >> >> > > > > > [kafka.request.logger]  Completed request:Name:
> > > >> ProducerRequest;
> > > >> >> > > > Version:
> > > >> >> > > > > > 0; CorrelationId: 16248; ClientId: ; RequiredAcks: -1;
> > > >> >> > AckTimeoutMs:
> > > >> >> > > > > 10000
> > > >> >> > > > > > ms from client /127.0.0.1:50168
> > > >> >> > > > > >
> > > >> >> > > > > >
> > > >> >> > > > >
> > > >> >> > > >
> > > >> >> > >
> > > >> >> >
> > > >> >>
> > > >>
> > >
> >
> ;totalTime:1,requestQueueTime:0,localTime:1,remoteTime:0,responseQueueTime:0,sendTime:0
> > > >> >> > > > > >
> > > >> >> > > > > > [2014-07-25 10:40:02,134]  [DEBUG]
> > > >> >> > > > > >
> > > >> >> > > >
> > > >> >> >
> > > >> >>
> > > >>
> > >
> [ZkClient-EventThread-13-localhost:49893,localhost:49896,localhost:49899]
> > > >> >> > > > > > [kafka.utils.ZkUtils$]  Conditional update of path
> > > >> >> > > > > >
> > /brokers/topics/EventServiceUpsertTopic/partitions/5/state
> > > >> with
> > > >> >> > value
> > > >> >> > > > > >
> > > >> >> > > > >
> > > >> >> > > >
> > > >> >> > >
> > > >> >> >
> > > >> >>
> > > >>
> > >
> >
> {"controller_epoch":1,"leader":1,"version":1,"leader_epoch":3,"isr":[0,1]}
> > > >> >> > > > > > and expected version 3 succeeded, returning the new
> > > version:
> > > >> 4
> > > >> >> > > > > >
> > > >> >> > > > > > [2014-07-25 10:40:02,136]  [DEBUG]
> > > >> >> > > > > >
> > > >> >> > > >
> > > >> >> >
> > > >> >>
> > > >>
> > >
> [ZkClient-EventThread-13-localhost:49893,localhost:49896,localhost:49899]
> > > >> >> > > > > > [k.controller.PartitionStateMachine]  [Partition state
> > > >> machine
> > > >> >> on
> > > >> >> > > > > > Controller 0]: After leader election, leader cache is
> > > >> updated to
> > > >> >> > > > > > Map([EventServiceUpsertTopic,18] ->
> > > >> >> > > > > > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
> > > >> >> > > > > > [EventServiceUpsertTopic,4] ->
> > > >> >> > > > > > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
> > > >> >> > > > > > [EventServiceUpsertTopic,7] ->
> > > >> >> > > > > > (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
> > > >> >> > > > > > [EventServiceUpsertTopic,17] ->
> > > >> >> > > > > > (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
> > > >> >> > > > > > [EventServiceUpsertTopic,16] ->
> > > >> >> > > > > > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
> > > >> >> > > > > > [EventServiceUpsertTopic,5] ->
> > > >> >> > > > > > (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
> > > >> >> > > > > > [EventServiceUpsertTopic,10] ->
> > > >> >> > > > > > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
> > > >> >> > > > > > [EventServiceUpsertTopic,1] ->
> > > >> >> > > > > > (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
> > > >> >> > > > > > [EventServiceUpsertTopic,13] ->
> > > >> >> > > > > > (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
> > > >> >> > > > > > [EventServiceUpsertTopic,9] ->
> > > >> >> > > > > > (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
> > > >> >> > > > > > [EventServiceUpsertTopic,8] ->
> > > >> >> > > > > > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
> > > >> >> > > > > > [EventServiceUpsertTopic,11] ->
> > > >> >> > > > > > (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
> > > >> >> > > > > > [EventServiceUpsertTopic,6] ->
> > > >> >> > > > > > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
> > > >> >> > > > > > [EventServiceUpsertTopic,12] ->
> > > >> >> > > > > > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
> > > >> >> > > > > > [EventServiceUpsertTopic,2] ->
> > > >> >> > > > > > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
> > > >> >> > > > > > [EventServiceUpsertTopic,3] ->
> > > >> >> > > > > > (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
> > > >> >> > > > > > [EventServiceUpsertTopic,14] ->
> > > > >> >> > > > > > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
> > > > >> >> > > > > > [EventServiceUpsertTopic,19] ->
> > > > >> >> > > > > > (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
> > > >> >> > > > > > [EventServiceUpsertTopic,0] ->
> > > >> >> > > > > > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
> > > >> >> > > > > > [EventServiceUpsertTopic,15] ->
> > > >> >> > > > > > (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1))
> > > >> >> > > > > >
> > > >> >> > > > > > [2014-07-25 10:40:02,136]  [DEBUG]
> > >  [kafka-request-handler-1]
> > > >> >> > > > > > [kafka.cluster.Partition]  Partition
> > > >> >> [EventServiceUpsertTopic,19]
> > > >> >> > on
> > > >> >> > > > > broker
> > > >> >> > > > > > 0: Old hw for partition [EventServiceUpsertTopic,19] is
> > > 8082.
> > > >> >> New
> > > >> >> > hw
> > > >> >> > > is
> > > >> >> > > > > > 8082. All leo's are 8112,8082
> > > >> >> > > > > >
> > > >> >> > > > > > [2014-07-25 10:40:02,136]  [DEBUG]
> > >  [kafka-request-handler-1]
> > > >> >> > > > > > [kafka.server.KafkaApis]  [KafkaApi-0] Produce to local
> > log
> > > >> in
> > > >> >> 0 ms
> > > >> >> > > > > >
> > > >> >> > > > > > [2014-07-25 10:40:02,137]  [DEBUG]
> > >  [kafka-processor-49917-2]
> > > >> >> > > > > > [kafka.request.logger]  Completed request:Name:
> > > >> ProducerRequest;
> > > >> >> > > > Version:
> > > >> >> > > > > > 0; CorrelationId: 16250; ClientId: ; RequiredAcks: -1;
> > > >> >> > AckTimeoutMs:
> > > >> >> > > > > 10000
> > > >> >> > > > > > ms from client /127.0.0.1:50168
> > > >> >> > > > > >
> > > >> >> > > > > >
> > > >> >> > > > >
> > > >> >> > > >
> > > >> >> > >
> > > >> >> >
> > > >> >>
> > > >>
> > >
> >
> ;totalTime:1,requestQueueTime:0,localTime:1,remoteTime:0,responseQueueTime:0,sendTime:0
> > > >> >> > > > > >
> > > >> >> > > > > > [2014-07-25 10:40:02,140]  [INFO ]
> > > >> >> > > > > >
> > > >> >> > > >
> > > >> >> >
> > > >> >>
> > > >>
> > >
> [ZkClient-EventThread-13-localhost:49893,localhost:49896,localhost:49899]
> > > >> >> > > > > > [kafka.controller.KafkaController]  [Controller 0]:
> > > >> Partition [
> > > >> >> > > > > > EventServiceUpsertTopic,19] completed preferred replica
> > > >> leader
> > > >> >> > > > election.
> > > >> >> > > > > > New leader is 1
> > > >> >> > > > > >
> > > >> >> > > > > > [2014-07-25 10:40:02,140]  [INFO ]
> > > >> >> > > > > >
> > > >> >> > > >
> > > >> >> >
> > > >> >>
> > > >>
> > >
> [ZkClient-EventThread-13-localhost:49893,localhost:49896,localhost:49899]
> > > >> >> > > > > > [kafka.controller.KafkaController]  [Controller 0]:
> > > Partition
> > > >> >> > > > > > [EventServiceUpsertTopic,5] completed preferred replica
> > > >> leader
> > > >> >> > > > election.
> > > >> >> > > > > > New leader is 1
> > > >> >> > > > > >
> > > >> >> > > > > > [2014-07-25 10:40:02,142]  [DEBUG]
> > > >> >> > >  [main-SendThread(localhost:49893)]
> > > >> >> > > > > > [org.apache.zookeeper.ClientCnxn]  Got notification
> > > >> >> > > > > > sessionid:0x476e9a9e9a0001
> > > >> >> > > > > >
> > > >> >> > > > > > [2014-07-25 10:40:02,142]  [DEBUG]
> > > >> >> > >  [main-SendThread(localhost:49893)]
> > > >> >> > > > > > [org.apache.zookeeper.ClientCnxn]  Got WatchedEvent
> > > >> >> > > state:SyncConnected
> > > >> >> > > > > > type:NodeDeleted path:/admin/preferred_replica_election
> > for
> > > >> >> > sessionid
> > > >> >> > > > > > 0x476e9a9e9a0001
> > > >> >> > > > > >
> > > >> >> > > > > > [2014-07-25 10:40:02,143]  [DEBUG]
> > > >> >> > >  [main-SendThread(localhost:49893)]
> > > >> >> > > > > > [org.apache.zookeeper.ClientCnxn]  Reading reply
> > > >> >> > > > > > sessionid:0x476e9a9e9a0001, packet:: clientPath:null
> > > >> >> > serverPath:null
> > > >> >> > > > > > finished:false header:: 730,2  replyHeader::
> > > 730,4294968218,0
> > > >> >> > > >  request::
> > > >> >> > > > > > '/admin/preferred_replica_election,-1  response:: null
> > > >> >> > > > > >
> > > >> >> > > > > > [2014-07-25 10:40:02,145]  [INFO ]
> > >  [kafka-request-handler-0]
> > > >> >> > > > > > [kafka.server.ReplicaFetcherManager]
> > >  [ReplicaFetcherManager
> > > >> on
> > > >> >> > > broker
> > > >> >> > > > 0]
> > > >> >> > > > > > Removed fetcher for partitions
> > > >> >> > > > > >
> > > >> >> > > > > >
> > > >> >> > > > >
> > > >> >> > > >
> > > >> >> > >
> > > >> >> >
> > > >> >>
> > > >>
> > >
> >
> [EventServiceUpsertTopic,13],[EventServiceUpsertTopic,11],[EventServiceUpsertTopic,17],[EventServiceUpsertTopic,7],[EventServiceUpsertTopic,9],[EventServiceUpsertTopic,1],[EventServiceUpsertTopic,15],[
> > > >> >> > > > > > EventServiceUpsertTopic,19
> > > >> >> > > > > >
> ],[EventServiceUpsertTopic,3],[EventServiceUpsertTopic,5]
> > > >> >> > > > > >
> > > >> >> > > > > > [2014-07-25 10:40:02,153]  [INFO ]
> > >  [kafka-request-handler-0]
> > > >> >> > > > > > [kafka.log.Log]  Truncating log
> > EventServiceUpsertTopic-19
> > > to
> > > >> >> > offset
> > > >> >> > > > > 8082.
> > > >> >> > > > > >
> > > >> >> > > > > > [2014-07-25 10:40:02,159]  [INFO ]
> > >  [kafka-request-handler-0]
> > > >> >> > > > > > [kafka.log.Log]  Truncating log
> EventServiceUpsertTopic-5
> > > to
> > > >> >> offset
> > > >> >> > > 0.
> > > >> >> > > > > >
> > > >> >> > > > > >
> > > > >> >> > > > > > I have removed some irrelevant lines. As you can see, the log for
> > > > >> >> > > > > > EventServiceUpsertTopic-19 is being truncated, losing messages
> > > > >> >> > > > > > that have not yet been replicated to broker 1. I'm not sure
> > > > >> >> > > > > > exactly what the issue is. Maybe new requests should be
> > > > >> >> > > > > > completely blocked after leader election until the new leader
> > > > >> >> > > > > > catches up to the messages that have been acked but that it has
> > > > >> >> > > > > > not yet received.
> > > >> >> > > > > >
> > > >> >> > > > > > Thanks,
> > > >> >> > > > > >
> > > >> >> > > > > > Jad.
> > > >> >> > > > > >
> > > >> >> > > > > >
> > > >> >> > > > > > On Fri, Jul 25, 2014 at 2:14 PM, Guozhang Wang <
> > > >> >> wangg...@gmail.com
> > > >> >> > >
> > > >> >> > > > > wrote:
> > > >> >> > > > > >
> > > >> >> > > > > > > Hello Jad,
> > > >> >> > > > > > >
> > > > >> >> > > > > > > I double-checked the source code again, and found that the
> > > > >> >> > > > > > > preferred leader elector actually does consider whether the
> > > > >> >> > > > > > > selected replica is in ISR or not. This means that by the time
> > > > >> >> > > > > > > the election is triggered, broker 1 has been added to ISR by
> > > > >> >> > > > > > > broker 0. Could you check whether, before step 3, there are
> > > > >> >> > > > > > > any log entries on broker 0 adding broker 1 to ISR or updating
> > > > >> >> > > > > > > HW to 975?
> > > >> >> > > > > > >
> > > >> >> > > > > > >
> > > >> >> > > > > > > On Thu, Jul 24, 2014 at 4:21 PM, Jad Naous <
> > > >> >> > > > jad.na...@appdynamics.com>
> > > >> >> > > > > > > wrote:
> > > >> >> > > > > > >
> > > >> >> > > > > > > > Hi,
> > > >> >> > > > > > > >
> > > > >> >> > > > > > > > I have a test that continuously sends messages to one broker,
> > > > >> >> > > > > > > > brings up another broker, and adds it as a replica for all
> > > > >> >> > > > > > > > partitions, with it being the preferred replica for some. I
> > > > >> >> > > > > > > > have auto.leader.rebalance.enable=true, so replica election
> > > > >> >> > > > > > > > gets triggered. Data is being pumped to the old broker all the
> > > > >> >> > > > > > > > while. It seems that some data gets lost while switching over
> > > > >> >> > > > > > > > to the new leader. Is this a bug, or do I have something
> > > > >> >> > > > > > > > misconfigured? I also have request.required.acks=-1 on the
> > > > >> >> > > > > > > > producer.
> > > >> >> > > > > > > >
> > > >> >> > > > > > > > Here's what I think is happening:
> > > >> >> > > > > > > >
> > > > >> >> > > > > > > > 1. Producer writes message to broker 0,
> > > > >> >> > > > > > > > [EventServiceUpsertTopic,13], w/ broker 0 currently leader,
> > > > >> >> > > > > > > > with ISR=(0), so write returns successfully, even when
> > > > >> >> > > > > > > > acks = -1. Correlation id 35836
> > > >> >> > > > > > > >
> > > >> >> > > > > > > > Producer log:
> > > >> >> > > > > > > >
> > > >> >> > > > > > > > [2014-07-24 14:44:26,991]  [DEBUG]  [dw-97 - PATCH
> > > >> >> > > > > > > >
> > > >> >> > > > > > > >
> > > >> >> > > > > > >
> > > >> >> > > > > >
> > > >> >> > > > >
> > > >> >> > > >
> > > >> >> > >
> > > >> >> >
> > > >> >>
> > > >>
> > >
> >
> /v1/events/type_for_test_bringupNewBroker_shouldRebalance_shouldNotLoseData/event?_idPath=idField&_mergeFields=field1]
> > > >> >> > > > > > > > [kafka.producer.BrokerPartitionInfo]  Partition
> > > >> >> > > > > > > > [EventServiceUpsertTopic,13] has leader 0
> > > >> >> > > > > > > >
> > > >> >> > > > > > > > [2014-07-24 14:44:26,993]  [DEBUG]  [dw-97 - PATCH
> > > >> >> > > > > > > >
> > > >> >> > > > > > > >
> > > >> >> > > > > > >
> > > >> >> > > > > >
> > > >> >> > > > >
> > > >> >> > > >
> > > >> >> > >
> > > >> >> >
> > > >> >>
> > > >>
> > >
> >
> /v1/events/type_for_test_bringupNewBroker_shouldRebalance_shouldNotLoseData/event?_idPath=idField&_mergeFields=field1]
> > > >> >> > > > > > > > [k.producer.async.DefaultEventHandler]  Producer
> sent
> > > >> >> messages
> > > >> >> > > with
> > > >> >> > > > > > > > correlation id 35836 for topics
> > > >> >> [EventServiceUpsertTopic,13] to
> > > >> >> > > > > broker
> > > >> >> > > > > > 0
> > > >> >> > > > > > > on
> > > >> >> > > > > > > > localhost:56821
> > > >> >> > > > > > > > 2. Broker 1 is still catching up
> > > >> >> > > > > > > >
> > > >> >> > > > > > > > Broker 0 Log:
> > > >> >> > > > > > > >
> > > >> >> > > > > > > > [2014-07-24 14:44:26,992]  [DEBUG]
> > > >> >>  [kafka-request-handler-3]
> > > >> >> > > > > > > > [kafka.cluster.Partition]  Partition
> > > >> >> > [EventServiceUpsertTopic,13]
> > > >> >> > > > on
> > > >> >> > > > > > > broker
> > > >> >> > > > > > > > 0: Old hw for partition
> [EventServiceUpsertTopic,13]
> > is
> > > >> 971.
> > > >> >> > New
> > > >> >> > > hw
> > > >> >> > > > > is
> > > >> >> > > > > > > 971.
> > > >> >> > > > > > > > All leo's are 975,971
> > > >> >> > > > > > > >
> > > >> >> > > > > > > > [2014-07-24 14:44:26,992]  [DEBUG]
> > > >> >>  [kafka-request-handler-3]
> > > >> >> > > > > > > > [kafka.server.KafkaApis]  [KafkaApi-0] Produce to
> > local
> > > >> log
> > > >> >> in
> > > >> >> > 0
> > > >> >> > > ms
> > > >> >> > > > > > > >
> > > >> >> > > > > > > > [2014-07-24 14:44:26,992]  [DEBUG]
> > > >> >>  [kafka-processor-56821-0]
> > > >> >> > > > > > > > [kafka.request.logger]  Completed request:Name:
> > > >> >> > ProducerRequest;
> > > >> >> > > > > > Version:
> > > >> >> > > > > > > > 0; CorrelationId: 35836; ClientId: ; RequiredAcks:
> > -1;
> > > >> >> > > > AckTimeoutMs:
> > > >> >> > > > > > > 10000
> > > >> >> > > > > > > > ms from client /127.0.0.1:57086
> > > >> >> > > > > > > >
> > > >> >> > > > > > > >
> > > >> >> > > > > > >
> > > >> >> > > > > >
> > > >> >> > > > >
> > > >> >> > > >
> > > >> >> > >
> > > >> >> >
> > > >> >>
> > > >>
> > >
> >
> ;totalTime:0,requestQueueTime:0,localTime:0,remoteTime:0,responseQueueTime:0,sendTime:0
> > > >> >> > > > > > > > 3. Leader election is triggered by the scheduler:
> > > >> >> > > > > > > >
> > > >> >> > > > > > > > Broker 0 Log:
> > > >> >> > > > > > > >
> > > >> >> > > > > > > > [2014-07-24 14:44:26,991]  [INFO ]
> >  [kafka-scheduler-0]
> > > >> >> > > > > > > > [k.c.PreferredReplicaPartitionLeaderSelector]
> > > >> >> > > > > > > > [PreferredReplicaPartitionLeaderSelector]: Current
> > > >> leader 0
> > > >> >> for
> > > >> >> > > > > > > partition [
> > > >> >> > > > > > > > EventServiceUpsertTopic,13] is not the preferred
> > > replica.
> > > >> >> > > > Trigerring
> > > >> >> > > > > > > > preferred replica leader election
> > > >> >> > > > > > > >
> > > >> >> > > > > > > > [2014-07-24 14:44:26,993]  [DEBUG]  [kafka-scheduler-0]  [kafka.utils.ZkUtils$]  Conditional update of path /brokers/topics/EventServiceUpsertTopic/partitions/13/state with value {"controller_epoch":1,"leader":1,"version":1,"leader_epoch":3,"isr":[0,1]} and expected version 3 succeeded, returning the new version: 4
> > > >> >> > > > > > > >
> > > >> >> > > > > > > > [2014-07-24 14:44:26,994]  [DEBUG]  [kafka-scheduler-0]  [k.controller.PartitionStateMachine]  [Partition state machine on Controller 0]: After leader election, leader cache is updated to Map(<Snipped>(Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),<EndSnip>)
> > > >> >> > > > > > > >
> > > >> >> > > > > > > > [2014-07-24 14:44:26,994]  [INFO ]  [kafka-scheduler-0]  [kafka.controller.KafkaController]  [Controller 0]: Partition [EventServiceUpsertTopic,13] completed preferred replica leader election. New leader is 1
> > > >> >> > > > > > > >
> > > >> >> > > > > > > > 4. Broker 1 is still behind, but it sets the high water mark to 971!!!
> > > >> >> > > > > > > >
> > > >> >> > > > > > > > Broker 1 Log:
> > > >> >> > > > > > > >
> > > >> >> > > > > > > > [2014-07-24 14:44:26,999]  [INFO ]  [kafka-request-handler-6]  [kafka.server.ReplicaFetcherManager]  [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [EventServiceUpsertTopic,13]
> > > >> >> > > > > > > >
> > > >> >> > > > > > > > [2014-07-24 14:44:27,000]  [DEBUG]  [kafka-request-handler-6]  [kafka.cluster.Partition]  Partition [EventServiceUpsertTopic,13] on broker 1: Old hw for partition [EventServiceUpsertTopic,13] is 970. New hw is -1. All leo's are -1,971
> > > >> >> > > > > > > >
> > > >> >> > > > > > > > [2014-07-24 14:44:27,098]  [DEBUG]  [kafka-request-handler-3]  [kafka.server.KafkaApis]  [KafkaApi-1] Maybe update partition HW due to fetch request: Name: FetchRequest; Version: 0; CorrelationId: 1; ClientId: ReplicaFetcherThread-0-1; ReplicaId: 0; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [EventServiceUpsertTopic,13] -> PartitionFetchInfo(971,1048576), <Snipped>
> > > >> >> > > > > > > >
> > > >> >> > > > > > > > [2014-07-24 14:44:27,098]  [DEBUG]  [kafka-request-handler-3]  [kafka.cluster.Partition]  Partition [EventServiceUpsertTopic,13] on broker 1: Recording follower 0 position 971 for partition [EventServiceUpsertTopic,13].
> > > >> >> > > > > > > >
> > > >> >> > > > > > > > [2014-07-24 14:44:27,100]  [DEBUG]  [kafka-request-handler-3]  [kafka.cluster.Partition]  Partition [EventServiceUpsertTopic,13] on broker 1: Highwatermark for partition [EventServiceUpsertTopic,13] updated to 971
> > > >> >> > > > > > > >
> > > >> >> > > > > > > > 5. Consumer is none the wiser. All data that was in offsets 972-975 doesn't show up!
> > > >> >> > > > > > > >
> > > >> >> > > > > > > > I tried this with 2 initial replicas, and adding a 3rd which is
> > > >> >> > > > > > > > supposed to be the leader for some new partitions, and this
> > > >> >> > > > > > > > problem also happens there. The log on the old leader gets
> > > >> >> > > > > > > > truncated to the offset on the new leader. What's the solution?
> > > >> >> > > > > > > > Can I make a new broker leader for partitions that are currently
> > > >> >> > > > > > > > active without losing data?
> > > >> >> > > > > > > >
> > > >> >> > > > > > > > Thanks,
> > > >> >> > > > > > > > Jad.
> > > >> >> > > > > > > >
> > > >> >> > > > > > > > --
> > > >> >> > > > > > > >  *Jad Naous* | Engineering | AppDynamics
> > > >> >> > > > > > > >  <http://www.appdynamics.com>
> > > >> >> > > > > > > >
> > > >> >> > > > > > >
> > > >> >> > > > > > >
> > > >> >> > > > > > >
> > > >> >> > > > > > > --
> > > >> >> > > > > > > -- Guozhang
> > > >> >> > > > > > >
>
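
The high-watermark updates in step 4 of the quoted report can be reproduced with a small model. The sketch below is only an illustration in Python, not Kafka's code. The function name maybe_increment_hw is hypothetical, and it assumes the leader takes the minimum log end offset (LEO) it knows for the in-sync replicas and only ever moves the high watermark forward, which is what the "Old hw ... New hw ... All leo's" log line suggests.

```python
def maybe_increment_hw(old_hw, isr_leos):
    """Simplified model of a leader's high-watermark update (an assumption,
    not Kafka's implementation).

    old_hw   -- the high watermark currently recorded by the leader
    isr_leos -- log end offsets the leader knows for each in-sync replica;
                -1 means the leader has not yet heard from that replica
    """
    candidate = min(isr_leos)
    # The high watermark only moves forward; a smaller candidate is ignored.
    return candidate if candidate > old_hw else old_hw


# Values taken from the Broker 1 log in the quoted report.
hw = 970

# Right after the election: follower 0's position is unknown (-1),
# broker 1's own LEO is 971.
hw = maybe_increment_hw(hw, [-1, 971])
print(hw)  # 970

# Follower 0 then fetches from offset 971, so broker 1 records position 971.
hw = maybe_increment_hw(hw, [971, 971])
print(hw)  # 971
```

With these inputs the first call leaves the high watermark at 970 because follower 0's position is still unknown, and the second call moves it to 971 once follower 0 has fetched from offset 971. That matches the two kafka.cluster.Partition log lines in the report.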



-- 
 *Jad Naous* | Engineering | AppDynamics
 <http://www.appdynamics.com>
