[
https://issues.apache.org/jira/browse/KAFKA-2108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14486345#comment-14486345
]
Jiangjie Qin commented on KAFKA-2108:
-------------------------------------
[~slimunholyone] sent me some logs offline. Thanks for cleaning up the logs.
I went through the logs and here are some findings.
1. The controlled shutdown failed because of the the samza checkpoint
partition. This means in zookeeper the partition information somehow got lost.
This caused the unclean shutdown.
2. The unclean shutdown on broker 6 went through and controller was taken over
by broker 7. The migration looks fine because:
- Controller log on broker 6 shows leader migration on other partitions are
processed without error.
- State change log on broker 6 shows responses to the controller message from
other two brokers.
- [NOTE] The server log on broker 7 and broker 8 were not provided. These
logs are critical as we can confirm if the leader has been successfully
migrated. And what did they do to the later on fetch request from broker 6
after it starts up.
3. After broker 6 comes up, it becomes follower and issues fetch requests to
broker 7 and broker 8. But it got OffsetOutOfRange exception and was trying to
truncate log to the leaders' log end offset. However some log end offsets on
broker 7 and 8 looks very small, so this leads to log segment deletion on
broker 6. (The Samza checkpoint partition in this case is considered as if
never existed on broker 6, because zookeeper has no information for it. This
partition should not affect other partitions.)
So some questions I have here are:
1. When broker 6 shutdown, are there any under replicated partitions?
2. Could you provide server log on broker 7 and broker 8 during the same time
range as well?
Thanks.
Jiangjie (Becket) Qin
> Node deleted all data and re-sync from replicas after attempted upgrade from
> 0.8.1.1 to 0.8.2.0
> -----------------------------------------------------------------------------------------------
>
> Key: KAFKA-2108
> URL: https://issues.apache.org/jira/browse/KAFKA-2108
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.8.1.1
> Reporter: Thunder Stumpges
>
> Per [email
> thread|http://mail-archives.apache.org/mod_mbox/kafka-users/201504.mbox/%3CCAA%2BBczTUBqg1-tpcUjwfZgZYZyOXC-Myuhd_2EaGkeKWkrCVUQ%40mail.gmail.com%3E]
> in user group.
> We ran into an issue in an attempt to perform a rolling upgrade from 0.8.1.1
> to 0.8.2.0 (we should have had 0.8.2.1 but got the wrong binaries
> accidentally).
> In shutting down the first node, it failed a clean controlled shutdown due to
> one corrupt topic. The message in server.log was:
> {noformat}
> [2015-03-31 10:21:46,250] INFO [Kafka Server 6], Remaining partitions to
> move: [__samza_checkpoint_ver_1_for_usersessions_1,0]
> (kafka.server.KafkaServer)
> [2015-03-31 10:21:46,250] INFO [Kafka Server 6], Error code from controller:
> 0 (kafka.server.KafkaServer)
> {noformat}
> And related message in state-change.log:
> {noformat}
> [2015-03-31 10:21:42,622] TRACE Controller 6 epoch 23 started leader election
> for partition [__samza_checkpoint_ver_1_for_usersessions_1,0]
> (state.change.logger)
> [2015-03-31 10:21:42,623] ERROR Controller 6 epoch 23 encountered error while
> electing leader for partition [__samza_checkpoint_ver_1_for_usersessions_1,0]
> due to: LeaderAndIsr information doesn't exist for partition
> [__samza_checkpoint_ver_1_for_usersessions_1,0] in OnlinePartition state.
> (state.change.logger)
> [2015-03-31 10:21:42,623] TRACE Controller 6 epoch 23 received response
> correlationId 2360 for a request sent to broker id:8,host:xxxxxxx,port:9092
> (state.change.logger)
> [2015-03-31 10:21:42,623] ERROR Controller 6 epoch 23 initiated state change
> for partition [__samza_checkpoint_ver_1_for_usersessions_1,0] from
> OnlinePartition to OnlinePartition failed (state.change.logger)
> kafka.common.StateChangeFailedException: encountered error while electing
> leader for partition [__samza_checkpoint_ver_1_for_usersessions_1,0] due to:
> LeaderAndIsr information doesn't exist for partition
> [__samza_checkpoint_ver_1_for_usersessions_1,0] in OnlinePartition state.
> at
> kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:360)
> at
> kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:187)
> at
> kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:125)
> at
> kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:124)
> at scala.collection.immutable.Set$Set1.foreach(Set.scala:86)
> at
> kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:124)
> at
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1$$anonfun$apply$mcV$sp$3.apply(KafkaController.scala:257)
> at
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1$$anonfun$apply$mcV$sp$3.apply(KafkaController.scala:253)
> at scala.Option.foreach(Option.scala:197)
> at
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1.apply$mcV$sp(KafkaController.scala:253)
> at
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1.apply(KafkaController.scala:253)
> at
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1.apply(KafkaController.scala:253)
> at kafka.utils.Utils$.inLock(Utils.scala:538)
> at
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3.apply(KafkaController.scala:252)
> at
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3.apply(KafkaController.scala:249)
> at
> scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:130)
> at
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:275)
> at
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:275)
> at
> kafka.controller.KafkaController.shutdownBroker(KafkaController.scala:249)
> at
> kafka.server.KafkaApis.handleControlledShutdownRequest(KafkaApis.scala:264)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:192)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
> at java.lang.Thread.run(Thread.java:744)
> Caused by: kafka.common.StateChangeFailedException: LeaderAndIsr information
> doesn't exist for partition [__samza_checkpoint_ver_1_for_usersessions_1,0]
> in OnlinePartition state
> at
> kafka.controller.PartitionStateMachine.getLeaderIsrAndEpochOrThrowException(PartitionStateMachine.scala:391)
> at
> kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:324)
> ... 22 more
> {noformat}
> Then when starting up, we got many angry messages like these:
> {noformat}
> [2015-03-31 10:24:10,960] WARN [Replica Manager on Broker 6]: Fetch request
> with correlation id 159883938 from client ReplicaFetcherThread-0-6 on
> partition [__samza_checkpoint_ver_1_for_usersessions_1,0] failed due to
> Partition [__samza_checkpoint_ver_1_for_usersessions_1,0] doesn't exist on 6
> (kafka.server.ReplicaManager) [2015-03-31 10:24:10,989] WARN [Replica Manager
> on Broker 6]: While recording the follower position, the partition
> [__samza_checkpoint_ver_1_for_usersessions_1,0] hasn't been created, skip
> updating leader HW (kafka.server.ReplicaManager)
> [2015-03-31 10:24:10,989] WARN [Replica Manager on Broker 6]: While recording
> the follower position, the partition
> [__samza_checkpoint_ver_1_for_usersessions_1,0] hasn't been created, skip
> updating leader HW (kafka.server.ReplicaManager)
> {noformat}
> Then they change to these:
> {noformat}
> [2015-03-31 10:24:12,008] WARN [Replica Manager on Broker 6]: Fetch request
> with correlation id 159884643 from client ReplicaFetcherThread-0-6 on
> partition [__samza_checkpoint_ver_1_for_usersessions_1,0] failed due to
> Partition [__samza_checkpoint_ver_1_for_usersessions_1,0] doesn't exist on 6
> (kafka.server.ReplicaManager)
> [2015-03-31 10:24:12,009] WARN [Replica Manager on Broker 6]: While recording
> the follower position, the partition
> [__samza_checkpoint_ver_1_for_usersessions_1,0] hasn't been created, skip
> updating leader HW (kafka.server.ReplicaManager)
> [2015-03-31 10:24:12,009] WARN [Replica Manager on Broker 6]: Fetch request
> with correlation id 158861852 from client ReplicaFetcherThread-0-6 on
> partition [__samza_checkpoint_ver_1_for_usersessions_1,0] failed due to
> Partition [__samza_checkpoint_ver_1_for_usersessions_1,0] doesn't exist on 6
> (kafka.server.ReplicaManager)
> [2015-03-31 10:24:12,009] WARN [Replica Manager on Broker 6]: While recording
> the follower position, the partition
> [__samza_checkpoint_ver_1_for_usersessions_1,0] hasn't been created, skip
> updating leader HW (kafka.server.ReplicaManager)
> {noformat}
> And a bunch like this:
> {noformat}
> [2015-03-31 10:24:12,019] ERROR [KafkaApi-6] error when handling request
> Name: FetchRequest; Version: 0; CorrelationId: 158861855; ClientId:
> ReplicaFetcherThread-0-6; ReplicaId: 8; MaxWait: 500 ms; MinBytes: 1 bytes;
> RequestInfo: [__samza_checkpoint_ver_1_for_usersessions_1,0] ->
> PartitionFetchInfo(4461,1048576) (kafka.server.KafkaApis)
> kafka.common.NotAssignedReplicaException: Leader 6 failed to record follower
> 8's position -1 since the replica is not recognized to be one of the assigned
> replicas for partition [__samza_checkpoint_ver_1_for_usersessions_1,0]
> at
> kafka.server.ReplicaManager.updateReplicaLEOAndPartitionHW(ReplicaManager.scala:574)
> at
> kafka.server.KafkaApis$$anonfun$recordFollowerLogEndOffsets$2.apply(KafkaApis.scala:388)
> at
> kafka.server.KafkaApis$$anonfun$recordFollowerLogEndOffsets$2.apply(KafkaApis.scala:386)
> at
> scala.collection.immutable.MapLike$$anon$2$$anonfun$foreach$3.apply(MapLike.scala:109)
> at
> scala.collection.immutable.MapLike$$anon$2$$anonfun$foreach$3.apply(MapLike.scala:109)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:119)
> at
> scala.collection.immutable.MapLike$$anon$2.foreach(MapLike.scala:109)
> at
> kafka.server.KafkaApis.recordFollowerLogEndOffsets(KafkaApis.scala:386)
> at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:351)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:60)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
> at java.lang.Thread.run(Thread.java:744)
> {noformat}
> And it starts truncating the logs (not to 0, guessing as part of becoming
> follower?) ...
> {noformat}
> [2015-03-31 10:24:15,897] INFO [ReplicaFetcherManager on broker 6] Removed
> fetcher for partitions
> [my-store-changelog,1],[stage_vsw.avrodto.addelivery.internalstats.trafficshaperlog,3],+
> a bunch of partitions... (kafka.server.ReplicaFetcherManager)
> [2015-03-31 10:24:16,013] INFO Truncating log my-store-changelog-1 to offset
> 3736. (kafka.log.Log)
> [2015-03-31 10:24:16,014] INFO Truncating log xyz.topic1-3 to offset 2930.
> (kafka.log.Log)
> [2015-03-31 10:24:16,014] INFO Truncating log xyz.topic2-2 to offset 10176.
> (kafka.log.Log)
> {noformat}
> This continues for many / all partitions, then... It starts deleting logs!?
> {noformat}
> [2015-03-31 10:24:38,449] INFO [ReplicaFetcherManager on broker 6] Removed
> fetcher for partitions [my-store-changelog,1],[xyz.topic1,1],...
> (kafka.server.ReplicaFetcherManager)
> [2015-03-31 10:24:40,700] INFO Scheduling log segment 6587 for log
> xyz.topic1-2 for deletion. (kafka.log.Log)
> [2015-03-31 10:24:40,701] INFO Scheduling log segment 6594 for log
> xyz.topic1-2 for deletion. (kafka.log.Log)
> [2015-03-31 10:24:40,701] INFO Scheduling log segment 6595 for log
> xyz.topic1-2 for deletion. (kafka.log.Log)
> [2015-03-31 10:24:40,702] INFO Scheduling log segment 203953 for log
> xyz.topic2-4 for deletion. (kafka.log.Log)
> [2015-03-31 10:24:40,702] INFO Scheduling log segment 210571 for log
> xyz.topic2-4 for deletion. (kafka.log.Log)
> [2015-03-31 10:24:40,702] INFO Scheduling log segment 211471 for log
> xyz.topic2-4 for deletion. (kafka.log.Log)
> {noformat}
> then it starts actually deleting them... this goes on for a good 20 minutes...
> {noformat}
> [2015-03-31 10:25:40,704] INFO Deleting segment 6587 from log xyz.topic1-2.
> (kafka.log.Log)
> [2015-03-31 10:25:40,716] INFO Deleting index
> /data4/kafka-data/xyz.topic1-2/00000000000000006587.index.deleted
> (kafka.log.OffsetIndex)
> [2015-03-31 10:25:40,716] INFO Deleting segment 6594 from log xyz.topic1-2.
> (kafka.log.Log)
> [2015-03-31 10:25:40,717] INFO Deleting index
> /data4/kafka-data/xyz.topic1-2/00000000000000006594.index.deleted
> (kafka.log.OffsetIndex)
> [2015-03-31 10:25:40,717] INFO Deleting segment 6595 from log xyz.topic1-2.
> (kafka.log.Log)
> [2015-03-31 10:25:40,717] INFO Deleting index
> /data4/kafka-data/xyz.topic1-2/00000000000000006595.index.deleted
> (kafka.log.OffsetIndex)
> [2015-03-31 10:25:40,717] INFO Deleting segment 203953 from log xyz.topic2-4.
> (kafka.log.Log)
> [2015-03-31 10:25:40,722] INFO Deleting segment 210571 from log xyz.topic2-4.
> (kafka.log.Log)
> [2015-03-31 10:25:40,729] INFO Deleting index
> /data4/kafka-data/xyz.topic2-4/00000000000000203953.index.deleted
> (kafka.log.OffsetIndex)
> [2015-03-31 10:25:40,729] INFO Deleting segment 211471 from log xyz.topic2-4.
> (kafka.log.Log)
> {noformat}
> I don't know that we knew what was happening exactly at this time, only that
> it was not sync'ing up with the others. I think the sys-engineer stopped it
> after about 20 minutes to see what was wrong... I think by this point the
> damage was done.
> Any ideas? It makes us more than a little nervous to restart nodes if they
> will just "decide" to delete segments. Under what conditions would this
> happen?
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)