hachikuji commented on a change in pull request #4204: URL: https://github.com/apache/kafka/pull/4204#discussion_r418652324
########## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ########## @@ -1035,11 +1035,13 @@ class ReplicaManager(val config: KafkaConfig, val partitionFetchSize = fetchInfo.maxBytes val followerLogStartOffset = fetchInfo.logStartOffset - brokerTopicStats.topicStats(tp.topic).totalFetchRequestRate.mark() - brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark() - val adjustedMaxBytes = math.min(fetchInfo.maxBytes, limitBytes) try { + brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark() + if (allPartitions.contains(tp)) { Review comment: Hmm.. Does this solve the issue or just make it less likely? Does anything protect a call to `stopReplica` between this check and the metric update below? It does seem this is related to #8586. Since we are updating the metric after completion of the `DelayedFetch` currently, then this case actually seems likely to be hit in practice. We would just need to have a fetch in purgatory when the call to stop replica is received. However, after #8586, I guess it becomes less likely? Still it would be nice to think through a complete fix. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org