[ https://issues.apache.org/jira/browse/KAFKA-15894?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Minoru Tomioka updated KAFKA-15894: ----------------------------------- Description: {{KafkaApis.updateRecordConversionStats}} may be called multiple times for a request, and {{request.messageConversionsTimeNanos}} and {{request.temporaryMemoryBytes}} are overwritten in this method. [https://github.com/apache/kafka/blob/95f41d59b389b6f25000b7bc4ddb948cfdb90448/core/src/main/scala/kafka/server/KafkaApis.scala#L3777-L3779] {code:java} private def updateRecordConversionStats(request: RequestChannel.Request, tp: TopicPartition, conversionStats: RecordValidationStats): Unit = { ... request.messageConversionsTimeNanos = conversionStats.conversionTimeNanos } request.temporaryMemoryBytes = conversionStats.temporaryMemoryBytes }{code} [https://github.com/apache/kafka/blob/95f41d59b389b6f25000b7bc4ddb948cfdb90448/core/src/main/scala/kafka/server/KafkaApis.scala#L706-L708] {code:java} def processingStatsCallback(processingStats: FetchResponseStats): Unit = { processingStats.forKeyValue { (tp, info) => updateRecordConversionStats(request, tp, info) } }{code} So, in this case, MessageConversionsTimeMs and TemporaryMemoryBytes are not recorded correctly. was: {{KafkaApis.updateRecordConversionStats}} may be called multiple times for a request, and {{request.messageConversionsTimeNanos}} and {{request.temporaryMemoryBytes}} are overwritten in this method. [https://github.com/apache/kafka/blob/95f41d59b389b6f25000b7bc4ddb948cfdb90448/core/src/main/scala/kafka/server/KafkaApis.scala#L3777-L3779] {code:java} private def updateRecordConversionStats(request: RequestChannel.Request, tp: TopicPartition, conversionStats: RecordValidationStats): Unit = { ... request.messageConversionsTimeNanos = conversionStats.conversionTimeNanos } request.temporaryMemoryBytes = conversionStats.temporaryMemoryBytes }{code} [https://github.com/apache/kafka/blob/95f41d59b389b6f25000b7bc4ddb948cfdb90448/core/src/main/scala/kafka/server/KafkaApis.scala#L706-L708] {code:java} def processingStatsCallback(processingStats: FetchResponseStats): Unit = { processingStats.forKeyValue { (tp, info) => updateRecordConversionStats(request, tp, info) } }{code} So, in this case, MessageConversionsTimeMs and TemporaryMemoryBytes may not be recorded correctly. > MessageConversionsTimeMs and TemporaryMemoryBytes may not be recorded > correctly. > -------------------------------------------------------------------------------- > > Key: KAFKA-15894 > URL: https://issues.apache.org/jira/browse/KAFKA-15894 > Project: Kafka > Issue Type: Bug > Components: core, metrics > Reporter: Minoru Tomioka > Assignee: Minoru Tomioka > Priority: Major > > {{KafkaApis.updateRecordConversionStats}} may be called multiple times for a > request, and {{request.messageConversionsTimeNanos}} and > {{request.temporaryMemoryBytes}} are overwritten in this method. > [https://github.com/apache/kafka/blob/95f41d59b389b6f25000b7bc4ddb948cfdb90448/core/src/main/scala/kafka/server/KafkaApis.scala#L3777-L3779] > {code:java} > private def updateRecordConversionStats(request: RequestChannel.Request, > tp: TopicPartition, > conversionStats: > RecordValidationStats): Unit = { > ... > request.messageConversionsTimeNanos = conversionStats.conversionTimeNanos > } > request.temporaryMemoryBytes = conversionStats.temporaryMemoryBytes > }{code} > > [https://github.com/apache/kafka/blob/95f41d59b389b6f25000b7bc4ddb948cfdb90448/core/src/main/scala/kafka/server/KafkaApis.scala#L706-L708] > {code:java} > def processingStatsCallback(processingStats: FetchResponseStats): Unit = { > processingStats.forKeyValue { (tp, info) => > updateRecordConversionStats(request, tp, info) > } > }{code} > > So, in this case, MessageConversionsTimeMs and TemporaryMemoryBytes are not > recorded correctly. -- This message was sent by Atlassian Jira (v8.20.10#820010)