This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e9af5cb18905bd4dd5f1f2853bbf34a838cc16c6 Author: Rajan Dhabalia <[email protected]> AuthorDate: Thu Oct 27 05:54:12 2022 -0700 [improve][client] support aggregate metrics for partition topic stats (#18214) --- .../pulsar/client/impl/ProducerStatsRecorderImpl.java | 17 +++++++++++++++-- .../client/impl/ProducerStatsRecorderImplTest.java | 13 +++++++++++++ 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java index 5a60dafb2b5..897193fff32 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java @@ -27,6 +27,7 @@ import io.netty.util.TimerTask; import java.io.IOException; import java.text.DecimalFormat; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.DoubleAdder; import java.util.concurrent.atomic.LongAdder; import org.apache.pulsar.client.api.ProducerStats; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; @@ -50,6 +51,8 @@ public class ProducerStatsRecorderImpl implements ProducerStatsRecorder { private final LongAdder totalBytesSent; private final LongAdder totalSendFailed; private final LongAdder totalAcksReceived; + private final DoubleAdder sendMsgsRateAggregate; + private final DoubleAdder sendBytesRateAggregate; private static final DecimalFormat DEC = new DecimalFormat("0.000"); private static final DecimalFormat THROUGHPUT_FORMAT = new DecimalFormat("0.00"); private final transient DoublesSketch ds; @@ -58,6 +61,7 @@ public class ProducerStatsRecorderImpl implements ProducerStatsRecorder { private volatile double sendMsgsRate; private volatile double sendBytesRate; + private int partitions = 0; private volatile double[] latencyPctValues = new double[PERCENTILES.length]; private volatile double[] batchSizePctValues = new double[PERCENTILES.length]; private volatile double[] msgSizePctValues = new double[PERCENTILES.length]; @@ -73,6 +77,8 @@ public class ProducerStatsRecorderImpl implements ProducerStatsRecorder { totalBytesSent = new LongAdder(); totalSendFailed = new LongAdder(); totalAcksReceived = new LongAdder(); + sendMsgsRateAggregate = new DoubleAdder(); + sendBytesRateAggregate = new DoubleAdder(); ds = DoublesSketch.builder().build(256); batchSizeDs = DoublesSketch.builder().build(256); msgSizeDs = DoublesSketch.builder().build(256); @@ -91,6 +97,8 @@ public class ProducerStatsRecorderImpl implements ProducerStatsRecorder { totalBytesSent = new LongAdder(); totalSendFailed = new LongAdder(); totalAcksReceived = new LongAdder(); + sendMsgsRateAggregate = new DoubleAdder(); + sendBytesRateAggregate = new DoubleAdder(); ds = DoublesSketch.builder().build(256); batchSizeDs = DoublesSketch.builder().build(256); msgSizeDs = DoublesSketch.builder().build(256); @@ -239,6 +247,7 @@ public class ProducerStatsRecorderImpl implements ProducerStatsRecorder { totalBytesSent.reset(); totalSendFailed.reset(); totalAcksReceived.reset(); + partitions = 0; } void updateCumulativeStats(ProducerStats stats) { @@ -253,6 +262,10 @@ public class ProducerStatsRecorderImpl implements ProducerStatsRecorder { totalBytesSent.add(stats.getTotalBytesSent()); totalSendFailed.add(stats.getTotalSendFailed()); totalAcksReceived.add(stats.getTotalAcksReceived()); + // update rates + sendMsgsRateAggregate.add(stats.getSendMsgsRate()); + sendBytesRateAggregate.add(stats.getSendBytesRate()); + partitions++; } @Override @@ -293,12 +306,12 @@ public class ProducerStatsRecorderImpl implements ProducerStatsRecorder { @Override public double getSendMsgsRate() { - return sendMsgsRate; + return partitions != 0 ? sendMsgsRateAggregate.doubleValue() / partitions : sendMsgsRate; } @Override public double getSendBytesRate() { - return sendBytesRate; + return partitions != 0 ? sendBytesRateAggregate.doubleValue() / partitions : sendBytesRate; } @Override diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java index f6e7f284ce6..7cac1bf14f3 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java @@ -26,7 +26,9 @@ import org.testng.annotations.Test; import java.util.concurrent.TimeUnit; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; @@ -74,4 +76,15 @@ public class ProducerStatsRecorderImplTest { recorder.cancelStatsTimeout(); assertEquals(1000.0, recorder.getSendLatencyMillisMax(), 0.5); } + + @Test + public void testPartitionTopicAggegationStats() { + ProducerStatsRecorderImpl recorder1 = spy(new ProducerStatsRecorderImpl()); + ProducerStatsRecorderImpl recorder2 = new ProducerStatsRecorderImpl(); + when(recorder1.getSendMsgsRate()).thenReturn(1000.0); + when(recorder1.getSendBytesRate()).thenReturn(1000.0); + recorder2.updateCumulativeStats(recorder1); + assertTrue(recorder2.getSendBytesRate() > 0); + assertTrue(recorder2.getSendMsgsRate() > 0); + } }
