The logs are quite large. I've sifted through them, and I'm attaching the
logs for the relevant parts where the lost message goes through the system.
Here's what the test does:

0) Start two brokers, one producer, one consumer. Topic has 20 partitions,
using default partitioning scheme (which seems to send data to only a
couple of partitions when the keys are null, but that doesn't matter for
this test).
1) Start a data generator sending data through Kafka continuously
2) Start a new broker
3) Reassign partitions: {"version": 1, "partitions":[
    {"topic":"EventServiceUpsertTopic","partition":0,        "replicas":
[0, 1, 2]},
    {"topic":"EventServiceUpsertTopic","partition":1,        "replicas":
[1, 2, 0]},
    {"topic":"EventServiceUpsertTopic","partition":2,        "replicas":
[2, 0, 1]},
    {"topic":"EventServiceUpsertTopic","partition":3,        "replicas":
[0, 1, 2]},
    {"topic":"EventServiceUpsertTopic","partition":4,        "replicas":
[1, 2, 0]},
    {"topic":"EventServiceUpsertTopic","partition":5,        "replicas":
[2, 0, 1]},
    {"topic":"EventServiceUpsertTopic","partition":6,        "replicas":
[0, 1, 2]},
    {"topic":"EventServiceUpsertTopic","partition":7,        "replicas":
[1, 2, 0]},
    {"topic":"EventServiceUpsertTopic","partition":8,        "replicas":
[2, 0, 1]},
    {"topic":"EventServiceUpsertTopic","partition":9,        "replicas":
[0, 1, 2]},
    {"topic":"EventServiceUpsertTopic","partition":10,        "replicas":
[1, 2, 0]},
    {"topic":"EventServiceUpsertTopic","partition":11,        "replicas":
[2, 0, 1]},
    {"topic":"EventServiceUpsertTopic","partition":12,        "replicas":
[0, 1, 2]},
    {"topic":"EventServiceUpsertTopic","partition":13,        "replicas":
[1, 2, 0]},
    {"topic":"EventServiceUpsertTopic","partition":14,        "replicas":
[2, 0, 1]},
    {"topic":"EventServiceUpsertTopic","partition":15,        "replicas":
[0, 1, 2]},
    {"topic":"EventServiceUpsertTopic","partition":16,        "replicas":
[1, 2, 0]},
    {"topic":"EventServiceUpsertTopic","partition":17,        "replicas":
[2, 0, 1]},
    {"topic":"EventServiceUpsertTopic","partition":18,        "replicas":
[0, 1, 2]},
    {"topic":"EventServiceUpsertTopic","partition":19,        "replicas":
[1, 2, 0]}]}
4) Wait until reassignment is complete (i.e. until
ZkUtils.getPartitionsBeingReassigned() returns empty map)
5) Wait until all replicas are caught up (i.e. until
ZkUtils.getLeaderAndIsrForPartition() returns all brokers in the ISR for
each partition)
6) Trigger leader re-election by calling the
PreferredReplicaLeaderElectionCommand
7) Wait until all the leaders are the preferred leaders for partitions
according to the replica reassignment from step 3
8) Stop the data generator
9) Check that all the data was consumed

You can seem from the producer.log that the data: {"field1": ["10"],
"idField": "id-5-59"} was sent to broker0 successfully, but the consumer
never sees it.

Thanks,
Jad.



