[ https://issues.apache.org/jira/browse/KAFKA-3693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15279284#comment-15279284 ]
Ismael Juma commented on KAFKA-3693:
------------------------------------
cc [~junrao]
> Race condition between highwatermark-checkpoint thread and
> handleLeaderAndIsrRequest at broker start-up
> -------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-3693
> URL: https://issues.apache.org/jira/browse/KAFKA-3693
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 0.9.0.1
> Reporter: Maysam Yabandeh
>
> Upon broker start-up, a race between the highwatermark-checkpoint thread writing
> the replication-offset-checkpoint file and the handleLeaderAndIsrRequest thread
> reading from it causes the highwatermark for some partitions to be reset to 0. In
> the good case, this results in the replica truncating its entire log to 0 and then
> fetching terabytes of data from the leader broker, which sometimes leads to hours
> of downtime. In the bad cases we observed, the reset offset can also propagate to
> the recovery-point-offset-checkpoint file, causing a leader broker to truncate the
> file. This has the potential to cause data loss if the truncation happens on both
> the follower and the leader broker.
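> For reference, replication-offset-checkpoint is a plain-text file holding a
> version line, an entry count, and one "topic partition offset" line per local
> replica; the offsets below are made-up values just to illustrate the layout:
> {code}
> 0
> 3
> topicA 0 4211
> topicA 1 3020
> topicB 0 9815
> {code}
> When the race hits, this file is rewritten with only a subset of the partitions,
> and the missing partitions are later treated as having a highwatermark of 0.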
> This is the particular faulty scenario that manifested in our tests:
> # The broker restarts and receives LeaderAndIsr from the controller
> # The LeaderAndIsr message, however, does not contain all the partitions (probably
> because other brokers were churning at the same time)
> # becomeLeaderOrFollower calls getOrCreatePartition and updates allPartitions with
> only the partitions included in the LeaderAndIsr message {code}
> def getOrCreatePartition(topic: String, partitionId: Int): Partition = {
>   var partition = allPartitions.get((topic, partitionId))
>   if (partition == null) {
>     allPartitions.putIfNotExists((topic, partitionId),
>       new Partition(topic, partitionId, time, this))
>     partition = allPartitions.get((topic, partitionId))
>   }
>   partition
> }
> {code}
> # The highwatermark-checkpoint thread jumps in and takes a snapshot of the (now
> partial) set of replicas' high watermarks into the replication-offset-checkpoint
> file, hence rewriting the previous highwatermarks {code}
> def checkpointHighWatermarks() {
>   val replicas = allPartitions.values.map(_.getReplica(config.brokerId))
>     .collect { case Some(replica) => replica }
> {code}
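> The crux is that the checkpoint writer rewrites the whole file from the in-memory
> map, so any partition not (yet) present in allPartitions simply disappears from the
> checkpoint. A minimal sketch of that behaviour, using a simplified stand-in for the
> checkpoint read/write logic rather than the actual kafka.server.OffsetCheckpoint
> class:
> {code}
> import java.io.{File, PrintWriter}
> import scala.io.Source
>
> // Simplified stand-in: version line, entry count, then "topic partition offset"
> // lines, mirroring the layout shown above.
> object CheckpointClobberSketch {
>   def write(file: File, hwms: Map[(String, Int), Long]): Unit = {
>     val pw = new PrintWriter(file)
>     try {
>       pw.println(0)          // version
>       pw.println(hwms.size)  // number of entries
>       hwms.foreach { case ((t, p), hw) => pw.println(s"$t $p $hw") }
>     } finally pw.close()
>   }
>
>   def read(file: File): Map[(String, Int), Long] =
>     Source.fromFile(file).getLines().drop(2).map { line =>
>       val Array(t, p, hw) = line.split(" ")
>       (t, p.toInt) -> hw.toLong
>     }.toMap
>
>   def main(args: Array[String]): Unit = {
>     val f = new File("replication-offset-checkpoint")
>     // Checkpoint as it looked before the restart.
>     write(f, Map(("topicA", 0) -> 4211L, ("topicA", 1) -> 3020L, ("topicB", 0) -> 9815L))
>     // After step 3, allPartitions holds only what the partial LeaderAndIsr named,
>     // so the checkpoint thread snapshots just that subset.
>     write(f, Map(("topicA", 0) -> 4211L))
>     // Step 5 then reads the file and finds no entry for topicB-0 anymore.
>     println(read(f).get(("topicB", 0)))  // prints None
>   }
> }
> {code}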
> # Later, becomeLeaderOrFollower calls makeLeaders and makeFollowers, which read
> the (now partial) file through Partition::getOrCreateReplica {code}
> val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
> val offsetMap = checkpoint.read
> if (!offsetMap.contains(TopicAndPartition(topic, partitionId)))
>   info("No checkpointed highwatermark is found for partition [%s,%d]".format(topic, partitionId))
> {code}
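> Right after that info line, the missing entry falls back to offset 0 (capped by
> the log end offset), which is what resets the highwatermark and later drives the
> full truncation on the follower; the lines that follow in getOrCreateReplica are
> essentially:
> {code}
> val offset = offsetMap.getOrElse(TopicAndPartition(topic, partitionId), 0L).min(log.logEndOffset)
> val localReplica = new Replica(replicaId, this, time, offset, Some(log))
> {code}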
> We are not entirely sure whether the initial LeaderAndIsr message containing only
> a subset of partitions is critical to making this race condition manifest. It is
> an important detail nonetheless, since it clarifies that a solution based merely
> on not letting the highwatermark-checkpoint thread run in the middle of processing
> a LeaderAndIsr message would not suffice.
> The solution we are thinking of is to force-initialize allPartitions with the
> partitions listed in the replication-offset-checkpoint file (and perhaps the
> recovery-point-offset-checkpoint file too) when the server starts, as sketched
> below.
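> A rough sketch of that pre-initialization (the method name
> initializePartitionsFromCheckpoints is made up here purely for illustration):
> {code}
> // Hypothetical sketch: before any LeaderAndIsrRequest is processed, seed
> // allPartitions from the partitions already listed in the
> // replication-offset-checkpoint files, so a partial LeaderAndIsr request can no
> // longer shrink the set that checkpointHighWatermarks() snapshots.
> def initializePartitionsFromCheckpoints(): Unit = {
>   for {
>     checkpoint <- highWatermarkCheckpoints.values   // one checkpoint per log dir
>     (TopicAndPartition(topic, partitionId), _) <- checkpoint.read
>   } getOrCreatePartition(topic, partitionId)
> }
> {code}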
> Thoughts?
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)