Repository: kafka Updated Branches: refs/heads/trunk 6cbd97597 -> 6df9e7ff2
KAFKA-2721; Avoid handling duplicate LeaderAndISR requests Author: Dong Lin <[email protected]> Reviewers: Jun Rao <[email protected]> Closes #436 from lindong28/KAFKA-2721 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6df9e7ff Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6df9e7ff Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6df9e7ff Branch: refs/heads/trunk Commit: 6df9e7ff2c6cfb3c7ca16f94928d0e86f3d087e2 Parents: 6cbd975 Author: Dong Lin <[email protected]> Authored: Mon Nov 16 15:50:46 2015 -0800 Committer: Jun Rao <[email protected]> Committed: Mon Nov 16 15:50:46 2015 -0800 ---------------------------------------------------------------------- .../main/scala/kafka/cluster/Partition.scala | 39 +++++++++++--------- .../scala/kafka/server/ReplicaManager.scala | 29 ++++++++++----- 2 files changed, 42 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/6df9e7ff/core/src/main/scala/kafka/cluster/Partition.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 70c8d99..3805dcc 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -157,12 +157,12 @@ class Partition(val topic: String, } /** - * Make the local replica the leader by resetting LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time) - * and setting the new leader and ISR + * Make the local replica the leader by resetting LogEndOffset for remote replicas (there could be old LogEndOffset + * from the time when this broker was the leader last time) and setting the new leader and ISR.
+ * If the leader replica id does not change, return false to indicate the replica manager. */ - def makeLeader(controllerId: Int, - partitionStateInfo: PartitionStateInfo, correlationId: Int) { - val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) { + def makeLeader(controllerId: Int, partitionStateInfo: PartitionStateInfo, correlationId: Int): Boolean = { + val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) { val allReplicas = partitionStateInfo.allReplicas val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr @@ -177,29 +177,34 @@ class Partition(val topic: String, inSyncReplicas = newInSyncReplicas leaderEpoch = leaderAndIsr.leaderEpoch zkVersion = leaderAndIsr.zkVersion - leaderReplicaIdOpt = Some(localBrokerId) - // construct the high watermark metadata for the new leader replica - val newLeaderReplica = getReplica().get - newLeaderReplica.convertHWToLocalOffsetMetadata() - // reset log end offset for remote replicas - assignedReplicas.foreach(r => - if (r.brokerId != localBrokerId) r.updateLogReadResult(LogReadResult.UnknownLogReadResult)) + val isNewLeader = + if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == localBrokerId) { + false + } else { + leaderReplicaIdOpt = Some(localBrokerId) + true + } + val leaderReplica = getReplica().get // we may need to increment high watermark since ISR could be down to 1 - maybeIncrementLeaderHW(newLeaderReplica) + if (isNewLeader) { + // construct the high watermark metadata for the new leader replica + leaderReplica.convertHWToLocalOffsetMetadata() + // reset log end offset for remote replicas + assignedReplicas.filter(_.brokerId != localBrokerId).foreach(_.updateLogReadResult(LogReadResult.UnknownLogReadResult)) + } + (maybeIncrementLeaderHW(leaderReplica), isNewLeader) } - // some delayed operations may be unblocked after HW changed if (leaderHWIncremented) tryCompleteDelayedRequests() + 
isNewLeader } /** * Make the local replica the follower by setting the new leader and ISR to empty * If the leader replica id does not change, return false to indicate the replica manager */ - def makeFollower(controllerId: Int, - partitionStateInfo: PartitionStateInfo, - correlationId: Int): Boolean = { + def makeFollower(controllerId: Int, partitionStateInfo: PartitionStateInfo, correlationId: Int): Boolean = { inWriteLock(leaderIsrUpdateLock) { val allReplicas = partitionStateInfo.allReplicas val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch http://git-wip-us.apache.org/repos/asf/kafka/blob/6df9e7ff/core/src/main/scala/kafka/server/ReplicaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 7823659..0dde914 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -618,6 +618,7 @@ class ReplicaManager(val config: KafkaConfig, "epoch %d for partition [%s,%d] as itself is not in assigned replica list %s") .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch, topic, partition.partitionId, partitionStateInfo.allReplicas.mkString(","))) + responseMap.put((topic, partitionId), ErrorMapping.UnknownTopicOrPartitionCode) } } else { // Otherwise record the error code in response @@ -663,7 +664,9 @@ class ReplicaManager(val config: KafkaConfig, * 3. Add these partitions to the leader partitions set * * If an unexpected error is thrown in this function, it will be propagated to KafkaApis where - * the error message will be set on each partition since we do not know which partition caused it + * the error message will be set on each partition since we do not know which partition caused it. 
Otherwise, + * return the set of partitions that are made leader due to this method + * * TODO: the above may need to be fixed later */ private def makeLeaders(controllerId: Int, @@ -679,18 +682,25 @@ class ReplicaManager(val config: KafkaConfig, for (partition <- partitionState.keys) responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError) + val partitionsToMakeLeaders: mutable.Set[Partition] = mutable.Set() + try { // First stop fetchers for all the partitions replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_))) - partitionState.foreach { state => + // Update the partition information to be the leader + partitionState.foreach{ case (partition, partitionStateInfo) => + if (partition.makeLeader(controllerId, partitionStateInfo, correlationId)) + partitionsToMakeLeaders += partition + else + stateChangeLogger.info(("Broker %d skipped the become-leader state change after marking its partition as leader with correlation id %d from " + + "controller %d epoch %d for partition %s since it is already the leader for the partition.") + .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(partition.topic, partition.partitionId))); + } + partitionsToMakeLeaders.foreach { partition => stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-leader request from controller " + "%d epoch %d with correlation id %d for partition %s") - .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(state._1.topic, state._1.partitionId))) + .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(partition.topic, partition.partitionId))) } - // Update the partition information to be the leader - partitionState.foreach{ case (partition, partitionStateInfo) => - partition.makeLeader(controllerId, partitionStateInfo, correlationId)} - } catch { case e: Throwable => partitionState.foreach { state => @@ -709,7 +719,7 @@ class ReplicaManager(val 
config: KafkaConfig, .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId))) } - partitionState.keySet + partitionsToMakeLeaders } /* @@ -726,7 +736,8 @@ class ReplicaManager(val config: KafkaConfig, * are guaranteed to be flushed to disks * * If an unexpected error is thrown in this function, it will be propagated to KafkaApis where - * the error message will be set on each partition since we do not know which partition caused it + * the error message will be set on each partition since we do not know which partition caused it. Otherwise, + * return the set of partitions that are made follower due to this method */ private def makeFollowers(controllerId: Int, epoch: Int,
