Repository: kafka Updated Branches: refs/heads/trunk ad71b9d06 -> 406071c2b
KAFKA-6046; DeleteRecordsRequest to a follower should return NOT_LEADER. Tested with DeleteRecordsRequestTest by Tom Bentley, which is part of a separate pull request. Author: tedyu <[email protected]> Reviewers: Tom Bentley <[email protected]>, Ismael Juma <[email protected]> Closes #4052 from tedyu/trunk Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/406071c2 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/406071c2 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/406071c2 Branch: refs/heads/trunk Commit: 406071c2b5c06b25f9167ffd566325fdb65b268d Parents: ad71b9d Author: tedyu <[email protected]> Authored: Fri Nov 17 15:16:25 2017 +0000 Committer: Ismael Juma <[email protected]> Committed: Fri Nov 17 15:18:48 2017 +0000 ---------------------------------------------------------------------- .../scala/kafka/server/ReplicaManager.scala | 26 ++++++-------------- 1 file changed, 8 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/406071c2/core/src/main/scala/kafka/server/ReplicaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 2de50d8..886e80e 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -418,6 +418,11 @@ class ReplicaManager(val config: KafkaConfig, def getReplicaOrException(topicPartition: TopicPartition): Replica = getReplicaOrException(topicPartition, localBrokerId) def getLeaderReplicaIfLocal(topicPartition: TopicPartition): Replica = { + val (_, replica) = getPartitionAndLeaderReplicaIfLocal(topicPartition) + replica + } + + def getPartitionAndLeaderReplicaIfLocal(topicPartition: TopicPartition): (Partition, Replica) = { val 
partitionOpt = getPartition(topicPartition) partitionOpt match { case None => @@ -426,7 +431,7 @@ class ReplicaManager(val config: KafkaConfig, if (partition eq ReplicaManager.OfflinePartition) throw new KafkaStorageException(s"Partition $topicPartition is in an offline log directory on broker $localBrokerId") else partition.leaderReplicaIfLocal match { - case Some(leaderReplica) => leaderReplica + case Some(leaderReplica) => (partition, leaderReplica) case None => throw new NotLeaderForPartitionException(s"Leader not local for partition $topicPartition on broker $localBrokerId") } @@ -514,23 +519,10 @@ class ReplicaManager(val config: KafkaConfig, (topicPartition, LogDeleteRecordsResult(-1L, -1L, Some(new InvalidTopicException(s"Cannot delete records of internal topic ${topicPartition.topic}")))) } else { try { - val partition = getPartition(topicPartition) match { - case Some(p) => - if (p eq ReplicaManager.OfflinePartition) - throw new KafkaStorageException("Partition %s is in an offline log directory on broker %d".format(topicPartition, localBrokerId)) - p - case None => - throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d".format(topicPartition, localBrokerId)) - } + val (partition, replica) = getPartitionAndLeaderReplicaIfLocal(topicPartition) val convertedOffset = if (requestedOffset == DeleteRecordsRequest.HIGH_WATERMARK) { - partition.leaderReplicaIfLocal match { - case Some(leaderReplica) => - leaderReplica.highWatermark.messageOffset - case None => - throw new NotLeaderForPartitionException("Leader not local for partition %s on broker %d" - .format(topicPartition, localBrokerId)) - } + replica.highWatermark.messageOffset } else requestedOffset if (convertedOffset < 0) @@ -539,8 +531,6 @@ class ReplicaManager(val config: KafkaConfig, val lowWatermark = partition.deleteRecordsOnLeader(convertedOffset) (topicPartition, LogDeleteRecordsResult(convertedOffset, lowWatermark)) } catch { - // NOTE: Failed produce requests metric is not 
incremented for known exceptions - // it is supposed to indicate un-expected failures of a broker in handling a produce request case e@ (_: UnknownTopicOrPartitionException | _: NotLeaderForPartitionException | _: OffsetOutOfRangeException |
