KAFKA-851 Broken handling of leader and isr request leads to incorrect high watermark checkpoint file; reviewed by Jun Rao and Swapnil Ghike
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1d3c343f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1d3c343f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1d3c343f

Branch: refs/heads/trunk
Commit: 1d3c343f57db55076c7e627bc362b4a64e22dcb2
Parents: afecc9f
Author: Neha Narkhede <[email protected]>
Authored: Thu Apr 4 21:19:52 2013 -0700
Committer: Neha Narkhede <[email protected]>
Committed: Thu Apr 4 21:19:52 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/cluster/Partition.scala | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d3c343f/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 2ca7ee6..aa2092e 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -179,13 +179,16 @@ class Partition(val topic: String,
       // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
       // to maintain the decision maker controller's epoch in the zookeeper path
       controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
+      // make sure local replica exists. This reads the last check pointed high watermark from disk. On startup, it is
+      // important to ensure that this operation happens for every single partition in a leader and isr request, else
+      // some high watermark values could be overwritten with 0. This leads to replicas fetching from the earliest offset
+      // on the leader
+      val localReplica = getOrCreateReplica()
       val newLeaderBrokerId: Int = leaderAndIsr.leader
       liveBrokers.find(_.id == newLeaderBrokerId) match {
         case Some(leaderBroker) =>
           // stop fetcher thread to previous leader
           replicaFetcherManager.removeFetcher(topic, partitionId)
-          // make sure local replica exists
-          val localReplica = getOrCreateReplica()
           localReplica.log.get.truncateTo(localReplica.highWatermark)
           inSyncReplicas = Set.empty[Replica]
           leaderEpoch = leaderAndIsr.leaderEpoch
