[ 
https://issues.apache.org/jira/browse/KAFKA-15894?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Minoru Tomioka updated KAFKA-15894:
-----------------------------------
    Description: 
{{KafkaApis.updateRecordConversionStats}} may be called multiple times for a 
request, and {{request.messageConversionsTimeNanos}} and 
{{request.temporaryMemoryBytes}} are overwritten in this method.

[https://github.com/apache/kafka/blob/95f41d59b389b6f25000b7bc4ddb948cfdb90448/core/src/main/scala/kafka/server/KafkaApis.scala#L3777-L3779]
{code:java}
private def updateRecordConversionStats(request: RequestChannel.Request,
                                          tp: TopicPartition,
                                          conversionStats: 
RecordValidationStats): Unit = {
...
    request.messageConversionsTimeNanos = conversionStats.conversionTimeNanos
  }
  request.temporaryMemoryBytes = conversionStats.temporaryMemoryBytes
}{code}
 

[https://github.com/apache/kafka/blob/95f41d59b389b6f25000b7bc4ddb948cfdb90448/core/src/main/scala/kafka/server/KafkaApis.scala#L706-L708]
{code:java}
def processingStatsCallback(processingStats: FetchResponseStats): Unit = {
  processingStats.forKeyValue { (tp, info) =>
    updateRecordConversionStats(request, tp, info)
  }
}{code}
 

So, in this case, MessageConversionsTimeMs and TemporaryMemoryBytes are not 
recorded correctly.

  was:
{{KafkaApis.updateRecordConversionStats}} may be called multiple times for a 
request, and {{request.messageConversionsTimeNanos}} and 
{{request.temporaryMemoryBytes}} are overwritten in this method.

[https://github.com/apache/kafka/blob/95f41d59b389b6f25000b7bc4ddb948cfdb90448/core/src/main/scala/kafka/server/KafkaApis.scala#L3777-L3779]
{code:java}
private def updateRecordConversionStats(request: RequestChannel.Request,
                                          tp: TopicPartition,
                                          conversionStats: 
RecordValidationStats): Unit = {
...
    request.messageConversionsTimeNanos = conversionStats.conversionTimeNanos
  }
  request.temporaryMemoryBytes = conversionStats.temporaryMemoryBytes
}{code}
 

[https://github.com/apache/kafka/blob/95f41d59b389b6f25000b7bc4ddb948cfdb90448/core/src/main/scala/kafka/server/KafkaApis.scala#L706-L708]
{code:java}
def processingStatsCallback(processingStats: FetchResponseStats): Unit = {
  processingStats.forKeyValue { (tp, info) =>
    updateRecordConversionStats(request, tp, info)
  }
}{code}
 

So, in this case, MessageConversionsTimeMs and TemporaryMemoryBytes may not be 
recorded correctly.


> MessageConversionsTimeMs and TemporaryMemoryBytes may not be recorded 
> correctly.
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-15894
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15894
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, metrics
>            Reporter: Minoru Tomioka
>            Assignee: Minoru Tomioka
>            Priority: Major
>
> {{KafkaApis.updateRecordConversionStats}} may be called multiple times for a 
> request, and {{request.messageConversionsTimeNanos}} and 
> {{request.temporaryMemoryBytes}} are overwritten in this method.
> [https://github.com/apache/kafka/blob/95f41d59b389b6f25000b7bc4ddb948cfdb90448/core/src/main/scala/kafka/server/KafkaApis.scala#L3777-L3779]
> {code:java}
> private def updateRecordConversionStats(request: RequestChannel.Request,
>                                           tp: TopicPartition,
>                                           conversionStats: 
> RecordValidationStats): Unit = {
> ...
>     request.messageConversionsTimeNanos = conversionStats.conversionTimeNanos
>   }
>   request.temporaryMemoryBytes = conversionStats.temporaryMemoryBytes
> }{code}
>  
> [https://github.com/apache/kafka/blob/95f41d59b389b6f25000b7bc4ddb948cfdb90448/core/src/main/scala/kafka/server/KafkaApis.scala#L706-L708]
> {code:java}
> def processingStatsCallback(processingStats: FetchResponseStats): Unit = {
>   processingStats.forKeyValue { (tp, info) =>
>     updateRecordConversionStats(request, tp, info)
>   }
> }{code}
>  
> So, in this case, MessageConversionsTimeMs and TemporaryMemoryBytes are not 
> recorded correctly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to