This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4dae643221a14d19209835eb17a800d681f1b952
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Tue Nov 1 12:57:11 2022 -0700

    [improve] [client] Add api to get producer/consumer stats for partition 
topic (#18212)
    
    * [improve] [client] Add api to get producer/consumer stats for partition 
topic
    
    * introduce partition topic stats interface
---
 .../client/api/SimpleProducerConsumerStatTest.java | 61 +++++++++++++++++
 .../pulsar/client/api/MultiTopicConsumerStats.java | 39 +++++++++++
 .../client/api/PartitionedTopicProducerStats.java  | 40 +++++++++++
 .../apache/pulsar/client/api/ProducerStats.java    |  1 -
 .../impl/MultiTopicConsumerStatsRecorderImpl.java  | 59 ++++++++++++++++
 .../client/impl/MultiTopicsConsumerImpl.java       |  6 +-
 .../client/impl/PartitionedProducerImpl.java       |  9 ++-
 .../PartitionedTopicProducerStatsRecorderImpl.java | 79 ++++++++++++++++++++++
 .../pulsar/client/impl/ProducerStatsDisabled.java  |  1 +
 .../client/impl/ProducerStatsRecorderImpl.java     | 17 +----
 .../client/impl/ProducerStatsRecorderImplTest.java |  4 +-
 11 files changed, 292 insertions(+), 24 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
index 40d34a7c0b7..1c02f2ea57e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
@@ -19,12 +19,14 @@
 package org.apache.pulsar.client.api;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -453,4 +455,63 @@ public class SimpleProducerConsumerStatTest extends 
ProducerConsumerBase {
                 .until(() -> producer.getStats().getPendingQueueSize() == 
numMessages);
         assertEquals(producer.getStats().getPendingQueueSize(), numMessages);
     }
+
+    /**
+     * This test verifies partitioned topic stats for producer and consumer.
+     * @throws Exception
+     */
+    @Test
+    public void testPartitionTopicStats() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        String topicName = 
"persistent://my-property/my-ns/testPartitionTopicStats";
+        int numPartitions = 10;
+        admin.topics().createPartitionedTopic(topicName, numPartitions);
+
+        ConsumerBuilder<byte[]> consumerBuilder = 
pulsarClient.newConsumer().topic(topicName)
+                .subscriptionName("my-subscriber-name");
+
+        Consumer<byte[]> consumer = consumerBuilder.subscribe();
+
+        ProducerBuilder<byte[]> producerBuilder = 
pulsarClient.newProducer().enableBatching(false).topic(topicName);
+
+        Producer<byte[]> producer = producerBuilder.create();
+
+        int numMessages = 20;
+        for (int i = 0; i < numMessages; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        Message<byte[]> msg = null;
+        Set<String> messageSet = new HashSet<>();
+        for (int i = 0; i < numMessages; i++) {
+            msg = consumer.receive(5, TimeUnit.SECONDS);
+            String receivedMessage = new String(msg.getData());
+            log.info("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, 
expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+
+        MultiTopicConsumerStats cStat = (MultiTopicConsumerStats) 
consumer.getStats();
+        PartitionedTopicProducerStats pStat = (PartitionedTopicProducerStats) 
producer.getStats();
+        retryStrategically((test) -> !pStat.getPartitionStats().isEmpty(), 5, 
100);
+        retryStrategically((test) -> !cStat.getPartitionStats().isEmpty(), 5, 
100);
+        Map<String, ProducerStats> prodStatsMap = pStat.getPartitionStats();
+        Map<String, ConsumerStats> consStatsMap = cStat.getPartitionStats();
+        assertFalse(prodStatsMap.isEmpty());
+        assertFalse(consStatsMap.isEmpty());
+        for (int i = 0; i < numPartitions; i++) {
+            String topic = topicName + "-partition-" + i;
+            assertTrue(prodStatsMap.containsKey(topic));
+            assertTrue(consStatsMap.containsKey(topic));
+        }
+
+        consumer.close();
+        producer.close();
+
+        log.info("-- Exiting {} test --", methodName);
+    }
 }
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MultiTopicConsumerStats.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MultiTopicConsumerStats.java
new file mode 100644
index 00000000000..e1c1d3372c3
--- /dev/null
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MultiTopicConsumerStats.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.util.Map;
+import org.apache.pulsar.common.classification.InterfaceAudience;
+import org.apache.pulsar.common.classification.InterfaceStability;
+
+/**
+ * Multi-topic Consumer statistics recorded by client.
+ *
+ * <p>All the stats are relative to the last recording period. The interval of 
the stats refreshes is configured with
+ * {@link ClientBuilder#statsInterval(long, java.util.concurrent.TimeUnit)} 
with a default of 1 minute.
+ */
[email protected]
[email protected]
+public interface MultiTopicConsumerStats extends ConsumerStats {
+
+    /**
+     * @return stats for each partition if topic is partitioned topic
+     */
+    Map<String, ConsumerStats> getPartitionStats();
+}
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PartitionedTopicProducerStats.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PartitionedTopicProducerStats.java
new file mode 100644
index 00000000000..5fd3c1f34a0
--- /dev/null
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PartitionedTopicProducerStats.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.util.Map;
+import org.apache.pulsar.common.classification.InterfaceAudience;
+import org.apache.pulsar.common.classification.InterfaceStability;
+
+/**
+ * Partitioned topic Producer statistics recorded by client.
+ *
+ * <p>All the stats are relative to the last recording period. The interval of 
the stats refreshes is configured with
+ * {@link ClientBuilder#statsInterval(long, java.util.concurrent.TimeUnit)} 
with a default of 1 minute.
+ */
[email protected]
[email protected]
+public interface PartitionedTopicProducerStats extends ProducerStats {
+
+    /**
+     * @return stats for each partition if topic is partitioned topic
+     */
+    Map<String, ProducerStats> getPartitionStats();
+
+}
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerStats.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerStats.java
index de028cddab4..30142c70480 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerStats.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerStats.java
@@ -115,5 +115,4 @@ public interface ProducerStats extends Serializable {
      * @return current pending send-message queue size of the producer
      */
     int getPendingQueueSize();
-
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicConsumerStatsRecorderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicConsumerStatsRecorderImpl.java
new file mode 100644
index 00000000000..17018be02be
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicConsumerStatsRecorderImpl.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerStats;
+import org.apache.pulsar.client.api.MultiTopicConsumerStats;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MultiTopicConsumerStatsRecorderImpl extends 
ConsumerStatsRecorderImpl implements MultiTopicConsumerStats {
+
+    private static final long serialVersionUID = 1L;
+    private Map<String, ConsumerStats> partitionStats = new 
ConcurrentHashMap<>();
+
+    public MultiTopicConsumerStatsRecorderImpl() {
+        super();
+    }
+
+    public MultiTopicConsumerStatsRecorderImpl(Consumer<?> consumer) {
+        super(consumer);
+    }
+
+    public MultiTopicConsumerStatsRecorderImpl(PulsarClientImpl pulsarClient, 
ConsumerConfigurationData<?> conf,
+            Consumer<?> consumer) {
+        super(pulsarClient, conf, consumer);
+    }
+
+    public void updateCumulativeStats(String partition, ConsumerStats stats) {
+        super.updateCumulativeStats(stats);
+        partitionStats.put(partition, stats);
+    }
+
+    @Override
+    public Map<String, ConsumerStats> getPartitionStats() {
+        return partitionStats;
+    }
+
+    private static final Logger log = 
LoggerFactory.getLogger(MultiTopicConsumerStatsRecorderImpl.class);
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 741716919e4..138e8535bca 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -94,7 +94,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
     private volatile Timeout partitionsAutoUpdateTimeout = null;
     TopicsPartitionChangedListener topicsPartitionChangedListener;
     CompletableFuture<Void> partitionsAutoUpdateFuture = null;
-    private final ConsumerStatsRecorder stats;
+    private final MultiTopicConsumerStatsRecorderImpl stats;
     private UnAckedMessageTracker unAckedMessageTracker;
     private final ConsumerConfigurationData<T> internalConfig;
 
@@ -155,7 +155,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
 
         this.internalConfig = getInternalConsumerConfig();
         this.stats = client.getConfiguration().getStatsIntervalSeconds() > 0
-                ? new ConsumerStatsRecorderImpl(this)
+                ? new MultiTopicConsumerStatsRecorderImpl(this)
                 : null;
 
         // start track and auto subscribe partition increment
@@ -863,7 +863,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         }
         stats.reset();
 
-        consumers.values().stream().forEach(consumer -> 
stats.updateCumulativeStats(consumer.getStats()));
+        consumers.forEach((partition, consumer) -> 
stats.updateCumulativeStats(partition, consumer.getStats()));
         return stats;
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 4a84ba03ebe..355a5395e74 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -62,7 +62,7 @@ public class PartitionedProducerImpl<T> extends 
ProducerBase<T> {
 
     private final ConcurrentOpenHashMap<Integer, ProducerImpl<T>> producers;
     private final MessageRouter routerPolicy;
-    private final ProducerStatsRecorderImpl stats;
+    private final PartitionedTopicProducerStatsRecorderImpl stats;
     private TopicMetadata topicMetadata;
     private final int firstPartitionIndex;
     private String overrideProducerName;
@@ -80,7 +80,9 @@ public class PartitionedProducerImpl<T> extends 
ProducerBase<T> {
                 ConcurrentOpenHashMap.<Integer, 
ProducerImpl<T>>newBuilder().build();
         this.topicMetadata = new TopicMetadataImpl(numPartitions);
         this.routerPolicy = getMessageRouter();
-        stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new 
ProducerStatsRecorderImpl() : null;
+        stats = client.getConfiguration().getStatsIntervalSeconds() > 0
+                ? new PartitionedTopicProducerStatsRecorderImpl()
+                : null;
 
         // MaxPendingMessagesAcrossPartitions doesn't support partial 
partition such as SinglePartition correctly
         int maxPendingMessages = Math.min(conf.getMaxPendingMessages(),
@@ -353,7 +355,8 @@ public class PartitionedProducerImpl<T> extends 
ProducerBase<T> {
             return null;
         }
         stats.reset();
-        producers.values().forEach(p -> 
stats.updateCumulativeStats(p.getStats()));
+        producers.forEach(
+                (partition, producer) -> 
stats.updateCumulativeStats(producer.getTopic(), producer.getStats()));
         return stats;
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedTopicProducerStatsRecorderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedTopicProducerStatsRecorderImpl.java
new file mode 100644
index 00000000000..2f73a6af406
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedTopicProducerStatsRecorderImpl.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.DoubleAdder;
+import org.apache.pulsar.client.api.PartitionedTopicProducerStats;
+import org.apache.pulsar.client.api.ProducerStats;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PartitionedTopicProducerStatsRecorderImpl extends 
ProducerStatsRecorderImpl
+        implements PartitionedTopicProducerStats {
+
+    private static final long serialVersionUID = 1L;
+    private Map<String, ProducerStats> partitionStats = Collections.emptyMap();
+    private final DoubleAdder sendMsgsRateAggregate;
+    private final DoubleAdder sendBytesRateAggregate;
+    private int partitions = 0;
+
+    public PartitionedTopicProducerStatsRecorderImpl() {
+        super();
+        partitionStats = new ConcurrentHashMap<>();
+        sendMsgsRateAggregate = new DoubleAdder();
+        sendBytesRateAggregate = new DoubleAdder();
+    }
+
+    void reset() {
+        super.reset();
+        partitions = 0;
+    }
+
+    void updateCumulativeStats(String partition, ProducerStats stats) {
+        super.updateCumulativeStats(stats);
+        if (stats == null) {
+            return;
+        }
+        partitionStats.put(partition, stats);
+        // update rates
+        sendMsgsRateAggregate.add(stats.getSendMsgsRate());
+        sendBytesRateAggregate.add(stats.getSendBytesRate());
+        partitions++;
+    }
+
+    @Override
+    public double getSendMsgsRate() {
+        return sendMsgsRateAggregate.doubleValue() / partitions;
+    }
+
+    @Override
+    public double getSendBytesRate() {
+        return sendBytesRateAggregate.doubleValue() / partitions;
+    }
+
+    @Override
+    public Map<String, ProducerStats> getPartitionStats() {
+        return partitionStats;
+    }
+
+    private static final Logger log = 
LoggerFactory.getLogger(PartitionedTopicProducerStatsRecorderImpl.class);
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsDisabled.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsDisabled.java
index 0962abb1aee..b85fc4cb6b3 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsDisabled.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsDisabled.java
@@ -132,4 +132,5 @@ public class ProducerStatsDisabled implements 
ProducerStatsRecorder {
     public int getPendingQueueSize() {
         return 0;
     }
+
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
index 897193fff32..5a60dafb2b5 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
@@ -27,7 +27,6 @@ import io.netty.util.TimerTask;
 import java.io.IOException;
 import java.text.DecimalFormat;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.DoubleAdder;
 import java.util.concurrent.atomic.LongAdder;
 import org.apache.pulsar.client.api.ProducerStats;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
@@ -51,8 +50,6 @@ public class ProducerStatsRecorderImpl implements 
ProducerStatsRecorder {
     private final LongAdder totalBytesSent;
     private final LongAdder totalSendFailed;
     private final LongAdder totalAcksReceived;
-    private final DoubleAdder sendMsgsRateAggregate;
-    private final DoubleAdder sendBytesRateAggregate;
     private static final DecimalFormat DEC = new DecimalFormat("0.000");
     private static final DecimalFormat THROUGHPUT_FORMAT = new 
DecimalFormat("0.00");
     private final transient DoublesSketch ds;
@@ -61,7 +58,6 @@ public class ProducerStatsRecorderImpl implements 
ProducerStatsRecorder {
 
     private volatile double sendMsgsRate;
     private volatile double sendBytesRate;
-    private int partitions = 0;
     private volatile double[] latencyPctValues = new 
double[PERCENTILES.length];
     private volatile double[] batchSizePctValues = new 
double[PERCENTILES.length];
     private volatile double[] msgSizePctValues = new 
double[PERCENTILES.length];
@@ -77,8 +73,6 @@ public class ProducerStatsRecorderImpl implements 
ProducerStatsRecorder {
         totalBytesSent = new LongAdder();
         totalSendFailed = new LongAdder();
         totalAcksReceived = new LongAdder();
-        sendMsgsRateAggregate = new DoubleAdder();
-        sendBytesRateAggregate = new DoubleAdder();
         ds = DoublesSketch.builder().build(256);
         batchSizeDs = DoublesSketch.builder().build(256);
         msgSizeDs = DoublesSketch.builder().build(256);
@@ -97,8 +91,6 @@ public class ProducerStatsRecorderImpl implements 
ProducerStatsRecorder {
         totalBytesSent = new LongAdder();
         totalSendFailed = new LongAdder();
         totalAcksReceived = new LongAdder();
-        sendMsgsRateAggregate = new DoubleAdder();
-        sendBytesRateAggregate = new DoubleAdder();
         ds = DoublesSketch.builder().build(256);
         batchSizeDs = DoublesSketch.builder().build(256);
         msgSizeDs = DoublesSketch.builder().build(256);
@@ -247,7 +239,6 @@ public class ProducerStatsRecorderImpl implements 
ProducerStatsRecorder {
         totalBytesSent.reset();
         totalSendFailed.reset();
         totalAcksReceived.reset();
-        partitions = 0;
     }
 
     void updateCumulativeStats(ProducerStats stats) {
@@ -262,10 +253,6 @@ public class ProducerStatsRecorderImpl implements 
ProducerStatsRecorder {
         totalBytesSent.add(stats.getTotalBytesSent());
         totalSendFailed.add(stats.getTotalSendFailed());
         totalAcksReceived.add(stats.getTotalAcksReceived());
-        // update rates
-        sendMsgsRateAggregate.add(stats.getSendMsgsRate());
-        sendBytesRateAggregate.add(stats.getSendBytesRate());
-        partitions++;
     }
 
     @Override
@@ -306,12 +293,12 @@ public class ProducerStatsRecorderImpl implements 
ProducerStatsRecorder {
 
     @Override
     public double getSendMsgsRate() {
-        return partitions != 0 ? sendMsgsRateAggregate.doubleValue() / 
partitions : sendMsgsRate;
+        return sendMsgsRate;
     }
 
     @Override
     public double getSendBytesRate() {
-        return partitions != 0 ? sendBytesRateAggregate.doubleValue() / 
partitions : sendBytesRate;
+        return sendBytesRate;
     }
 
     @Override
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
index 7cac1bf14f3..e7bef63a2cf 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
@@ -80,10 +80,10 @@ public class ProducerStatsRecorderImplTest {
     @Test
     public void testPartitionTopicAggegationStats() {
         ProducerStatsRecorderImpl recorder1 = spy(new 
ProducerStatsRecorderImpl());
-        ProducerStatsRecorderImpl recorder2 = new ProducerStatsRecorderImpl();
+        PartitionedTopicProducerStatsRecorderImpl recorder2 = new 
PartitionedTopicProducerStatsRecorderImpl();
         when(recorder1.getSendMsgsRate()).thenReturn(1000.0);
         when(recorder1.getSendBytesRate()).thenReturn(1000.0);
-        recorder2.updateCumulativeStats(recorder1);
+        recorder2.updateCumulativeStats("test", recorder1);
         assertTrue(recorder2.getSendBytesRate() > 0);
         assertTrue(recorder2.getSendMsgsRate() > 0);
     }

Reply via email to