On Mon, Jul 28, 2014 at 10:33 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Also, for the initial two replica case did you see any error/warning logs
> on your producers?
>
> Guozhang
>
>
> On Mon, Jul 28, 2014 at 10:32 AM, Guozhang Wang <wangg...@gmail.com>
> wrote:
>
> > Hi Jad,
> >
> > Just to clarify, you also see data loss when you created the topic with
> > replica factor 2, and two replicas running, and after an auto leader
> > election triggered? If that is the case could you attach the logs of all
> > involved brokers here?
> >
> > For your second question, KAFKA-1211 is designed to handle that case.
> >
> > Guozhang
> >
> >
> > On Mon, Jul 28, 2014 at 10:18 AM, Jad Naous <jad.na...@appdynamics.com>
> > wrote:
> >
> >> Guozhang,
> >>
> >> I have actually also seen this happen when there are two replicas
> >> initially. So this problem is not limited to 1 replica. The issue is the
> >> truncation after leader election, which will also happen on the second
> >> replica.
> >>
> >> Coming back to your objections:
> >>
> >>
> >> First case: inconsistency between replicas
> >> > 1) currently replica 1 is the current leader
> >> > replica 1: m1 m2 m3
> >> > replica 2: m1 m2
> >> > 2) replica 1 fails, and replica 2 becomes the new leader and accept
> >> > messages 4 and 5:
> >> > replica 1: m1 m2 m3
> >> > replica 2: m1 m2 m4 m5
> >> > 3) replica 1 resumes, and does not truncate to HW, then it will still
> >> > maintain m3, which is actually never "committed". Say leader moves to
> >> > replica 1 again, we can ended up with:
> >> > replica 1: m1 m2 m3 m6
> >> > replica 2: m1 m2 m4 m5
> >>
> >>
> >> I see. So when a replica resumes, it has to truncate to the last HW it
> saw
> >> before it died.
> >>
> >>
> >> >
> >> > Second case: inconsistency between server and clients:
> >> > 1) producer send message m3 with ack=-1:
> >> > replica 1: m1 m2 m3
> >> > replica 2: m1 m2 m3
> >> > replica 3: m1 m2
> >> > 2) the response is held until all replicas also gets m3, say at this
> >> time
> >> > current leader replica 1 fails and replica 3 re-elects. If replica 3
> >> gets
> >> > up to the largest LEO it will also get m3.
> >> > replica 2: m1 m2 m3
> >> > replica 3: m1 m2 m3
> >> > 3) But m3 is not actually "committed" by the time replica 1 fails;
> when
> >> > producer gets the error at the time replica 1 fails, it will think
> that
> >> m3
> >> > was not successfully sent, so retry sending m3:
> >> > replica 2: m1 m2 m3 m3
> >> > replica 3: m1 m2 m3 m3
> >>
> >>
> >> So on failure, all nodes need to truncate to the HW. But if there's no
> >> failure, then truncating would lose data unnecessarily. Maybe those two
> >> scenarios need to be handled differently?
> >>
> >> Jad.
> >>
> >>
> >> On Mon, Jul 28, 2014 at 9:58 AM, Guozhang Wang <wangg...@gmail.com>
> >> wrote:
> >>
> >> > Jun, Jad,
> >> >
> >> > I think in this case data loss can still happen, since the replica
> >> factor
> >> > was previously one, and in handling the produce requests, if the
> server
> >> > decides that all the produced partitions have a replica factor of 1 it
> >> will
> >> > also directly send back the response instead of putting the request
> into
> >> > purgatory even if currently the number of replicas is 2 (for details
> >> look
> >> > at ReplicaManager.getReplicationFactorForPartition and search of the
> >> usage
> >> > of Partition.replicationFactor).
> >> >
> >> > I now agree that this is not related to KAFKA-1211 but a different
> small
> >> > bug. We need to probably file another JIRA for this. But I think after
> >> this
> >> > one is fixed (which should be much easier than KAFKA-1211), Jad's
> >> scenario
> >> > should not cause data loss anymore.
> >> >
> >> > Guozhang
> >> >
> >> >
> >> > On Sun, Jul 27, 2014 at 6:11 PM, Jad Naous <jad.na...@appdynamics.com
> >
> >> > wrote:
> >> >
> >> > > So in summary, is it true to say that currently triggering leader
> >> > > reelection is not a safe operation? I have been able to reproduce
> that
> >> > > message loss pretty reliably in tests. If that is the case, isn't
> >> that an
> >> > > important operation in a large cluster where nodes go up and down?
> >> > > On Jul 25, 2014 10:00 PM, "Jun Rao" <jun...@gmail.com> wrote:
> >> > >
> >> > > > Actually, I don't think KAFKA-1211 will happen with just 2
> replicas.
> >> > > When a
> >> > > > replica becomes a leader, it never truncates its log. Only when a
> >> > replica
> >> > > > becomes follower, it truncates its log to HW. So in this
> particular
> >> > case,
> >> > > > the new leader will not truncate data to offset 8.
> >> > > >
> >> > > > Thanks,
> >> > > >
> >> > > > Jun
> >> > > >
> >> > > >
> >> > > > On Fri, Jul 25, 2014 at 3:37 PM, Guozhang Wang <
> wangg...@gmail.com>
> >> > > wrote:
> >> > > >
> >> > > > > Hi Jad,
> >> > > > >
> >> > > > > Yes. In this case I think you are actually hitting KAFKA-1211.
> The
> >> > > > summary
> >> > > > > of the issue is that, it takes one more fetch request round trip
> >> for
> >> > > the
> >> > > > > follower replica to advance the HW after the leader has advanced
> >> HW.
> >> > So
> >> > > > for
> >> > > > > your case, the whole process is like this:
> >> > > > >
> >> > > > > 1. leader LEO at 10, follower LEO at 8. Both leader and follower
> >> > knows
> >> > > > the
> >> > > > > LEO is at 8.
> >> > > > > 2. Follower fetch data on Leader starting at 8, leader records
> its
> >> > LEO
> >> > > as
> >> > > > > 8.
> >> > > > > 3. Follower gets 9 and 10 and append to its local log.
> >> > > > > 4. Follower fetch data on Leader starting at 10, leader records
> >> its
> >> > LEO
> >> > > > as
> >> > > > > 10; now leader knows follower has caught up, it advances its HW
> >> to 10
> >> > > and
> >> > > > > adds the follower to ISR (but follower does not know that yet!
> It
> >> > still
> >> > > > > think the HW is 8).
> >> > > > > 5. Leader's fetch response gets back to follower, and now the
> >> > follower
> >> > > > > knows that HW has been updated to 10.
> >> > > > >
> >> > > > > And let's say there is a leader election between step 4) and 5),
> >> for
> >> > > your
> >> > > > > case it is due to preferred leader election, but it could also
> be
> >> > that
> >> > > > > current leader fails, etc. Then on becoming the new leader the
> >> > follower
> >> > > > > will truncate its data to 8, which is the HW it knows. Hence the
> >> data
> >> > > > loss.
> >> > > > >
> >> > > > > The proposed solution in KAFKA-1211 will tackle this issue.
> >> > > > >
> >> > > > > Guozhang
> >> > > > >
> >> > > > >
> >> > > > > On Fri, Jul 25, 2014 at 2:48 PM, Jad Naous <
> >> > jad.na...@appdynamics.com>
> >> > > > > wrote:
> >> > > > >
> >> > > > > > Hi Guozhang,
> >> > > > > >
> >> > > > > > Yes, broker 1 is in the ISR (in fact, I wait for broker 1 to
> be
> >> in
> >> > > the
> >> > > > > ISR
> >> > > > > > before triggering election). However, I think there is
> something
> >> > > still
> >> > > > > > amiss. I still see data loss. Here are some relevant log lines
> >> from
> >> > > > > broker
> >> > > > > > 0 (different test run). The log on broker 0 is getting
> >> truncated,
> >> > > > losing
> >> > > > > > some messages.
> >> > > > > >
> >> > > > > > [2014-07-25 10:40:02,134]  [DEBUG]  [kafka-request-handler-5]
> >> > > > > > [kafka.cluster.Partition]  Partition
> >> [EventServiceUpsertTopic,19]
> >> > on
> >> > > > > broker
> >> > > > > > 0: Old hw for partition [EventServiceUpsertTopic,19] is 8082.
> >> New
> >> > hw
> >> > > is
> >> > > > > > 8082. All leo's are 8111,8082
> >> > > > > >
> >> > > > > > [2014-07-25 10:40:02,134]  [DEBUG]  [kafka-request-handler-5]
> >> > > > > > [kafka.server.KafkaApis]  [KafkaApi-0] Produce to local log in
> >> 1 ms
> >> > > > > >
> >> > > > > > [2014-07-25 10:40:02,134]  [DEBUG]
> >> > >  [main-SendThread(localhost:49893)]
> >> > > > > > [org.apache.zookeeper.ClientCnxn]  Reading reply
> >> > > > > > sessionid:0x476e9a9e9a0001, packet:: clientPath:null
> >> > serverPath:null
> >> > > > > > finished:false header:: 729,5  replyHeader:: 729,4294968217,0
> >> > > >  request::
> >> > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> '/brokers/topics/EventServiceUpsertTopic/partitions/5/state,#7b22636f6e74726f6c6c65725f65706f6368223a312c226c6561646572223a312c2276657273696f6e223a312c226c65616465725f65706f6368223a332c22697372223a5b302c315d7d,3
> >> > > > > > response::
> >> > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> s{4294967416,4294968217,1406309966419,1406310002132,4,0,0,0,74,0,4294967416}
> >> > > > > >
> >> > > > > > [2014-07-25 10:40:02,134]  [DEBUG]  [kafka-processor-49917-2]
> >> > > > > > [kafka.request.logger]  Completed request:Name:
> ProducerRequest;
> >> > > > Version:
> >> > > > > > 0; CorrelationId: 16248; ClientId: ; RequiredAcks: -1;
> >> > AckTimeoutMs:
> >> > > > > 10000
> >> > > > > > ms from client /127.0.0.1:50168
> >> > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> ;totalTime:1,requestQueueTime:0,localTime:1,remoteTime:0,responseQueueTime:0,sendTime:0
> >> > > > > >
> >> > > > > > [2014-07-25 10:40:02,134]  [DEBUG]
> >> > > > > >
> >> > > >
> >> >
> >>
> [ZkClient-EventThread-13-localhost:49893,localhost:49896,localhost:49899]
> >> > > > > > [kafka.utils.ZkUtils$]  Conditional update of path
> >> > > > > > /brokers/topics/EventServiceUpsertTopic/partitions/5/state
> with
> >> > value
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> {"controller_epoch":1,"leader":1,"version":1,"leader_epoch":3,"isr":[0,1]}
> >> > > > > > and expected version 3 succeeded, returning the new version: 4
> >> > > > > >
> >> > > > > > [2014-07-25 10:40:02,136]  [DEBUG]
> >> > > > > >
> >> > > >
> >> >
> >>
> [ZkClient-EventThread-13-localhost:49893,localhost:49896,localhost:49899]
> >> > > > > > [k.controller.PartitionStateMachine]  [Partition state machine
> >> on
> >> > > > > > Controller 0]: After leader election, leader cache is updated
> to
> >> > > > > > Map([EventServiceUpsertTopic,18] ->
> >> > > > > > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
> >> > > > > > [EventServiceUpsertTopic,4] ->
> >> > > > > > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
> >> > > > > > [EventServiceUpsertTopic,7] ->
> >> > > > > > (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
> >> > > > > > [EventServiceUpsertTopic,17] ->
> >> > > > > > (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
> >> > > > > > [EventServiceUpsertTopic,16] ->
> >> > > > > > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
> >> > > > > > [EventServiceUpsertTopic,5] ->
> >> > > > > > (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
> >> > > > > > [EventServiceUpsertTopic,10] ->
> >> > > > > > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
> >> > > > > > [EventServiceUpsertTopic,1] ->
> >> > > > > > (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
> >> > > > > > [EventServiceUpsertTopic,13] ->
> >> > > > > > (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
> >> > > > > > [EventServiceUpsertTopic,9] ->
> >> > > > > > (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
> >> > > > > > [EventServiceUpsertTopic,8] ->
> >> > > > > > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
> >> > > > > > [EventServiceUpsertTopic,11] ->
> >> > > > > > (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
> >> > > > > > [EventServiceUpsertTopic,6] ->
> >> > > > > > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
> >> > > > > > [EventServiceUpsertTopic,12] ->
> >> > > > > > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
> >> > > > > > [EventServiceUpsertTopic,2] ->
> >> > > > > > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
> >> > > > > > [EventServiceUpsertTopic,3] ->
> >> > > > > > (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
> >> > > > > > [EventServiceUpsertTopic,14] ->
> >> > > > > > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1), [
> >> > > > > > EventServiceUpsertTopic,19] ->
> >> > > > > > (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
> >> > > > > > [EventServiceUpsertTopic,0] ->
> >> > > > > > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
> >> > > > > > [EventServiceUpsertTopic,15] ->
> >> > > > > > (Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1))
> >> > > > > >
> >> > > > > > [2014-07-25 10:40:02,136]  [DEBUG]  [kafka-request-handler-1]
> >> > > > > > [kafka.cluster.Partition]  Partition
> >> [EventServiceUpsertTopic,19]
> >> > on
> >> > > > > broker
> >> > > > > > 0: Old hw for partition [EventServiceUpsertTopic,19] is 8082.
> >> New
> >> > hw
> >> > > is
> >> > > > > > 8082. All leo's are 8112,8082
> >> > > > > >
> >> > > > > > [2014-07-25 10:40:02,136]  [DEBUG]  [kafka-request-handler-1]
> >> > > > > > [kafka.server.KafkaApis]  [KafkaApi-0] Produce to local log in
> >> 0 ms
> >> > > > > >
> >> > > > > > [2014-07-25 10:40:02,137]  [DEBUG]  [kafka-processor-49917-2]
> >> > > > > > [kafka.request.logger]  Completed request:Name:
> ProducerRequest;
> >> > > > Version:
> >> > > > > > 0; CorrelationId: 16250; ClientId: ; RequiredAcks: -1;
> >> > AckTimeoutMs:
> >> > > > > 10000
> >> > > > > > ms from client /127.0.0.1:50168
> >> > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> ;totalTime:1,requestQueueTime:0,localTime:1,remoteTime:0,responseQueueTime:0,sendTime:0
> >> > > > > >
> >> > > > > > [2014-07-25 10:40:02,140]  [INFO ]
> >> > > > > >
> >> > > >
> >> >
> >>
> [ZkClient-EventThread-13-localhost:49893,localhost:49896,localhost:49899]
> >> > > > > > [kafka.controller.KafkaController]  [Controller 0]: Partition
> [
> >> > > > > > EventServiceUpsertTopic,19] completed preferred replica leader
> >> > > > election.
> >> > > > > > New leader is 1
> >> > > > > >
> >> > > > > > [2014-07-25 10:40:02,140]  [INFO ]
> >> > > > > >
> >> > > >
> >> >
> >>
> [ZkClient-EventThread-13-localhost:49893,localhost:49896,localhost:49899]
> >> > > > > > [kafka.controller.KafkaController]  [Controller 0]: Partition
> >> > > > > > [EventServiceUpsertTopic,5] completed preferred replica leader
> >> > > > election.
> >> > > > > > New leader is 1
> >> > > > > >
> >> > > > > > [2014-07-25 10:40:02,142]  [DEBUG]
> >> > >  [main-SendThread(localhost:49893)]
> >> > > > > > [org.apache.zookeeper.ClientCnxn]  Got notification
> >> > > > > > sessionid:0x476e9a9e9a0001
> >> > > > > >
> >> > > > > > [2014-07-25 10:40:02,142]  [DEBUG]
> >> > >  [main-SendThread(localhost:49893)]
> >> > > > > > [org.apache.zookeeper.ClientCnxn]  Got WatchedEvent
> >> > > state:SyncConnected
> >> > > > > > type:NodeDeleted path:/admin/preferred_replica_election for
> >> > sessionid
> >> > > > > > 0x476e9a9e9a0001
> >> > > > > >
> >> > > > > > [2014-07-25 10:40:02,143]  [DEBUG]
> >> > >  [main-SendThread(localhost:49893)]
> >> > > > > > [org.apache.zookeeper.ClientCnxn]  Reading reply
> >> > > > > > sessionid:0x476e9a9e9a0001, packet:: clientPath:null
> >> > serverPath:null
> >> > > > > > finished:false header:: 730,2  replyHeader:: 730,4294968218,0
> >> > > >  request::
> >> > > > > > '/admin/preferred_replica_election,-1  response:: null
> >> > > > > >
> >> > > > > > [2014-07-25 10:40:02,145]  [INFO ]  [kafka-request-handler-0]
> >> > > > > > [kafka.server.ReplicaFetcherManager]  [ReplicaFetcherManager
> on
> >> > > broker
> >> > > > 0]
> >> > > > > > Removed fetcher for partitions
> >> > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> [EventServiceUpsertTopic,13],[EventServiceUpsertTopic,11],[EventServiceUpsertTopic,17],[EventServiceUpsertTopic,7],[EventServiceUpsertTopic,9],[EventServiceUpsertTopic,1],[EventServiceUpsertTopic,15],[
> >> > > > > > EventServiceUpsertTopic,19
> >> > > > > > ],[EventServiceUpsertTopic,3],[EventServiceUpsertTopic,5]
> >> > > > > >
> >> > > > > > [2014-07-25 10:40:02,153]  [INFO ]  [kafka-request-handler-0]
> >> > > > > > [kafka.log.Log]  Truncating log EventServiceUpsertTopic-19 to
> >> > offset
> >> > > > > 8082.
> >> > > > > >
> >> > > > > > [2014-07-25 10:40:02,159]  [INFO ]  [kafka-request-handler-0]
> >> > > > > > [kafka.log.Log]  Truncating log EventServiceUpsertTopic-5 to
> >> offset
> >> > > 0.
> >> > > > > >
> >> > > > > >
> >> > > > > > I have removed some irrelevant lines. As you can see, the log
> >> for
> >> > > > > > EventServiceUpsertTopic-19 is being truncated, losing messages
> >> that
> >> > > > have
> >> > > > > > not yet been replicated to broker 1. I'm not sure exactly what
> >> the
> >> > > > issue
> >> > > > > > is. Maybe new requests should be completely blocked after
> leader
> >> > > > election
> >> > > > > > until the new leader catches up to the messages that have been
> >> > acked
> >> > > > and
> >> > > > > it
> >> > > > > > has not yet received.
> >> > > > > >
> >> > > > > > Thanks,
> >> > > > > >
> >> > > > > > Jad.
> >> > > > > >
> >> > > > > >
> >> > > > > > On Fri, Jul 25, 2014 at 2:14 PM, Guozhang Wang <
> >> wangg...@gmail.com
> >> > >
> >> > > > > wrote:
> >> > > > > >
> >> > > > > > > Hello Jad,
> >> > > > > > >
> >> > > > > > > I double-checked the source code again, and found that
> >> actually
> >> > the
> >> > > > > > > preferred leader elector does consider whether the selected
> >> > replica
> >> > > > is
> >> > > > > in
> >> > > > > > > ISR or not. This means that by the time the election is
> >> > triggered,
> >> > > > > > broker 1
> >> > > > > > > is added to ISR by broker 0. Could you check before step 3,
> is
> >> > > there
> >> > > > > any
> >> > > > > > > log entries on broker 0 adding broker 1 to ISR or updating
> HW
> >> to
> >> > > 975?
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > On Thu, Jul 24, 2014 at 4:21 PM, Jad Naous <
> >> > > > jad.na...@appdynamics.com>
> >> > > > > > > wrote:
> >> > > > > > >
> >> > > > > > > > Hi,
> >> > > > > > > >
> >> > > > > > > > I have a test that continuously sends messages to one
> >> broker,
> >> > > > brings
> >> > > > > up
> >> > > > > > > > another broker, and adds it as a replica for all
> partitions,
> >> > with
> >> > > > it
> >> > > > > > > being
> >> > > > > > > > the preferred replica for some. I have
> >> > > > > > auto.leader.rebalance.enable=true,
> >> > > > > > > > so replica election gets triggered. Data is being pumped
> to
> >> the
> >> > > old
> >> > > > > > > broker
> >> > > > > > > > all the while. It seems that some data gets lost while
> >> > switching
> >> > > > over
> >> > > > > > to
> >> > > > > > > > the new leader. Is this a bug, or do I have something
> >> > > > misconfigured?
> >> > > > > I
> >> > > > > > > also
> >> > > > > > > > have request.required.acks=-1 on the producer.
> >> > > > > > > >
> >> > > > > > > > Here's what I think is happening:
> >> > > > > > > >
> >> > > > > > > > 1. Producer writes message to broker 0,
> >> > > > [EventServiceUpsertTopic,13],
> >> > > > > > w/
> >> > > > > > > > broker 0 currently leader, with ISR=(0), so write returns
> >> > > > > successfully,
> >> > > > > > > > even when acks = -1. Correlation id 35836
> >> > > > > > > >
> >> > > > > > > > Producer log:
> >> > > > > > > >
> >> > > > > > > > [2014-07-24 14:44:26,991]  [DEBUG]  [dw-97 - PATCH
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> /v1/events/type_for_test_bringupNewBroker_shouldRebalance_shouldNotLoseData/event?_idPath=idField&_mergeFields=field1]
> >> > > > > > > > [kafka.producer.BrokerPartitionInfo]  Partition
> >> > > > > > > > [EventServiceUpsertTopic,13] has leader 0
> >> > > > > > > >
> >> > > > > > > > [2014-07-24 14:44:26,993]  [DEBUG]  [dw-97 - PATCH
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> /v1/events/type_for_test_bringupNewBroker_shouldRebalance_shouldNotLoseData/event?_idPath=idField&_mergeFields=field1]
> >> > > > > > > > [k.producer.async.DefaultEventHandler]  Producer sent
> >> messages
> >> > > with
> >> > > > > > > > correlation id 35836 for topics
> >> [EventServiceUpsertTopic,13] to
> >> > > > > broker
> >> > > > > > 0
> >> > > > > > > on
> >> > > > > > > > localhost:56821
> >> > > > > > > > 2. Broker 1 is still catching up
> >> > > > > > > >
> >> > > > > > > > Broker 0 Log:
> >> > > > > > > >
> >> > > > > > > > [2014-07-24 14:44:26,992]  [DEBUG]
> >>  [kafka-request-handler-3]
> >> > > > > > > > [kafka.cluster.Partition]  Partition
> >> > [EventServiceUpsertTopic,13]
> >> > > > on
> >> > > > > > > broker
> >> > > > > > > > 0: Old hw for partition [EventServiceUpsertTopic,13] is
> 971.
> >> > New
> >> > > hw
> >> > > > > is
> >> > > > > > > 971.
> >> > > > > > > > All leo's are 975,971
> >> > > > > > > >
> >> > > > > > > > [2014-07-24 14:44:26,992]  [DEBUG]
> >>  [kafka-request-handler-3]
> >> > > > > > > > [kafka.server.KafkaApis]  [KafkaApi-0] Produce to local
> log
> >> in
> >> > 0
> >> > > ms
> >> > > > > > > >
> >> > > > > > > > [2014-07-24 14:44:26,992]  [DEBUG]
> >>  [kafka-processor-56821-0]
> >> > > > > > > > [kafka.request.logger]  Completed request:Name:
> >> > ProducerRequest;
> >> > > > > > Version:
> >> > > > > > > > 0; CorrelationId: 35836; ClientId: ; RequiredAcks: -1;
> >> > > > AckTimeoutMs:
> >> > > > > > > 10000
> >> > > > > > > > ms from client /127.0.0.1:57086
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> ;totalTime:0,requestQueueTime:0,localTime:0,remoteTime:0,responseQueueTime:0,sendTime:0
> >> > > > > > > > 3. Leader election is triggered by the scheduler:
> >> > > > > > > >
> >> > > > > > > > Broker 0 Log:
> >> > > > > > > >
> >> > > > > > > > [2014-07-24 14:44:26,991]  [INFO ]  [kafka-scheduler-0]
> >> > > > > > > > [k.c.PreferredReplicaPartitionLeaderSelector]
> >> > > > > > > > [PreferredReplicaPartitionLeaderSelector]: Current leader
> 0
> >> for
> >> > > > > > > partition [
> >> > > > > > > > EventServiceUpsertTopic,13] is not the preferred replica.
> >> > > > Trigerring
> >> > > > > > > > preferred replica leader election
> >> > > > > > > >
> >> > > > > > > > [2014-07-24 14:44:26,993]  [DEBUG]  [kafka-scheduler-0]
> >> > > > > > > > [kafka.utils.ZkUtils$]  Conditional update of path
> >> > > > > > > >
> /brokers/topics/EventServiceUpsertTopic/partitions/13/state
> >> > with
> >> > > > > value
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> {"controller_epoch":1,"leader":1,"version":1,"leader_epoch":3,"isr":[0,1]}
> >> > > > > > > > and expected version 3 succeeded, returning the new
> >> version: 4
> >> > > > > > > >
> >> > > > > > > > [2014-07-24 14:44:26,994]  [DEBUG]  [kafka-scheduler-0]
> >> > > > > > > > [k.controller.PartitionStateMachine]  [Partition state
> >> machine
> >> > on
> >> > > > > > > > Controller 0]: After leader election, leader cache is
> >> updated
> >> > to
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> Map(<Snipped>(Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),<EndSnip>)
> >> > > > > > > >
> >> > > > > > > > [2014-07-24 14:44:26,994]  [INFO ]  [kafka-scheduler-0]
> >> > > > > > > > [kafka.controller.KafkaController]  [Controller 0]:
> >> Partition [
> >> > > > > > > > EventServiceUpsertTopic,13] completed preferred replica
> >> leader
> >> > > > > > election.
> >> > > > > > > > New leader is 1
> >> > > > > > > > 4. Broker 1 is still behind, but it sets the high water
> >> mark to
> >> > > > > 971!!!
> >> > > > > > > >
> >> > > > > > > > Broker 1 Log:
> >> > > > > > > >
> >> > > > > > > > [2014-07-24 14:44:26,999]  [INFO ]
> >>  [kafka-request-handler-6]
> >> > > > > > > > [kafka.server.ReplicaFetcherManager]
> >>  [ReplicaFetcherManager on
> >> > > > > broker
> >> > > > > > 1]
> >> > > > > > > > Removed fetcher for partitions
> [EventServiceUpsertTopic,13]
> >> > > > > > > >
> >> > > > > > > > [2014-07-24 14:44:27,000]  [DEBUG]
> >>  [kafka-request-handler-6]
> >> > > > > > > > [kafka.cluster.Partition]  Partition
> >> > [EventServiceUpsertTopic,13]
> >> > > > on
> >> > > > > > > broker
> >> > > > > > > > 1: Old hw for partition [EventServiceUpsertTopic,13] is
> 970.
> >> > New
> >> > > hw
> >> > > > > is
> >> > > > > > > -1.
> >> > > > > > > > All leo's are -1,971
> >> > > > > > > >
> >> > > > > > > > [2014-07-24 14:44:27,098]  [DEBUG]
> >>  [kafka-request-handler-3]
> >> > > > > > > > [kafka.server.KafkaApis]  [KafkaApi-1] Maybe update
> >> partition
> >> > HW
> >> > > > due
> >> > > > > to
> >> > > > > > > > fetch request: Name: FetchRequest; Version: 0;
> >> CorrelationId:
> >> > 1;
> >> > > > > > > ClientId:
> >> > > > > > > > ReplicaFetcherThread-0-1; ReplicaId: 0; MaxWait: 500 ms;
> >> > > MinBytes:
> >> > > > 1
> >> > > > > > > bytes;
> >> > > > > > > > RequestInfo: [EventServiceUpsertTopic,13] ->
> >> > > > > > > > PartitionFetchInfo(971,1048576), <Snipped>
> >> > > > > > > >
> >> > > > > > > > [2014-07-24 14:44:27,098]  [DEBUG]
> >>  [kafka-request-handler-3]
> >> > > > > > > > [kafka.cluster.Partition]  Partition
> >> > [EventServiceUpsertTopic,13]
> >> > > > on
> >> > > > > > > broker
> >> > > > > > > > 1: Recording follower 0 position 971 for partition [
> >> > > > > > > > EventServiceUpsertTopic,13].
> >> > > > > > > >
> >> > > > > > > > [2014-07-24 14:44:27,100]  [DEBUG]
> >>  [kafka-request-handler-3]
> >> > > > > > > > [kafka.cluster.Partition]  Partition
> >> > [EventServiceUpsertTopic,13]
> >> > > > on
> >> > > > > > > broker
> >> > > > > > > > 1: Highwatermark for partition
> [EventServiceUpsertTopic,13]
> >> > > updated
> >> > > > > to
> >> > > > > > > 971
> >> > > > > > > > 5. Consumer is none the wiser. All data that was in
> offsets
> >> > > 972-975
> >> > > > > > > doesn't
> >> > > > > > > > show up!
> >> > > > > > > >
> >> > > > > > > > I tried this with 2 initial replicas, and adding a 3rd
> >> which is
> >> > > > > > supposed
> >> > > > > > > to
> >> > > > > > > > be the leader for some new partitions, and this problem
> also
> >> > > > happens
> >> > > > > > > there.
> >> > > > > > > > The log on the old leader gets truncated to the offset on
> >> the
> >> > new
> >> > > > > > leader.
> >> > > > > > > > What's the solution? Can I make a new broker leader for
> >> > > partitions
> >> > > > > that
> >> > > > > > > are
> >> > > > > > > > currently active without losing data?
> >> > > > > > > >
> >> > > > > > > > Thanks,
> >> > > > > > > > Jad.
> >> > > > > > > >
> >> > > > > > > > --
> >> > > > > > > >  *Jad Naous* | Engineering | AppDynamics
> >> > > > > > > >  <http://www.appdynamics.com>
> >> > > > > > > >
> >> > > > > > >
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > --
> >> > > > > > > -- Guozhang
> >> > > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > > > --
> >> > > > > >  *Jad Naous* | Engineering | AppDynamics
> >> > > > > >  <http://www.appdynamics.com>
> >> > > > > >
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > > --
> >> > > > > -- Guozhang
> >> > > > >
> >> > > >
> >> > >
> >> >
> >> >
> >> >
> >> > --
> >> > -- Guozhang
> >> >
> >>
> >>
> >>
> >> --
> >>  *Jad Naous* | Engineering | AppDynamics
> >>  <http://www.appdynamics.com>
> >>
> >
> >
> >
> > --
> > -- Guozhang
> >
>
>
>
> --
> -- Guozhang
>



-- 
 *Jad Naous* | Engineering | AppDynamics
 <http://www.appdynamics.com>

Reply via email to