Hi Guozhang,

Yes, broker 1 is in the ISR (in fact, I wait for broker 1 to be in the ISR
before triggering the election). However, I think something is still
amiss: I still see data loss. Here are some relevant log lines from broker
0 (from a different test run). The log on broker 0 is getting truncated,
losing some messages.
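Because the "wait for broker 1 to be in the ISR" check is what gates the election in this test, here is one way it could be expressed. This is a minimal Python sketch. It assumes the test reads the partition-state znode and gets back the JSON that appears in the controller's ZooKeeper updates below. The hex string in the ZooKeeper client's "Reading reply" line for partition 5 is that same JSON, hex-encoded. `decode_state`, `isr_contains`, `wait_for_isr` and the `read_state` callable are hypothetical helpers, not Kafka APIs, and fetching the znode itself is left out.

```python
import json
import time
from typing import Callable

# Hex payload copied from the ZooKeeper client's "Reading reply" line for
# /brokers/topics/EventServiceUpsertTopic/partitions/5/state, one JSON field per line.
STATE_HEX = (
    "7b22636f6e74726f6c6c65725f65706f6368223a312c"  # {"controller_epoch":1,
    "226c6561646572223a312c"                          # "leader":1,
    "2276657273696f6e223a312c"                        # "version":1,
    "226c65616465725f65706f6368223a332c"              # "leader_epoch":3,
    "22697372223a5b302c315d7d"                        # "isr":[0,1]}
)


def decode_state(hex_payload: str) -> dict:
    """Turn a hex-encoded partition-state znode value into a dict (hypothetical helper)."""
    return json.loads(bytes.fromhex(hex_payload).decode("utf-8"))


def isr_contains(state: dict, broker_id: int) -> bool:
    """True if broker_id is listed in the partition's ISR."""
    return broker_id in state.get("isr", [])


def wait_for_isr(read_state: Callable[[], dict], broker_id: int,
                 timeout_s: float = 30.0, poll_s: float = 0.5) -> bool:
    """Poll read_state() until broker_id appears in the ISR or the timeout expires."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if isr_contains(read_state(), broker_id):
            return True
        time.sleep(poll_s)
    return False


state = decode_state(STATE_HEX)
print(state)  # {'controller_epoch': 1, 'leader': 1, 'version': 1, 'leader_epoch': 3, 'isr': [0, 1]}
print(wait_for_isr(lambda: state, 1, timeout_s=1.0))  # True
```

Passing this check says nothing about how far behind the replica is. In the partition 19 lines below, broker 1 is in the ISR (ISR:0,1), but its LEO (8082) trails broker 0's (8111).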

[2014-07-25 10:40:02,134]  [DEBUG]  [kafka-request-handler-5]
[kafka.cluster.Partition]  Partition [EventServiceUpsertTopic,19] on broker
0: Old hw for partition [EventServiceUpsertTopic,19] is 8082. New hw is
8082. All leo's are 8111,8082

[2014-07-25 10:40:02,134]  [DEBUG]  [kafka-request-handler-5]
[kafka.server.KafkaApis]  [KafkaApi-0] Produce to local log in 1 ms

[2014-07-25 10:40:02,134]  [DEBUG]  [main-SendThread(localhost:49893)]
[org.apache.zookeeper.ClientCnxn]  Reading reply
sessionid:0x476e9a9e9a0001, packet:: clientPath:null serverPath:null
finished:false header:: 729,5  replyHeader:: 729,4294968217,0  request::
'/brokers/topics/EventServiceUpsertTopic/partitions/5/state,#7b22636f6e74726f6c6c65725f65706f6368223a312c226c6561646572223a312c2276657273696f6e223a312c226c65616465725f65706f6368223a332c22697372223a5b302c315d7d,3
response::
s{4294967416,4294968217,1406309966419,1406310002132,4,0,0,0,74,0,4294967416}

[2014-07-25 10:40:02,134]  [DEBUG]  [kafka-processor-49917-2]
[kafka.request.logger]  Completed request:Name: ProducerRequest; Version:
0; CorrelationId: 16248; ClientId: ; RequiredAcks: -1; AckTimeoutMs: 10000
ms from client /127.0.0.1:50168
;totalTime:1,requestQueueTime:0,localTime:1,remoteTime:0,responseQueueTime:0,sendTime:0

[2014-07-25 10:40:02,134]  [DEBUG]
[ZkClient-EventThread-13-localhost:49893,localhost:49896,localhost:49899]
[kafka.utils.ZkUtils$]  Conditional update of path
/brokers/topics/EventServiceUpsertTopic/partitions/5/state with value
{"controller_epoch":1,"leader":1,"version":1,"leader_epoch":3,"isr":[0,1]}
and expected version 3 succeeded, returning the new version: 4

[2014-07-25 10:40:02,136]  [DEBUG]
[ZkClient-EventThread-13-localhost:49893,localhost:49896,localhost:49899]
[k.controller.PartitionStateMachine]  [Partition state machine on
Controller 0]: After leader election, leader cache is updated to
Map([EventServiceUpsertTopic,18] ->
(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
[EventServiceUpsertTopic,4] ->
(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
[EventServiceUpsertTopic,7] ->
(Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
[EventServiceUpsertTopic,17] ->
(Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
[EventServiceUpsertTopic,16] ->
(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
[EventServiceUpsertTopic,5] ->
(Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
[EventServiceUpsertTopic,10] ->
(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
[EventServiceUpsertTopic,1] ->
(Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
[EventServiceUpsertTopic,13] ->
(Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
[EventServiceUpsertTopic,9] ->
(Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
[EventServiceUpsertTopic,8] ->
(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
[EventServiceUpsertTopic,11] ->
(Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
[EventServiceUpsertTopic,6] ->
(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
[EventServiceUpsertTopic,12] ->
(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
[EventServiceUpsertTopic,2] ->
(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
[EventServiceUpsertTopic,3] ->
(Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
[EventServiceUpsertTopic,14] ->
(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
[EventServiceUpsertTopic,19] ->
(Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),
[EventServiceUpsertTopic,0] ->
(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),
[EventServiceUpsertTopic,15] ->
(Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1))

[2014-07-25 10:40:02,136]  [DEBUG]  [kafka-request-handler-1]
[kafka.cluster.Partition]  Partition [EventServiceUpsertTopic,19] on broker
0: Old hw for partition [EventServiceUpsertTopic,19] is 8082. New hw is
8082. All leo's are 8112,8082

[2014-07-25 10:40:02,136]  [DEBUG]  [kafka-request-handler-1]
[kafka.server.KafkaApis]  [KafkaApi-0] Produce to local log in 0 ms

[2014-07-25 10:40:02,137]  [DEBUG]  [kafka-processor-49917-2]
[kafka.request.logger]  Completed request:Name: ProducerRequest; Version:
0; CorrelationId: 16250; ClientId: ; RequiredAcks: -1; AckTimeoutMs: 10000
ms from client /127.0.0.1:50168
;totalTime:1,requestQueueTime:0,localTime:1,remoteTime:0,responseQueueTime:0,sendTime:0

[2014-07-25 10:40:02,140]  [INFO ]
[ZkClient-EventThread-13-localhost:49893,localhost:49896,localhost:49899]
[kafka.controller.KafkaController]  [Controller 0]: Partition
[EventServiceUpsertTopic,19] completed preferred replica leader election.
New leader is 1

[2014-07-25 10:40:02,140]  [INFO ]
[ZkClient-EventThread-13-localhost:49893,localhost:49896,localhost:49899]
[kafka.controller.KafkaController]  [Controller 0]: Partition
[EventServiceUpsertTopic,5] completed preferred replica leader election.
New leader is 1

[2014-07-25 10:40:02,142]  [DEBUG]  [main-SendThread(localhost:49893)]
[org.apache.zookeeper.ClientCnxn]  Got notification
sessionid:0x476e9a9e9a0001

[2014-07-25 10:40:02,142]  [DEBUG]  [main-SendThread(localhost:49893)]
[org.apache.zookeeper.ClientCnxn]  Got WatchedEvent state:SyncConnected
type:NodeDeleted path:/admin/preferred_replica_election for sessionid
0x476e9a9e9a0001

[2014-07-25 10:40:02,143]  [DEBUG]  [main-SendThread(localhost:49893)]
[org.apache.zookeeper.ClientCnxn]  Reading reply
sessionid:0x476e9a9e9a0001, packet:: clientPath:null serverPath:null
finished:false header:: 730,2  replyHeader:: 730,4294968218,0  request::
'/admin/preferred_replica_election,-1  response:: null

[2014-07-25 10:40:02,145]  [INFO ]  [kafka-request-handler-0]
[kafka.server.ReplicaFetcherManager]  [ReplicaFetcherManager on broker 0]
Removed fetcher for partitions
[EventServiceUpsertTopic,13],[EventServiceUpsertTopic,11],[EventServiceUpsertTopic,17],[EventServiceUpsertTopic,7],[EventServiceUpsertTopic,9],[EventServiceUpsertTopic,1],[EventServiceUpsertTopic,15],[EventServiceUpsertTopic,19],[EventServiceUpsertTopic,3],[EventServiceUpsertTopic,5]

[2014-07-25 10:40:02,153]  [INFO ]  [kafka-request-handler-0]
[kafka.log.Log]  Truncating log EventServiceUpsertTopic-19 to offset 8082.

[2014-07-25 10:40:02,159]  [INFO ]  [kafka-request-handler-0]
[kafka.log.Log]  Truncating log EventServiceUpsertTopic-5 to offset 0.


I have removed some irrelevant lines. As you can see, broker 0's log for
EventServiceUpsertTopic-19 is truncated to offset 8082, which is broker 1's
log end offset. Everything broker 0 had written past that point (its LEO had
already reached 8112) is discarded, and none of it had been replicated to
broker 1. I'm not sure exactly what the issue is. Maybe new produce requests
should be blocked entirely after leader election until the new leader has
caught up on the acknowledged messages it has not yet received.
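The offset arithmetic behind that truncation can be replayed in a toy model. This is a minimal Python sketch, not Kafka's implementation, built on two assumptions taken from the log lines above. First, the leader's high watermark is the smallest LEO across the ISR ("All leo's are 8111,8082" with HW 8082). Second, a leader that becomes a follower truncates its log to its high watermark ("Truncating log EventServiceUpsertTopic-19 to offset 8082"). `Replica`, `high_watermark`, `demote_and_truncate` and `safe_to_hand_off` are hypothetical names. The last one expresses the blocking idea above as a precondition on the handoff.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set


@dataclass
class Replica:
    broker_id: int
    log: List[int] = field(default_factory=list)  # offsets of the messages held

    @property
    def leo(self) -> int:
        # Log end offset: the offset the next appended message would get.
        return len(self.log)


def high_watermark(replicas: Dict[int, Replica], isr: Set[int]) -> int:
    # Assumed rule: HW is the smallest LEO among the in-sync replicas.
    return min(replicas[b].leo for b in isr)


def demote_and_truncate(old_leader: Replica, hw: int) -> List[int]:
    # The old leader becomes a follower and drops everything at or past the HW.
    lost = old_leader.log[hw:]
    del old_leader.log[hw:]
    return lost


def safe_to_hand_off(old_leader: Replica, new_leader: Replica) -> bool:
    # Hypothetical guard: move leadership only once the new leader holds
    # every offset the old leader has written.
    return new_leader.leo >= old_leader.leo


# Numbers from broker 0's log for [EventServiceUpsertTopic,19].
b0 = Replica(0, list(range(8112)))  # old leader, LEO 8112
b1 = Replica(1, list(range(8082)))  # new leader, LEO 8082
hw = high_watermark({0: b0, 1: b1}, {0, 1})
print(hw)                              # 8082
print(safe_to_hand_off(b0, b1))        # False
lost = demote_and_truncate(b0, hw)
print(len(lost), lost[0], lost[-1])    # 30 8082 8111
```

In this model the guard returns False for as long as broker 0 holds offsets that broker 1 lacks, which is exactly the window in which they get dropped. Enforcing it in a real cluster would mean holding produce traffic for the partition, or delaying the election, until it passes.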

Thanks,

Jad.


On Fri, Jul 25, 2014 at 2:14 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Jad,
>
> I double-checked the source code and found that the preferred leader
> elector does in fact check whether the selected replica is in the ISR.
> This means that by the time the election was triggered, broker 0 had
> already added broker 1 to the ISR. Could you check whether, before step 3,
> there are any log entries on broker 0 adding broker 1 to the ISR or
> updating the HW to 975?
>
>
> On Thu, Jul 24, 2014 at 4:21 PM, Jad Naous <jad.na...@appdynamics.com>
> wrote:
>
> > Hi,
> >
> > I have a test that continuously sends messages to one broker, brings up
> > another broker, and adds it as a replica for all partitions, with it
> > being the preferred replica for some. I have
> > auto.leader.rebalance.enable=true, so preferred replica leader election
> > gets triggered. Data is being pumped to the old broker all the while. It
> > seems that some data gets lost while switching over to the new leader. Is
> > this a bug, or do I have something misconfigured? I also have
> > request.required.acks=-1 on the producer.
> >
> > Here's what I think is happening:
> >
> > 1. Producer writes a message to broker 0 for [EventServiceUpsertTopic,13].
> > Broker 0 is currently the leader, with ISR=(0), so the write returns
> > successfully even with acks=-1. Correlation id 35836.
> >
> > Producer log:
> >
> > [2014-07-24 14:44:26,991]  [DEBUG]  [dw-97 - PATCH /v1/events/type_for_test_bringupNewBroker_shouldRebalance_shouldNotLoseData/event?_idPath=idField&_mergeFields=field1]
> > [kafka.producer.BrokerPartitionInfo]  Partition
> > [EventServiceUpsertTopic,13] has leader 0
> >
> > [2014-07-24 14:44:26,993]  [DEBUG]  [dw-97 - PATCH /v1/events/type_for_test_bringupNewBroker_shouldRebalance_shouldNotLoseData/event?_idPath=idField&_mergeFields=field1]
> > [k.producer.async.DefaultEventHandler]  Producer sent messages with
> > correlation id 35836 for topics [EventServiceUpsertTopic,13] to broker 0
> > on localhost:56821
> >
> > 2. Broker 1 is still catching up.
> >
> > Broker 0 Log:
> >
> > [2014-07-24 14:44:26,992]  [DEBUG]  [kafka-request-handler-3]
> > [kafka.cluster.Partition]  Partition [EventServiceUpsertTopic,13] on
> > broker 0: Old hw for partition [EventServiceUpsertTopic,13] is 971. New
> > hw is 971. All leo's are 975,971
> >
> > [2014-07-24 14:44:26,992]  [DEBUG]  [kafka-request-handler-3]
> > [kafka.server.KafkaApis]  [KafkaApi-0] Produce to local log in 0 ms
> >
> > [2014-07-24 14:44:26,992]  [DEBUG]  [kafka-processor-56821-0]
> > [kafka.request.logger]  Completed request:Name: ProducerRequest; Version:
> > 0; CorrelationId: 35836; ClientId: ; RequiredAcks: -1; AckTimeoutMs:
> > 10000 ms from client /127.0.0.1:57086
> > ;totalTime:0,requestQueueTime:0,localTime:0,remoteTime:0,responseQueueTime:0,sendTime:0
> >
> > 3. Leader election is triggered by the scheduler:
> >
> > Broker 0 Log:
> >
> > [2014-07-24 14:44:26,991]  [INFO ]  [kafka-scheduler-0]
> > [k.c.PreferredReplicaPartitionLeaderSelector]
> > [PreferredReplicaPartitionLeaderSelector]: Current leader 0 for
> > partition [EventServiceUpsertTopic,13] is not the preferred replica.
> > Trigerring preferred replica leader election
> >
> > [2014-07-24 14:44:26,993]  [DEBUG]  [kafka-scheduler-0]
> > [kafka.utils.ZkUtils$]  Conditional update of path
> > /brokers/topics/EventServiceUpsertTopic/partitions/13/state with value
> > {"controller_epoch":1,"leader":1,"version":1,"leader_epoch":3,"isr":[0,1]}
> > and expected version 3 succeeded, returning the new version: 4
> >
> > [2014-07-24 14:44:26,994]  [DEBUG]  [kafka-scheduler-0]
> > [k.controller.PartitionStateMachine]  [Partition state machine on
> > Controller 0]: After leader election, leader cache is updated to
> > Map(<Snipped>(Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),<EndSnip>)
> >
> > [2014-07-24 14:44:26,994]  [INFO ]  [kafka-scheduler-0]
> > [kafka.controller.KafkaController]  [Controller 0]: Partition
> > [EventServiceUpsertTopic,13] completed preferred replica leader election.
> > New leader is 1
> >
> > 4. Broker 1 is still behind, but it sets the high water mark to 971!!!
> >
> > Broker 1 Log:
> >
> > [2014-07-24 14:44:26,999]  [INFO ]  [kafka-request-handler-6]
> > [kafka.server.ReplicaFetcherManager]  [ReplicaFetcherManager on broker 1]
> > Removed fetcher for partitions [EventServiceUpsertTopic,13]
> >
> > [2014-07-24 14:44:27,000]  [DEBUG]  [kafka-request-handler-6]
> > [kafka.cluster.Partition]  Partition [EventServiceUpsertTopic,13] on
> > broker 1: Old hw for partition [EventServiceUpsertTopic,13] is 970. New
> > hw is -1. All leo's are -1,971
> >
> > [2014-07-24 14:44:27,098]  [DEBUG]  [kafka-request-handler-3]
> > [kafka.server.KafkaApis]  [KafkaApi-1] Maybe update partition HW due to
> > fetch request: Name: FetchRequest; Version: 0; CorrelationId: 1;
> > ClientId: ReplicaFetcherThread-0-1; ReplicaId: 0; MaxWait: 500 ms;
> > MinBytes: 1 bytes; RequestInfo: [EventServiceUpsertTopic,13] ->
> > PartitionFetchInfo(971,1048576), <Snipped>
> >
> > [2014-07-24 14:44:27,098]  [DEBUG]  [kafka-request-handler-3]
> > [kafka.cluster.Partition]  Partition [EventServiceUpsertTopic,13] on
> > broker 1: Recording follower 0 position 971 for partition
> > [EventServiceUpsertTopic,13].
> >
> > [2014-07-24 14:44:27,100]  [DEBUG]  [kafka-request-handler-3]
> > [kafka.cluster.Partition]  Partition [EventServiceUpsertTopic,13] on
> > broker 1: Highwatermark for partition [EventServiceUpsertTopic,13]
> > updated to 971
> >
> > 5. Consumer is none the wiser. All data that was in offsets 971-974
> > doesn't show up!
> >
> > I tried this with 2 initial replicas, adding a 3rd that is supposed to be
> > the leader for some new partitions, and the problem happens there too.
> > The log on the old leader gets truncated to the offset on the new leader.
> > What's the solution? Can I make a new broker the leader for partitions
> > that are currently active without losing data?
> >
> > Thanks,
> > Jad.
> >
> > --
> >  *Jad Naous* | Engineering | AppDynamics
> >  <http://www.appdynamics.com>
> >
>
>
>
> --
> -- Guozhang
>
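
Step 1 of the quoted report turns on what acks=-1 actually waits for. Below is a minimal Python model. It assumes, as step 1 describes, that an acks=-1 write is acknowledged once every replica currently in the leader's ISR has the message. `acks_all_satisfied` is a hypothetical name. The LEOs are the 975/971 values from broker 0's log for partition 13, so the last message broker 0 had appended sits at offset 974.

```python
from typing import Dict, Set


def acks_all_satisfied(offset: int, isr: Set[int], leos: Dict[int, int]) -> bool:
    # Assumed acks=-1 rule: every replica currently in the ISR must already
    # hold the message, i.e. its log end offset must be past `offset`.
    return all(leos[b] > offset for b in isr)


# Partition [EventServiceUpsertTopic,13] just before step 3.
leos = {0: 975, 1: 971}
last_offset = leos[0] - 1  # 974, the last message appended on broker 0

print(acks_all_satisfied(last_offset, {0}, leos))     # True: ISR=(0) only
print(acks_all_satisfied(last_offset, {0, 1}, leos))  # False: broker 1 lacks it

# Broker 1 takes over with LEO 971 and broker 0 truncates to 971.
lost_offsets = list(range(leos[1], leos[0]))
print(lost_offsets)  # [971, 972, 973, 974]
```

With ISR=(0), the check passes as soon as broker 0 appends, so the producer gets its ack. Once broker 1, with LEO 971, becomes the leader and broker 0 truncates to 971, any writes acknowledged at offsets 971 through 974 are gone.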



-- 
 *Jad Naous* | Engineering | AppDynamics
 <http://www.appdynamics.com>
