heesung-sn commented on code in PR #22518:
URL: https://github.com/apache/pulsar/pull/22518#discussion_r1569087887


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java:
##########
@@ -1899,4 +1900,106 @@ public void testUpdatePropertiesOnNonDurableSub() 
throws Exception {
         assertEquals(cursor.getCursorProperties().size(), 1);
         assertEquals(cursor.getCursorProperties().get("foo"), "bar");
     }
+
+    @Test
+    public void testPrecomputeProducerStatsInTopicStatsForPersistentTopic() 
throws Exception {
+        final int numberOfMessages = 10;
+        final String testLocalTopicName = 
"testPrecomputeProducerStatsInTopicStatsForPersistentTopic";
+        final String topicName = "persistent://" + testTenant + "/" + 
testNamespace + "/" + testLocalTopicName;
+        admin.topics().createNonPartitionedTopic(topicName);
+
+        // 1) Use producer stats to compute msgRateIn and msgThroughputIn for 
persistentTopic
+        
pulsar.getConfiguration().setPrecomputeProducerStatsInTopicStats(false);
+
+        // produce numberOfMessages message to pulsar
+        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) 
pulsarClient.newProducer().topic(topicName).create();
+        for (int i = 0; i < numberOfMessages; i++) {
+            log.info("Produce messages: " + producer.send(new 
byte[10]).toString());
+        }
+        producer.close();
+
+        // trigger update rates
+        pulsar.getBrokerService().updateRates();
+
+        AsyncResponse response = mock(AsyncResponse.class);
+        persistentTopics.getStats(response, testTenant, testNamespace, 
testLocalTopicName, true, true, false, false, false, false);
+        ArgumentCaptor<TopicStats> statCaptor = 
ArgumentCaptor.forClass(TopicStats.class);
+        verify(response, timeout(5000).times(1)).resume(statCaptor.capture());
+        TopicStats topicStats = statCaptor.getValue();
+        assertEquals(topicStats.getMsgRateIn(), 0);
+        assertEquals(topicStats.getMsgThroughputIn(), 0);
+

Review Comment:
   Plz cover other publisher stats: bytesInCounter, msgInCounter, and 
averageMsgSize



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java:
##########
@@ -1899,4 +1900,106 @@ public void testUpdatePropertiesOnNonDurableSub() 
throws Exception {
         assertEquals(cursor.getCursorProperties().size(), 1);
         assertEquals(cursor.getCursorProperties().get("foo"), "bar");
     }
+
+    @Test
+    public void testPrecomputeProducerStatsInTopicStatsForPersistentTopic() 
throws Exception {
+        final int numberOfMessages = 10;
+        final String testLocalTopicName = 
"testPrecomputeProducerStatsInTopicStatsForPersistentTopic";
+        final String topicName = "persistent://" + testTenant + "/" + 
testNamespace + "/" + testLocalTopicName;
+        admin.topics().createNonPartitionedTopic(topicName);
+
+        // 1) Use producer stats to compute msgRateIn and msgThroughputIn for 
persistentTopic
+        
pulsar.getConfiguration().setPrecomputeProducerStatsInTopicStats(false);
+
+        // produce numberOfMessages message to pulsar
+        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) 
pulsarClient.newProducer().topic(topicName).create();
+        for (int i = 0; i < numberOfMessages; i++) {
+            log.info("Produce messages: " + producer.send(new 
byte[10]).toString());
+        }
+        producer.close();
+
+        // trigger update rates
+        pulsar.getBrokerService().updateRates();
+
+        AsyncResponse response = mock(AsyncResponse.class);
+        persistentTopics.getStats(response, testTenant, testNamespace, 
testLocalTopicName, true, true, false, false, false, false);
+        ArgumentCaptor<TopicStats> statCaptor = 
ArgumentCaptor.forClass(TopicStats.class);
+        verify(response, timeout(5000).times(1)).resume(statCaptor.capture());
+        TopicStats topicStats = statCaptor.getValue();
+        assertEquals(topicStats.getMsgRateIn(), 0);
+        assertEquals(topicStats.getMsgThroughputIn(), 0);
+
+
+        // 2) Use topic stats to compute msgRateIn and msgThroughputIn for 
persistentTopic
+        pulsar.getConfiguration().setPrecomputeProducerStatsInTopicStats(true);
+
+        // produce numberOfMessages message to pulsar
+        producer = (ProducerImpl<byte[]>) 
pulsarClient.newProducer().topic(topicName).create();
+        for (int i = 0; i < numberOfMessages; i++) {
+            log.info("Produce messages: " + producer.send(new 
byte[10]).toString());
+        }
+        producer.close();
+
+        // trigger update rates
+        pulsar.getBrokerService().updateRates();
+
+        response = mock(AsyncResponse.class);
+        persistentTopics.getStats(response, testTenant, testNamespace, 
testLocalTopicName, true, true, false, false, false, false);
+        statCaptor = ArgumentCaptor.forClass(TopicStats.class);
+        verify(response, timeout(5000).times(1)).resume(statCaptor.capture());
+        topicStats = statCaptor.getValue();
+        Assert.assertTrue(topicStats.getMsgRateIn() > 0);
+        Assert.assertTrue(topicStats.getMsgThroughputIn() > 0);
+    }

Review Comment:
   can you also confirm if `/metrics` API from BrokerStatsBase correctly 
returns?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to