This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8486906  Update Producer stats on producer close() (#12500)
8486906 is described below

commit 848690621299353284932e9281e4689813835855
Author: Kai Wang <wangkai744567...@gmail.com>
AuthorDate: Wed Oct 27 00:50:47 2021 +0800

    Update Producer stats on producer close() (#12500)
---
 .../client/impl/ProducerStatsRecorderImpl.java     | 90 +++++++++++-----------
 .../client/impl/PartitionedProducerImplTest.java   | 33 ++++++++
 .../client/impl/ProducerStatsRecorderImplTest.java | 20 +++++
 3 files changed, 100 insertions(+), 43 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
index faf73cb..6b435d6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
@@ -111,49 +111,7 @@ public class ProducerStatsRecorderImpl implements ProducerStatsRecorder {
             }
 
             try {
-                long now = System.nanoTime();
-                double elapsed = (now - oldTime) / 1e9;
-                oldTime = now;
-
-                long currentNumMsgsSent = numMsgsSent.sumThenReset();
-                long currentNumBytesSent = numBytesSent.sumThenReset();
-                long currentNumSendFailedMsgs = numSendFailed.sumThenReset();
-                long currentNumAcksReceived = numAcksReceived.sumThenReset();
-
-                totalMsgsSent.add(currentNumMsgsSent);
-                totalBytesSent.add(currentNumBytesSent);
-                totalSendFailed.add(currentNumSendFailedMsgs);
-                totalAcksReceived.add(currentNumAcksReceived);
-
-                synchronized (ds) {
-                    latencyPctValues = ds.getQuantiles(PERCENTILES);
-                    ds.reset();
-                }
-
-                sendMsgsRate = currentNumMsgsSent / elapsed;
-                sendBytesRate = currentNumBytesSent / elapsed;
-
-                if ((currentNumMsgsSent | currentNumSendFailedMsgs | currentNumAcksReceived
-                        | currentNumMsgsSent) != 0) {
-
-                    for (int i = 0; i < latencyPctValues.length; i++) {
-                        if (Double.isNaN(latencyPctValues[i])) {
-                            latencyPctValues[i] = 0;
-                        }
-                    }
-
-                    log.info("[{}] [{}] Pending messages: {} --- Publish throughput: {} msg/s --- {} Mbit/s --- "
-                            + "Latency: med: {} ms - 95pct: {} ms - 99pct: {} ms - 99.9pct: {} ms - max: {} ms --- "
-                            + "Ack received rate: {} ack/s --- Failed messages: {}", producer.getTopic(),
-                            producer.getProducerName(), producer.getPendingQueueSize(),
-                            THROUGHPUT_FORMAT.format(sendMsgsRate),
-                            THROUGHPUT_FORMAT.format(sendBytesRate / 1024 / 1024 * 8),
-                            DEC.format(latencyPctValues[0]), DEC.format(latencyPctValues[2]),
-                            DEC.format(latencyPctValues[3]), DEC.format(latencyPctValues[4]),
-                            DEC.format(latencyPctValues[5]),
-                            THROUGHPUT_FORMAT.format(currentNumAcksReceived / elapsed), currentNumSendFailedMsgs);
-                }
-
+                updateStats();
             } catch (Exception e) {
                 log.error("[{}] [{}]: {}", producer.getTopic(), producer.getProducerName(), e.getMessage());
             } finally {
@@ -171,6 +129,51 @@ public class ProducerStatsRecorderImpl implements ProducerStatsRecorder {
         return statTimeout;
     }
 
+    protected void updateStats() {
+        long now = System.nanoTime();
+        double elapsed = (now - oldTime) / 1e9;
+        oldTime = now;
+
+        long currentNumMsgsSent = numMsgsSent.sumThenReset();
+        long currentNumBytesSent = numBytesSent.sumThenReset();
+        long currentNumSendFailedMsgs = numSendFailed.sumThenReset();
+        long currentNumAcksReceived = numAcksReceived.sumThenReset();
+
+        totalMsgsSent.add(currentNumMsgsSent);
+        totalBytesSent.add(currentNumBytesSent);
+        totalSendFailed.add(currentNumSendFailedMsgs);
+        totalAcksReceived.add(currentNumAcksReceived);
+
+        synchronized (ds) {
+            latencyPctValues = ds.getQuantiles(PERCENTILES);
+            ds.reset();
+        }
+
+        sendMsgsRate = currentNumMsgsSent / elapsed;
+        sendBytesRate = currentNumBytesSent / elapsed;
+
+        if ((currentNumMsgsSent | currentNumSendFailedMsgs | currentNumAcksReceived
+                | currentNumMsgsSent) != 0) {
+
+            for (int i = 0; i < latencyPctValues.length; i++) {
+                if (Double.isNaN(latencyPctValues[i])) {
+                    latencyPctValues[i] = 0;
+                }
+            }
+
+            log.info("[{}] [{}] Pending messages: {} --- Publish throughput: {} msg/s --- {} Mbit/s --- "
+                            + "Latency: med: {} ms - 95pct: {} ms - 99pct: {} ms - 99.9pct: {} ms - max: {} ms --- "
+                            + "Ack received rate: {} ack/s --- Failed messages: {}", producer.getTopic(),
+                    producer.getProducerName(), producer.getPendingQueueSize(),
+                    THROUGHPUT_FORMAT.format(sendMsgsRate),
+                    THROUGHPUT_FORMAT.format(sendBytesRate / 1024 / 1024 * 8),
+                    DEC.format(latencyPctValues[0]), DEC.format(latencyPctValues[2]),
+                    DEC.format(latencyPctValues[3]), DEC.format(latencyPctValues[4]),
+                    DEC.format(latencyPctValues[5]),
+                    THROUGHPUT_FORMAT.format(currentNumAcksReceived / elapsed), currentNumSendFailedMsgs);
+        }
+    }
+
     @Override
     public void updateNumMsgsSent(long numMsgs, long totalMsgsSize) {
         numMsgsSent.add(numMsgs);
@@ -297,6 +300,7 @@ public class ProducerStatsRecorderImpl implements ProducerStatsRecorder {
     }
 
     public void cancelStatsTimeout() {
+        this.updateStats();
         if (statTimeout != null) {
             statTimeout.cancel();
             statTimeout = null;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
index 1f9496b..ad2c992 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
@@ -202,4 +202,37 @@ public class PartitionedProducerImplTest {
         impl.getStats();
     }
 
+    @Test
+    public void testGetStatsWithoutArriveUpdateInterval() throws Exception {
+        String topicName = "test-stats-without-arrive-interval";
+        ClientConfigurationData conf = new ClientConfigurationData();
+        conf.setServiceUrl("pulsar://localhost:6650");
+        conf.setStatsIntervalSeconds(100);
+
+        ThreadFactory threadFactory =
+                new DefaultThreadFactory("client-test-stats", Thread.currentThread().isDaemon());
+        EventLoopGroup eventLoopGroup = EventLoopUtil
+                .newEventLoopGroup(conf.getNumIoThreads(), false, threadFactory);
+
+        PulsarClientImpl clientImpl = new PulsarClientImpl(conf, eventLoopGroup);
+
+        ProducerConfigurationData producerConfData = new ProducerConfigurationData();
+        producerConfData.setMessageRoutingMode(MessageRoutingMode.CustomPartition);
+        producerConfData.setCustomMessageRouter(new CustomMessageRouter());
+
+        assertEquals(Long.parseLong("100"), clientImpl.getConfiguration().getStatsIntervalSeconds());
+
+        PartitionedProducerImpl<byte[]> impl = new PartitionedProducerImpl<>(
+                clientImpl, topicName, producerConfData,
+                1, null, null, null);
+
+        impl.getProducers().get(0).getStats().incrementSendFailed();
+        ProducerStatsRecorderImpl stats = impl.getStats();
+        assertEquals(stats.getTotalSendFailed(), 0);
+        // When close producer, the ProducerStatsRecorder will update stats immediately
+        impl.close();
+        stats = impl.getStats();
+        assertEquals(stats.getTotalSendFailed(), 1);
+    }
+
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
index d654158..f6e7f28 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
@@ -54,4 +54,24 @@ public class ProducerStatsRecorderImplTest {
         Thread.sleep(1200);
         assertEquals(1000.0, recorder.getSendLatencyMillisMax(), 0.5);
     }
+
+    @Test
+    public void testGetStatsAndCancelStatsTimeoutWithoutArriveUpdateInterval() {
+        ClientConfigurationData conf = new ClientConfigurationData();
+        conf.setStatsIntervalSeconds(60);
+        PulsarClientImpl client = mock(PulsarClientImpl.class);
+        when(client.getConfiguration()).thenReturn(conf);
+        Timer timer = new HashedWheelTimer();
+        when(client.timer()).thenReturn(timer);
+        ProducerImpl<?> producer = mock(ProducerImpl.class);
+        when(producer.getTopic()).thenReturn("topic-test");
+        when(producer.getProducerName()).thenReturn("producer-test");
+        when(producer.getPendingQueueSize()).thenReturn(1);
+        ProducerConfigurationData producerConfigurationData = new ProducerConfigurationData();
+        ProducerStatsRecorderImpl recorder = new ProducerStatsRecorderImpl(client, producerConfigurationData, producer);
+        long latencyNs = TimeUnit.SECONDS.toNanos(1);
+        recorder.incrementNumAcksReceived(latencyNs);
+        recorder.cancelStatsTimeout();
+        assertEquals(1000.0, recorder.getSendLatencyMillisMax(), 0.5);
+    }
 }

Reply via email to