[ https://issues.apache.org/jira/browse/KAFKA-3693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15280411#comment-15280411 ]
Maysam Yabandeh commented on KAFKA-3693:
----------------------------------------

[~junrao] Based on the controller logs, the following seems to be the faulty scenario that breaks the assumption that the controller sends a new broker LeaderAndIsr requests that contain all the partitions. (Note: this investigation was manual, so it is error-prone and may contain mistakes.)

# KafkaController::shutdownBroker
{code}
INFO controller.KafkaController: [Controller 17]: Shutting down broker 20
{code}
{code}
// If the broker is a follower, updates the isr in ZK and notifies the current leader
replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, id)), OfflineReplica)
{code}
Note here that the input set has only one member.
# ReplicaStateMachine::handleStateChanges(Set\[PartitionAndReplica\], ReplicaState, Callbacks)
{code}
INFO controller.ReplicaStateMachine: [Replica state machine on controller 17]: Invoking state change to OfflineReplica for replicas [Topic=topic-xyz,Partition=134,Replica=20]
{code}
{code}
brokerRequestBatch.newBatch()
partitions.foreach { topicAndPartition =>
  handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector, callbacks)
}
brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
{code}
# ReplicaStateMachine::handleStateChange(PartitionAndReplica, ReplicaState, Callbacks)
{code}
TRACE change.logger: Controller 17 epoch 269 changed state of replica 20 for partition [topic-xyz,134] from OnlineReplica to OfflineReplica
{code}
{code}
// send the shrunk ISR state change request to all the remaining alive replicas of the partition
...
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ == replicaId), topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
{code}
# ControllerStateManager::sendRequestsToBrokers
{code}
TRACE change.logger: Controller 17 epoch 269 sending become-leader LeaderAndIsr request (Leader:16,ISR:16,LeaderEpoch:79,ControllerEpoch:269) to broker 16 for partition [topic-xyz,134]
{code}
{code}
val leaderAndIsrRequest = new LeaderAndIsrRequest(controllerId, controllerEpoch, partitionStates.asJava, leaders.asJava)
{code}

This particular case aside, the consequences of this assumption being violated (by an existing bug or a future one) are catastrophic, so I think it is wise to make the broker code defensive against the corner cases in which the assumption does not hold.

> Race condition between highwatermark-checkpoint thread and handleLeaderAndIsrRequest at broker start-up
> -------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-3693
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3693
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.9.0.1
>            Reporter: Maysam Yabandeh
>
> Upon broker start-up, a race between the highwatermark-checkpoint thread, which writes the replication-offset-checkpoint file, and the handleLeaderAndIsrRequest thread, which reads from it, causes the highwatermark for some partitions to be reset to 0. In the good case, this causes the replica to truncate its entire log to 0 and hence to start fetching terabytes of data from the lead broker, which sometimes leads to hours of downtime. In the bad cases we observed, the reset offset propagated to the recovery-point-offset-checkpoint file, causing a lead broker to truncate the file. This seems to have the potential to lead to data loss if the truncation happens at both follower and leader brokers.
> This is the particular faulty scenario manifested in our tests:
> # The broker restarts and receives a LeaderAndIsr request from the controller
> # The LeaderAndIsr message, however, does not contain all the partitions (probably because other brokers were churning at the same time)
> # becomeLeaderOrFollower calls getOrCreatePartition and updates allPartitions with the partitions included in the LeaderAndIsr message
> {code}
> def getOrCreatePartition(topic: String, partitionId: Int): Partition = {
>   var partition = allPartitions.get((topic, partitionId))
>   if (partition == null) {
>     allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, partitionId, time, this))
> {code}
> # The highwatermark-checkpoint thread jumps in and snapshots the high watermarks of the (partial) set of replicas into the replication-offset-checkpoint file
> {code}
> def checkpointHighWatermarks() {
>   val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica}
> {code}
> hence overwriting the previously checkpointed highwatermarks.
> # Later, becomeLeaderOrFollower calls makeLeaders and makeFollowers, which read the (now partial) file through Partition::getOrCreateReplica
> {code}
> val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
> val offsetMap = checkpoint.read
> if (!offsetMap.contains(TopicAndPartition(topic, partitionId)))
>   info("No checkpointed highwatermark is found for partition [%s,%d]".format(topic, partitionId))
> {code}
> We are not entirely sure whether the initial LeaderAndIsr message including only a subset of partitions is critical for this race condition to manifest. But it is an important detail, since it shows that a solution based on not letting the highwatermark-checkpoint thread jump in while a LeaderAndIsr message is being processed would not suffice.
> The solution we are thinking of is to force the initialization of allPartitions with the partitions listed in the replication-offset-checkpoint file (and perhaps the recovery-point-offset-checkpoint file too) when a server starts.
> Thoughts?
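
To make the interleaving in the quoted description concrete, here is a minimal single-threaded sketch of it. This is a toy model under stated assumptions and not Kafka code: CheckpointFile, Broker, seedFromCheckpoint and run are hypothetical names, the partition numbers and offsets are made up, and the two threads are replaced by an explicit call order. It models only the variant in which the first LeaderAndIsr request carries a subset of the partitions and the checkpoint runs between two requests.

```scala
import scala.collection.mutable

// Toy model of the scenario in the issue description; every name here is hypothetical.
object CheckpointRaceSketch {
  type TopicPartition = (String, Int)

  /** Stand-in for the replication-offset-checkpoint file. */
  final class CheckpointFile(initial: Map[TopicPartition, Long]) {
    private var contents: Map[TopicPartition, Long] = initial
    def read(): Map[TopicPartition, Long] = contents
    // Each write replaces the whole file, so partitions missing from `offsets` are lost.
    def write(offsets: Map[TopicPartition, Long]): Unit = { contents = offsets }
  }

  /** Stand-in for the broker's in-memory view (allPartitions plus their high watermarks). */
  final class Broker(checkpoint: CheckpointFile, seedFromCheckpoint: Boolean) {
    private val highWatermarks = mutable.Map.empty[TopicPartition, Long]

    // Proposed mitigation: at start-up, load every partition listed in the checkpoint file.
    if (seedFromCheckpoint) highWatermarks ++= checkpoint.read()

    // Models the request handler: a partition seen for the first time takes its
    // high watermark from the file, or 0 if the file has no entry for it.
    def handleLeaderAndIsr(partitions: Set[TopicPartition]): Unit = {
      val onDisk = checkpoint.read()
      partitions.foreach { tp =>
        if (!highWatermarks.contains(tp)) highWatermarks(tp) = onDisk.getOrElse(tp, 0L)
      }
    }

    // Models the checkpoint thread: snapshot whatever is currently known in memory.
    def checkpointHighWatermarks(): Unit = checkpoint.write(highWatermarks.toMap)

    def highWatermark(tp: TopicPartition): Long = highWatermarks.getOrElse(tp, 0L)
  }

  /** Replays: partial request, then checkpoint, then a request with the remaining partition. */
  def run(seedFromCheckpoint: Boolean): Map[TopicPartition, Long] = {
    val a: TopicPartition = ("topic-xyz", 0)
    val b: TopicPartition = ("topic-xyz", 1)
    val file = new CheckpointFile(Map(a -> 100L, b -> 200L))
    val broker = new Broker(file, seedFromCheckpoint)
    broker.handleLeaderAndIsr(Set(a))   // the first request contains only partition a
    broker.checkpointHighWatermarks()   // the checkpoint runs between the two requests
    broker.handleLeaderAndIsr(Set(b))   // partition b arrives after the file was rewritten
    Map(a -> broker.highWatermark(a), b -> broker.highWatermark(b))
  }
}
```

In this model, run(seedFromCheckpoint = false) leaves partition b with a high watermark of 0 even though the checkpoint never ran in the middle of a request, which is why blocking the checkpoint thread during request handling would not be enough. With run(seedFromCheckpoint = true), b keeps its checkpointed value of 200. Whether seeding is sufficient in the real broker would still need to be checked against the actual code paths.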