heesung-sn commented on code in PR #22518: URL: https://github.com/apache/pulsar/pull/22518#discussion_r1569087887
########## pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java: ########## @@ -1899,4 +1900,106 @@ public void testUpdatePropertiesOnNonDurableSub() throws Exception { assertEquals(cursor.getCursorProperties().size(), 1); assertEquals(cursor.getCursorProperties().get("foo"), "bar"); } + + @Test + public void testPrecomputeProducerStatsInTopicStatsForPersistentTopic() throws Exception { + final int numberOfMessages = 10; + final String testLocalTopicName = "testPrecomputeProducerStatsInTopicStatsForPersistentTopic"; + final String topicName = "persistent://" + testTenant + "/" + testNamespace + "/" + testLocalTopicName; + admin.topics().createNonPartitionedTopic(topicName); + + // 1) Use producer stats to compute msgRateIn and msgThroughputIn for persistentTopic + pulsar.getConfiguration().setPrecomputeProducerStatsInTopicStats(false); + + // produce numberOfMessages message to pulsar + ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topicName).create(); + for (int i = 0; i < numberOfMessages; i++) { + log.info("Produce messages: " + producer.send(new byte[10]).toString()); + } + producer.close(); + + // trigger update rates + pulsar.getBrokerService().updateRates(); + + AsyncResponse response = mock(AsyncResponse.class); + persistentTopics.getStats(response, testTenant, testNamespace, testLocalTopicName, true, true, false, false, false, false); + ArgumentCaptor<TopicStats> statCaptor = ArgumentCaptor.forClass(TopicStats.class); + verify(response, timeout(5000).times(1)).resume(statCaptor.capture()); + TopicStats topicStats = statCaptor.getValue(); + assertEquals(topicStats.getMsgRateIn(), 0); + assertEquals(topicStats.getMsgThroughputIn(), 0); + Review Comment: Please also cover the other publisher stats: bytesInCounter, msgInCounter, and averageMsgSize. ########## pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java: ########## @@ -1899,4 +1900,106 @@ public void 
testUpdatePropertiesOnNonDurableSub() throws Exception { assertEquals(cursor.getCursorProperties().size(), 1); assertEquals(cursor.getCursorProperties().get("foo"), "bar"); } + + @Test + public void testPrecomputeProducerStatsInTopicStatsForPersistentTopic() throws Exception { + final int numberOfMessages = 10; + final String testLocalTopicName = "testPrecomputeProducerStatsInTopicStatsForPersistentTopic"; + final String topicName = "persistent://" + testTenant + "/" + testNamespace + "/" + testLocalTopicName; + admin.topics().createNonPartitionedTopic(topicName); + + // 1) Use producer stats to compute msgRateIn and msgThroughputIn for persistentTopic + pulsar.getConfiguration().setPrecomputeProducerStatsInTopicStats(false); + + // produce numberOfMessages message to pulsar + ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topicName).create(); + for (int i = 0; i < numberOfMessages; i++) { + log.info("Produce messages: " + producer.send(new byte[10]).toString()); + } + producer.close(); + + // trigger update rates + pulsar.getBrokerService().updateRates(); + + AsyncResponse response = mock(AsyncResponse.class); + persistentTopics.getStats(response, testTenant, testNamespace, testLocalTopicName, true, true, false, false, false, false); + ArgumentCaptor<TopicStats> statCaptor = ArgumentCaptor.forClass(TopicStats.class); + verify(response, timeout(5000).times(1)).resume(statCaptor.capture()); + TopicStats topicStats = statCaptor.getValue(); + assertEquals(topicStats.getMsgRateIn(), 0); + assertEquals(topicStats.getMsgThroughputIn(), 0); + + + // 2) Use topic stats to compute msgRateIn and msgThroughputIn for persistentTopic + pulsar.getConfiguration().setPrecomputeProducerStatsInTopicStats(true); + + // produce numberOfMessages message to pulsar + producer = (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topicName).create(); + for (int i = 0; i < numberOfMessages; i++) { + log.info("Produce messages: " + 
producer.send(new byte[10]).toString()); + } + producer.close(); + + // trigger update rates + pulsar.getBrokerService().updateRates(); + + response = mock(AsyncResponse.class); + persistentTopics.getStats(response, testTenant, testNamespace, testLocalTopicName, true, true, false, false, false, false); + statCaptor = ArgumentCaptor.forClass(TopicStats.class); + verify(response, timeout(5000).times(1)).resume(statCaptor.capture()); + topicStats = statCaptor.getValue(); + Assert.assertTrue(topicStats.getMsgRateIn() > 0); + Assert.assertTrue(topicStats.getMsgThroughputIn() > 0); + } Review Comment: Can you also confirm whether the `/metrics` API from BrokerStatsBase returns the correct results? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org