hachikuji commented on a change in pull request #4204:
URL: https://github.com/apache/kafka/pull/4204#discussion_r418652324



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1035,11 +1035,13 @@ class ReplicaManager(val config: KafkaConfig,
       val partitionFetchSize = fetchInfo.maxBytes
       val followerLogStartOffset = fetchInfo.logStartOffset
 
-      brokerTopicStats.topicStats(tp.topic).totalFetchRequestRate.mark()
-      brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()
-
       val adjustedMaxBytes = math.min(fetchInfo.maxBytes, limitBytes)
       try {
+        brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()
+        if (allPartitions.contains(tp)) {

Review comment:
       Hmm... does this solve the issue, or just make it less likely? Is there 
anything that prevents a call to `stopReplica` from landing between this check 
and the metric update below?
   
   It does seem this is related to #8586. Since we currently update the metric 
after the `DelayedFetch` completes, this case actually seems likely to be hit in 
practice: we would just need a fetch sitting in purgatory when the call to stop 
the replica is received. After #8586, I guess it becomes less likely? Still, it 
would be nice to think through a complete fix.
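   To illustrate the concern (a sketch in plain Java rather than the actual 
Kafka code, with a hypothetical metrics map standing in for 
`brokerTopicStats`): a `contains` check followed by a separate update is a 
classic check-then-act race, because the entry can be removed between the two 
steps. An atomic per-entry operation such as `computeIfPresent` closes that 
window, since the update and the presence check happen under the same entry 
lock.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

public class MetricRaceSketch {
    // Hypothetical stand-in for per-topic metrics keyed by partition/topic.
    static final ConcurrentHashMap<String, LongAdder> fetchRate =
        new ConcurrentHashMap<>();

    // Racy pattern: a concurrent remove (think stopReplica) can run between
    // containsKey() and get(), so get() may return null and we NPE.
    static void markRacy(String tp) {
        if (fetchRate.containsKey(tp)) {
            fetchRate.get(tp).increment();
        }
    }

    // Atomic alternative: computeIfPresent performs the presence check and
    // the update as one atomic step on the map entry, so a concurrent
    // remove cannot interleave between them.
    static void markAtomic(String tp) {
        fetchRate.computeIfPresent(tp, (k, adder) -> {
            adder.increment();
            return adder;
        });
    }

    public static void main(String[] args) {
        fetchRate.put("topic-0", new LongAdder());
        markAtomic("topic-0");   // increments the live entry
        markAtomic("gone-topic"); // entry absent: safe no-op, no NPE
        System.out.println(fetchRate.get("topic-0").sum()); // prints 1
    }
}
```

   This does not by itself fix the `DelayedFetch` case, where the delay between 
check and update spans the whole purgatory wait rather than a few instructions, 
but it shows why narrowing the window is not the same as removing it.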




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
