[ 
https://issues.apache.org/jira/browse/KAFKA-19936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18041199#comment-18041199
 ] 

PoAn Yang commented on KAFKA-19936:
-----------------------------------

This test case does not pass on the trunk branch, because ReplicaManager counts 
duplicated records in its metrics.
{noformat}
/**
 * Verifies that ReplicaManager updates the per-topic and all-topics `bytesInRate` /
 * `messagesInRate` stats when a new idempotent batch is appended, and leaves them
 * unchanged when the very same batch (same producerId, epoch and base sequence) is
 * appended a second time as a duplicate (KAFKA-19936).
 */
@Test
def testBrokerTopicStatsOnAppend(): Unit = {
  val localId = 0
  val topicPartition = new TopicPartition("foo", 0)
  val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId)

  try {
    // Create the partition with this broker as the only replica, and make it leader.
    val brokerList = Seq[Integer](localId).asJava
    val delta = createLeaderDelta(topicId, topicPartition, 0, brokerList, brokerList)
    val leaderMetadataImage = imageFromTopics(delta.apply())
    replicaManager.applyDelta(delta, leaderMetadataImage)

    // Check the state of that partition.
    val HostedPartition.Online(partition) = replicaManager.getPartition(topicPartition)
    assertTrue(partition.isLeader)
    assertEquals(Set(localId), partition.inSyncReplicaIds)
    assertEquals(0, partition.getLeaderEpoch)

    val topicStats = replicaManager.brokerTopicStats.topicStats("foo")
    val allTopicsStats = replicaManager.brokerTopicStats.allTopicsStats

    // Baseline counter values before anything is appended.
    val topicBytesInBefore = topicStats.bytesInRate.count()
    val allTopicsBytesInBefore = allTopicsStats.bytesInRate.count()
    val topicMessagesInBefore = topicStats.messagesInRate.count()
    val allTopicsMessagesInBefore = allTopicsStats.messagesInRate.count()

    // A single-record idempotent batch.
    val producerId = 234L
    val epoch = 5.toShort
    val records = MemoryRecords.withIdempotentRecords(Compression.NONE, producerId, epoch, 0,
      new SimpleRecord("message 0".getBytes))

    // First append: assertFired fails the test if the produce callback never ran, instead of
    // silently skipping the error check as an onFire-only assertion would.
    assertEquals(Errors.NONE, appendRecords(replicaManager, topicPartition, records).assertFired.error)

    val topicBytesInAfterFirst = topicStats.bytesInRate.count()
    val allTopicsBytesInAfterFirst = allTopicsStats.bytesInRate.count()
    val topicMessagesInAfterFirst = topicStats.messagesInRate.count()
    val allTopicsMessagesInAfterFirst = allTopicsStats.messagesInRate.count()

    // The new batch must be reflected in the stats: bytes grew, and exactly one message was added.
    assertTrue(topicBytesInAfterFirst > topicBytesInBefore)
    assertTrue(allTopicsBytesInAfterFirst > allTopicsBytesInBefore)
    assertEquals(topicMessagesInBefore + 1, topicMessagesInAfterFirst)
    assertEquals(allTopicsMessagesInBefore + 1, allTopicsMessagesInAfterFirst)

    // Second append of the identical batch: it is a duplicate, so the log skips it.
    assertEquals(Errors.NONE, appendRecords(replicaManager, topicPartition, records).assertFired.error)

    // The duplicate must not be counted in any of the stats.
    assertEquals(topicBytesInAfterFirst, topicStats.bytesInRate.count())
    assertEquals(allTopicsBytesInAfterFirst, allTopicsStats.bytesInRate.count())
    assertEquals(topicMessagesInAfterFirst, topicStats.messagesInRate.count())
    assertEquals(allTopicsMessagesInAfterFirst, allTopicsStats.messagesInRate.count())
  } finally {
    replicaManager.shutdown(checkpointHW = false)
  }
}{noformat}

> ReplicaManager counts duplicated records to BytesInPerSec and 
> MessagesInPerSec metric
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-19936
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19936
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: PoAn Yang
>            Assignee: PoAn Yang
>            Priority: Major
>
> For an idempotent producer, duplicated records are not written to disk; 
> however, they still contribute to the {{BytesInPerSec}} and 
> {{MessagesInPerSec}} metrics.
> 1. If the records are duplicated, UnifiedLog skips these messages.
> [https://github.com/apache/kafka/blob/d27d90ccb3b2b98e02de42afd50910fbbbc162d0/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java#L1221-L1234]
> 2. ReplicaManager records the result of Partition#appendRecordsToLeader in the 
> metrics.
> [https://github.com/apache/kafka/blob/d27d90ccb3b2b98e02de42afd50910fbbbc162d0/core/src/main/scala/kafka/server/ReplicaManager.scala#L1429-L1437]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to