This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e9af5cb18905bd4dd5f1f2853bbf34a838cc16c6
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Thu Oct 27 05:54:12 2022 -0700

    [improve][client] support aggregate metrics for partition topic stats (#18214)
---
 .../pulsar/client/impl/ProducerStatsRecorderImpl.java   | 17 +++++++++++++++--
 .../client/impl/ProducerStatsRecorderImplTest.java      | 13 +++++++++++++
 2 files changed, 28 insertions(+), 2 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
index 5a60dafb2b5..897193fff32 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
@@ -27,6 +27,7 @@ import io.netty.util.TimerTask;
 import java.io.IOException;
 import java.text.DecimalFormat;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.DoubleAdder;
 import java.util.concurrent.atomic.LongAdder;
 import org.apache.pulsar.client.api.ProducerStats;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
@@ -50,6 +51,8 @@ public class ProducerStatsRecorderImpl implements 
ProducerStatsRecorder {
     private final LongAdder totalBytesSent;
     private final LongAdder totalSendFailed;
     private final LongAdder totalAcksReceived;
+    private final DoubleAdder sendMsgsRateAggregate;
+    private final DoubleAdder sendBytesRateAggregate;
     private static final DecimalFormat DEC = new DecimalFormat("0.000");
     private static final DecimalFormat THROUGHPUT_FORMAT = new 
DecimalFormat("0.00");
     private final transient DoublesSketch ds;
@@ -58,6 +61,7 @@ public class ProducerStatsRecorderImpl implements 
ProducerStatsRecorder {
 
     private volatile double sendMsgsRate;
     private volatile double sendBytesRate;
+    private int partitions = 0;
     private volatile double[] latencyPctValues = new 
double[PERCENTILES.length];
     private volatile double[] batchSizePctValues = new 
double[PERCENTILES.length];
     private volatile double[] msgSizePctValues = new 
double[PERCENTILES.length];
@@ -73,6 +77,8 @@ public class ProducerStatsRecorderImpl implements 
ProducerStatsRecorder {
         totalBytesSent = new LongAdder();
         totalSendFailed = new LongAdder();
         totalAcksReceived = new LongAdder();
+        sendMsgsRateAggregate = new DoubleAdder();
+        sendBytesRateAggregate = new DoubleAdder();
         ds = DoublesSketch.builder().build(256);
         batchSizeDs = DoublesSketch.builder().build(256);
         msgSizeDs = DoublesSketch.builder().build(256);
@@ -91,6 +97,8 @@ public class ProducerStatsRecorderImpl implements 
ProducerStatsRecorder {
         totalBytesSent = new LongAdder();
         totalSendFailed = new LongAdder();
         totalAcksReceived = new LongAdder();
+        sendMsgsRateAggregate = new DoubleAdder();
+        sendBytesRateAggregate = new DoubleAdder();
         ds = DoublesSketch.builder().build(256);
         batchSizeDs = DoublesSketch.builder().build(256);
         msgSizeDs = DoublesSketch.builder().build(256);
@@ -239,6 +247,7 @@ public class ProducerStatsRecorderImpl implements 
ProducerStatsRecorder {
         totalBytesSent.reset();
         totalSendFailed.reset();
         totalAcksReceived.reset();
+        partitions = 0;
     }
 
     void updateCumulativeStats(ProducerStats stats) {
@@ -253,6 +262,10 @@ public class ProducerStatsRecorderImpl implements 
ProducerStatsRecorder {
         totalBytesSent.add(stats.getTotalBytesSent());
         totalSendFailed.add(stats.getTotalSendFailed());
         totalAcksReceived.add(stats.getTotalAcksReceived());
+        // update rates
+        sendMsgsRateAggregate.add(stats.getSendMsgsRate());
+        sendBytesRateAggregate.add(stats.getSendBytesRate());
+        partitions++;
     }
 
     @Override
@@ -293,12 +306,12 @@ public class ProducerStatsRecorderImpl implements 
ProducerStatsRecorder {
 
     @Override
     public double getSendMsgsRate() {
-        return sendMsgsRate;
+        return partitions != 0 ? sendMsgsRateAggregate.doubleValue() / 
partitions : sendMsgsRate;
     }
 
     @Override
     public double getSendBytesRate() {
-        return sendBytesRate;
+        return partitions != 0 ? sendBytesRateAggregate.doubleValue() / 
partitions : sendBytesRate;
     }
 
     @Override
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
index f6e7f284ce6..7cac1bf14f3 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
@@ -26,7 +26,9 @@ import org.testng.annotations.Test;
 
 import java.util.concurrent.TimeUnit;
 
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 
@@ -74,4 +76,15 @@ public class ProducerStatsRecorderImplTest {
         recorder.cancelStatsTimeout();
         assertEquals(1000.0, recorder.getSendLatencyMillisMax(), 0.5);
     }
+
+    @Test
+    public void testPartitionTopicAggegationStats() {
+        ProducerStatsRecorderImpl recorder1 = spy(new 
ProducerStatsRecorderImpl());
+        ProducerStatsRecorderImpl recorder2 = new ProducerStatsRecorderImpl();
+        when(recorder1.getSendMsgsRate()).thenReturn(1000.0);
+        when(recorder1.getSendBytesRate()).thenReturn(1000.0);
+        recorder2.updateCumulativeStats(recorder1);
+        assertTrue(recorder2.getSendBytesRate() > 0);
+        assertTrue(recorder2.getSendMsgsRate() > 0);
+    }
 }

Reply via email to