[ https://issues.apache.org/jira/browse/KAFKA-3693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15280411#comment-15280411 ]

Maysam Yabandeh commented on KAFKA-3693:
----------------------------------------

[~junrao] based on the controller logs, the following seems to be the faulty 
scenario that breaks the assumption that the controller sends a new broker 
LeaderAndIsr requests containing all of its partitions (note: the manual 
investigation process is error-prone and subject to mistakes):

# KafkaController::shutdownBroker
{code}
INFO controller.KafkaController: [Controller 17]: Shutting down broker 20
{code}
{code}
// If the broker is a follower, updates the isr in ZK and notifies the current leader
replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic,
  topicAndPartition.partition, id)), OfflineReplica)
{code} Note here that the input set has only one member.
# ReplicaStateMachine::handleStateChanges(Set\[PartitionAndReplica\], ReplicaState, Callbacks)
{code}
INFO controller.ReplicaStateMachine: [Replica state machine on controller 17]: Invoking state change to OfflineReplica for replicas [Topic=topic-xyz,Partition=134,Replica=20]
{code}
{code}
brokerRequestBatch.newBatch()
partitions.foreach { topicAndPartition =>
  handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector, callbacks)
}
brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
{code}
# ReplicaStateMachine::handleStateChange(PartitionAndReplica, ReplicaState, Callbacks)
{code}
TRACE change.logger: Controller 17 epoch 269 changed state of replica 20 for partition [topic-xyz,134] from OnlineReplica to OfflineReplica
{code}
{code}
// send the shrunk ISR state change request to all the remaining alive replicas of the partition
...
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ == replicaId),
  topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
{code}
# ControllerBrokerRequestBatch::sendRequestsToBrokers (see the sketch after this list)
{code}
TRACE change.logger: Controller 17 epoch 269 sending become-leader LeaderAndIsr request (Leader:16,ISR:16,LeaderEpoch:79,ControllerEpoch:269) to broker 16 for partition [topic-xyz,134]
{code}
{code}
val leaderAndIsrRequest = new LeaderAndIsrRequest(controllerId, controllerEpoch, partitionStates.asJava, leaders.asJava)
{code}
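
To make the batching behaviour concrete, here is a minimal, self-contained sketch 
(illustrative names and simplified types only, not the actual controller code) of why 
the resulting LeaderAndIsr request carries only the partitions added since newBatch() 
-- a single partition here -- no matter how many partitions the receiving broker hosts:
{code}
// Simplified model of the request batching described above; all names are illustrative.
import scala.collection.mutable

case class PartitionStateSketch(leader: Int, isr: List[Int], leaderEpoch: Int)

class RequestBatchSketch {
  // brokerId -> (topic, partition) -> partition state accumulated for this batch
  private val leaderAndIsrRequestMap =
    mutable.Map.empty[Int, mutable.Map[(String, Int), PartitionStateSketch]]

  def newBatch(): Unit = leaderAndIsrRequestMap.clear()

  def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int,
                                       state: PartitionStateSketch): Unit =
    brokerIds.foreach { b =>
      leaderAndIsrRequestMap.getOrElseUpdate(b, mutable.Map.empty).put((topic, partition), state)
    }

  def sendRequestsToBrokers(): Unit =
    leaderAndIsrRequestMap.foreach { case (broker, partitionStates) =>
      // The request for this broker contains only what was added since newBatch(),
      // regardless of how many partitions the broker actually hosts.
      println(s"LeaderAndIsr to broker $broker: ${partitionStates.keys.mkString(", ")}")
    }
}

object SingletonBatchDemo extends App {
  val batch = new RequestBatchSketch
  batch.newBatch()
  // shutdownBroker(20) handled only [topic-xyz,134], so only that partition is added
  // for the remaining replica (broker 16), even if that broker had just come back up.
  batch.addLeaderAndIsrRequestForBrokers(Seq(16), "topic-xyz", 134,
    PartitionStateSketch(leader = 16, isr = List(16), leaderEpoch = 79))
  batch.sendRequestsToBrokers()
}
{code}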

This particular case aside, since the consequences of this assumption being 
violated (whether by an existing bug or a future one) are catastrophic, I think 
it would be wise to make the broker code defensive against the corner cases in 
which this assumption could be violated.
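
For example (purely illustrative, made-up names, not a concrete patch), the high 
watermark checkpointing could merge with the existing checkpoint file rather than 
overwrite it, so that a snapshot taken from a partially populated allPartitions 
cannot erase or reset entries for partitions that have not been (re)created yet:
{code}
// Hypothetical sketch of one possible defensive measure; not existing Kafka code.
object DefensiveCheckpointSketch extends App {
  type TopicPartition = (String, Int)

  // Stand-in for the on-disk replication-offset-checkpoint content.
  var onDisk: Map[TopicPartition, Long] =
    Map(("topic-xyz", 134) -> 4213421L, ("topic-abc", 7) -> 99000L)

  def checkpointHighWatermarks(inMemory: Map[TopicPartition, Long]): Unit = {
    // Merge rather than overwrite: partitions missing from the (possibly partial)
    // in-memory view keep their previously checkpointed high watermark.
    onDisk = onDisk ++ inMemory
  }

  // allPartitions currently holds only the partitions from a partial LeaderAndIsr request.
  checkpointHighWatermarks(Map(("topic-xyz", 134) -> 4213500L))
  println(onDisk) // ("topic-abc",7) keeps its high watermark instead of disappearing
}
{code}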

> Race condition between highwatermark-checkpoint thread and 
> handleLeaderAndIsrRequest at broker start-up
> -------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-3693
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3693
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.9.0.1
>            Reporter: Maysam Yabandeh
>
> Upon broker start-up, a race between the highwatermark-checkpoint thread writing 
> the replication-offset-checkpoint file and the handleLeaderAndIsrRequest thread 
> reading from it causes the high watermark for some partitions to be reset to 0. In 
> the good case, this results in the replica truncating its entire log to 0 and hence 
> fetching terabytes of data from the leader broker, which sometimes leads to hours 
> of downtime. We observed bad cases in which the reset offset propagated to the 
> recovery-point-offset-checkpoint file, making a leader broker truncate the file. 
> This seems to have the potential to lead to data loss if the truncation happens at 
> both follower and leader brokers.
> This is the particular faulty scenario that manifested in our tests:
> # The broker restarts and receives a LeaderAndIsr request from the controller
> # The LeaderAndIsr message, however, does not contain all the partitions (probably 
> because other brokers were churning at the same time)
> # becomeLeaderOrFollower calls getOrCreatePartition and updates allPartitions 
> with the partitions included in the LeaderAndIsr message {code}
>   def getOrCreatePartition(topic: String, partitionId: Int): Partition = {
>     var partition = allPartitions.get((topic, partitionId))
>     if (partition == null) {
>       allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, partitionId, time, this))
> {code}
> # The highwatermark-checkpoint thread jumps in, taking a snapshot of the (partial) 
> allPartitions' high watermarks into the replication-offset-checkpoint file {code}
>   def checkpointHighWatermarks() {
>     val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica}
> {code} hence rewriting the previous high watermarks.
> # Later becomeLeaderOrFollower calls makeLeaders and makeFollowers, which read 
> the (now partial) file through Partition::getOrCreateReplica {code}
>   val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
>   val offsetMap = checkpoint.read
>   if (!offsetMap.contains(TopicAndPartition(topic, partitionId)))
>     info("No checkpointed highwatermark is found for partition [%s,%d]".format(topic, partitionId))
> {code}
> We are not entirely sure whether the initial LeaderAndIsr message including only a 
> subset of partitions is critical to making this race condition manifest. But it is 
> an important detail, since it clarifies that a solution based on not letting the 
> highwatermark-checkpoint thread jump in the middle of processing a LeaderAndIsr 
> message would not suffice.
> The solution we are thinking of is to force-initialize allPartitions with the 
> partitions listed in the replication-offset-checkpoint file (and perhaps the 
> recovery-point-offset-checkpoint file too) when a server starts.
> Thoughts?
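
As a rough illustration of the solution proposed in the description above (hypothetical 
names and simplified types, not a patch), pre-seeding allPartitions from the checkpoint 
file at startup could look roughly like:
{code}
// Hypothetical sketch: seed the partition map from the replication-offset-checkpoint
// file at startup so a later partial checkpoint cannot silently drop partitions that
// have not been touched by a LeaderAndIsr request yet.
import scala.collection.concurrent.TrieMap

object StartupSeedSketch extends App {
  type TopicPartition = (String, Int)

  // Stand-in for reading replication-offset-checkpoint (OffsetCheckpoint.read in Kafka).
  def readHighWatermarkCheckpoint(): Map[TopicPartition, Long] =
    Map(("topic-xyz", 134) -> 4213421L, ("topic-abc", 7) -> 99000L)

  // Stand-in for ReplicaManager.allPartitions; real values would be Partition objects.
  val allPartitions = TrieMap.empty[TopicPartition, Long]

  def startup(): Unit =
    readHighWatermarkCheckpoint().foreach { case (tp, hw) =>
      // getOrCreatePartition-style seeding for every checkpointed partition.
      allPartitions.putIfAbsent(tp, hw)
    }

  startup()
  println(allPartitions.keySet)
}
{code}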


