[ https://issues.apache.org/jira/browse/KAFKA-1561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14077117#comment-14077117 ]
Jad Naous commented on KAFKA-1561: ---------------------------------- Here's some more detailed info on what the latest test does (from which these logs are obtained): 0) Start two brokers, one producer, one consumer. Topic has 20 partitions, using default partitioning scheme (which seems to send data to only a couple of partitions when the keys are null, but that doesn't matter for this test). 1) Start a data generator sending data through Kafka continuously 2) Start a new broker 3) Reassign partitions: {code} {"version": 1, "partitions":[ {"topic":"EventServiceUpsertTopic","partition":0, "replicas": [0, 1, 2]}, {"topic":"EventServiceUpsertTopic","partition":1, "replicas": [1, 2, 0]}, {"topic":"EventServiceUpsertTopic","partition":2, "replicas": [2, 0, 1]}, {"topic":"EventServiceUpsertTopic","partition":3, "replicas": [0, 1, 2]}, {"topic":"EventServiceUpsertTopic","partition":4, "replicas": [1, 2, 0]}, {"topic":"EventServiceUpsertTopic","partition":5, "replicas": [2, 0, 1]}, {"topic":"EventServiceUpsertTopic","partition":6, "replicas": [0, 1, 2]}, {"topic":"EventServiceUpsertTopic","partition":7, "replicas": [1, 2, 0]}, {"topic":"EventServiceUpsertTopic","partition":8, "replicas": [2, 0, 1]}, {"topic":"EventServiceUpsertTopic","partition":9, "replicas": [0, 1, 2]}, {"topic":"EventServiceUpsertTopic","partition":10, "replicas": [1, 2, 0]}, {"topic":"EventServiceUpsertTopic","partition":11, "replicas": [2, 0, 1]}, {"topic":"EventServiceUpsertTopic","partition":12, "replicas": [0, 1, 2]}, {"topic":"EventServiceUpsertTopic","partition":13, "replicas": [1, 2, 0]}, {"topic":"EventServiceUpsertTopic","partition":14, "replicas": [2, 0, 1]}, {"topic":"EventServiceUpsertTopic","partition":15, "replicas": [0, 1, 2]}, {"topic":"EventServiceUpsertTopic","partition":16, "replicas": [1, 2, 0]}, {"topic":"EventServiceUpsertTopic","partition":17, "replicas": [2, 0, 1]}, {"topic":"EventServiceUpsertTopic","partition":18, "replicas": [0, 1, 2]}, {"topic":"EventServiceUpsertTopic","partition":19, "replicas": [1, 2, 0]}]} {code} 4) Wait until reassignment is complete (i.e. until ZkUtils.getPartitionsBeingReassigned() returns empty map) 5) Wait until all replicas are caught up (i.e. until ZkUtils.getLeaderAndIsrForPartition() returns all brokers in the ISR for each partition) 6) Trigger leader re-election by calling the PreferredReplicaLeaderElectionCommand 7) Wait until all the leaders are the preferred leaders for partitions according to the replica reassignment from step 3 8) Stop the data generator 9) Check that all the data was consumed You can see from the producer.log that the data: {{ {"field1": ["10"], "idField": "id-5-59"} }} was sent to broker0 successfully, but the consumer never sees it. > Data Loss for Incremented Replica Factor and Leader Election > ------------------------------------------------------------ > > Key: KAFKA-1561 > URL: https://issues.apache.org/jira/browse/KAFKA-1561 > Project: Kafka > Issue Type: Bug > Reporter: Guozhang Wang > Assignee: Guozhang Wang > Fix For: 0.8.2 > > Attachments: broker0.log, broker2.log, consumer.log, producer.log > > > This is reported on the mailing list (thanks to Jad). > {quote} > Hi, > I have a test that continuously sends messages to one broker, brings up > another broker, and adds it as a replica for all partitions, with it being > the preferred replica for some. I have auto.leader.rebalance.enable=true, > so replica election gets triggered. Data is being pumped to the old broker > all the while. It seems that some data gets lost while switching over to > the new leader. Is this a bug, or do I have something misconfigured? I also > have request.required.acks=-1 on the producer. > Here's what I think is happening: > 1. Producer writes message to broker 0, [EventServiceUpsertTopic,13], w/ > broker 0 currently leader, with ISR=(0), so write returns successfully, > even when acks = -1. Correlation id 35836 > Producer log: > [2014-07-24 14:44:26,991] [DEBUG] [dw-97 - PATCH > /v1/events/type_for_test_bringupNewBroker_shouldRebalance_shouldNotLoseData/event?_idPath=idField&_mergeFields=field1] > [kafka.producer.BrokerPartitionInfo] Partition > [EventServiceUpsertTopic,13] has leader 0 > [2014-07-24 14:44:26,993] [DEBUG] [dw-97 - PATCH > /v1/events/type_for_test_bringupNewBroker_shouldRebalance_shouldNotLoseData/event?_idPath=idField&_mergeFields=field1] > [k.producer.async.DefaultEventHandler] Producer sent messages with > correlation id 35836 for topics [EventServiceUpsertTopic,13] to broker 0 on > localhost:56821 > 2. Broker 1 is still catching up > Broker 0 Log: > [2014-07-24 14:44:26,992] [DEBUG] [kafka-request-handler-3] > [kafka.cluster.Partition] Partition [EventServiceUpsertTopic,13] on broker > 0: Old hw for partition [EventServiceUpsertTopic,13] is 971. New hw is 971. > All leo's are 975,971 > [2014-07-24 14:44:26,992] [DEBUG] [kafka-request-handler-3] > [kafka.server.KafkaApis] [KafkaApi-0] Produce to local log in 0 ms > [2014-07-24 14:44:26,992] [DEBUG] [kafka-processor-56821-0] > [kafka.request.logger] Completed request:Name: ProducerRequest; Version: > 0; CorrelationId: 35836; ClientId: ; RequiredAcks: -1; AckTimeoutMs: 10000 > ms from client /127.0.0.1:57086 > ;totalTime:0,requestQueueTime:0,localTime:0,remoteTime:0,responseQueueTime:0,sendTime:0 > 3. Leader election is triggered by the scheduler: > Broker 0 Log: > [2014-07-24 14:44:26,991] [INFO ] [kafka-scheduler-0] > [k.c.PreferredReplicaPartitionLeaderSelector] > [PreferredReplicaPartitionLeaderSelector]: Current leader 0 for partition [ > EventServiceUpsertTopic,13] is not the preferred replica. Trigerring > preferred replica leader election > [2014-07-24 14:44:26,993] [DEBUG] [kafka-scheduler-0] > [kafka.utils.ZkUtils$] Conditional update of path > /brokers/topics/EventServiceUpsertTopic/partitions/13/state with value > {"controller_epoch":1,"leader":1,"version":1,"leader_epoch":3,"isr":[0,1]} > and expected version 3 succeeded, returning the new version: 4 > [2014-07-24 14:44:26,994] [DEBUG] [kafka-scheduler-0] > [k.controller.PartitionStateMachine] [Partition state machine on > Controller 0]: After leader election, leader cache is updated to > Map(<Snipped>(Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),<EndSnip>) > [2014-07-24 14:44:26,994] [INFO ] [kafka-scheduler-0] > [kafka.controller.KafkaController] [Controller 0]: Partition [ > EventServiceUpsertTopic,13] completed preferred replica leader election. > New leader is 1 > 4. Broker 1 is still behind, but it sets the high water mark to 971!!! > Broker 1 Log: > [2014-07-24 14:44:26,999] [INFO ] [kafka-request-handler-6] > [kafka.server.ReplicaFetcherManager] [ReplicaFetcherManager on broker 1] > Removed fetcher for partitions [EventServiceUpsertTopic,13] > [2014-07-24 14:44:27,000] [DEBUG] [kafka-request-handler-6] > [kafka.cluster.Partition] Partition [EventServiceUpsertTopic,13] on broker > 1: Old hw for partition [EventServiceUpsertTopic,13] is 970. New hw is -1. > All leo's are -1,971 > [2014-07-24 14:44:27,098] [DEBUG] [kafka-request-handler-3] > [kafka.server.KafkaApis] [KafkaApi-1] Maybe update partition HW due to > fetch request: Name: FetchRequest; Version: 0; CorrelationId: 1; ClientId: > ReplicaFetcherThread-0-1; ReplicaId: 0; MaxWait: 500 ms; MinBytes: 1 bytes; > RequestInfo: [EventServiceUpsertTopic,13] -> > PartitionFetchInfo(971,1048576), <Snipped> > [2014-07-24 14:44:27,098] [DEBUG] [kafka-request-handler-3] > [kafka.cluster.Partition] Partition [EventServiceUpsertTopic,13] on broker > 1: Recording follower 0 position 971 for partition [ > EventServiceUpsertTopic,13]. > [2014-07-24 14:44:27,100] [DEBUG] [kafka-request-handler-3] > [kafka.cluster.Partition] Partition [EventServiceUpsertTopic,13] on broker > 1: Highwatermark for partition [EventServiceUpsertTopic,13] updated to 971 > 5. Consumer is none the wiser. All data that was in offsets 972-975 doesn't > show up! > I tried this with 2 initial replicas, and adding a 3rd which is supposed to > be the leader for some new partitions, and this problem also happens there. > The log on the old leader gets truncated to the offset on the new leader. > What's the solution? Can I make a new broker leader for partitions that are > currently active without losing data? > Thanks, > Jad. > {quote} -- This message was sent by Atlassian JIRA (v6.2#6252)