This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 8486906 Update Producer stats on producer close() (#12500) 8486906 is described below commit 848690621299353284932e9281e4689813835855 Author: Kai Wang <wangkai744567...@gmail.com> AuthorDate: Wed Oct 27 00:50:47 2021 +0800 Update Producer stats on producer close() (#12500) --- .../client/impl/ProducerStatsRecorderImpl.java | 90 +++++++++++----------- .../client/impl/PartitionedProducerImplTest.java | 33 ++++++++ .../client/impl/ProducerStatsRecorderImplTest.java | 20 +++++ 3 files changed, 100 insertions(+), 43 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java index faf73cb..6b435d6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java @@ -111,49 +111,7 @@ public class ProducerStatsRecorderImpl implements ProducerStatsRecorder { } try { - long now = System.nanoTime(); - double elapsed = (now - oldTime) / 1e9; - oldTime = now; - - long currentNumMsgsSent = numMsgsSent.sumThenReset(); - long currentNumBytesSent = numBytesSent.sumThenReset(); - long currentNumSendFailedMsgs = numSendFailed.sumThenReset(); - long currentNumAcksReceived = numAcksReceived.sumThenReset(); - - totalMsgsSent.add(currentNumMsgsSent); - totalBytesSent.add(currentNumBytesSent); - totalSendFailed.add(currentNumSendFailedMsgs); - totalAcksReceived.add(currentNumAcksReceived); - - synchronized (ds) { - latencyPctValues = ds.getQuantiles(PERCENTILES); - ds.reset(); - } - - sendMsgsRate = currentNumMsgsSent / elapsed; - sendBytesRate = currentNumBytesSent / elapsed; - - if ((currentNumMsgsSent | currentNumSendFailedMsgs | currentNumAcksReceived - | currentNumMsgsSent) != 0) { - - for (int i = 0; i < latencyPctValues.length; i++) { - if (Double.isNaN(latencyPctValues[i])) { - latencyPctValues[i] = 0; - } - } - - log.info("[{}] [{}] Pending messages: {} --- Publish throughput: {} msg/s --- {} Mbit/s --- " - + "Latency: med: {} ms - 95pct: {} ms - 99pct: {} ms - 99.9pct: {} ms - max: {} ms --- " - + "Ack received rate: {} ack/s --- Failed messages: {}", producer.getTopic(), - producer.getProducerName(), producer.getPendingQueueSize(), - THROUGHPUT_FORMAT.format(sendMsgsRate), - THROUGHPUT_FORMAT.format(sendBytesRate / 1024 / 1024 * 8), - DEC.format(latencyPctValues[0]), DEC.format(latencyPctValues[2]), - DEC.format(latencyPctValues[3]), DEC.format(latencyPctValues[4]), - DEC.format(latencyPctValues[5]), - THROUGHPUT_FORMAT.format(currentNumAcksReceived / elapsed), currentNumSendFailedMsgs); - } - + updateStats(); } catch (Exception e) { log.error("[{}] [{}]: {}", producer.getTopic(), producer.getProducerName(), e.getMessage()); } finally { @@ -171,6 +129,51 @@ public class ProducerStatsRecorderImpl implements ProducerStatsRecorder { return statTimeout; } + protected void updateStats() { + long now = System.nanoTime(); + double elapsed = (now - oldTime) / 1e9; + oldTime = now; + + long currentNumMsgsSent = numMsgsSent.sumThenReset(); + long currentNumBytesSent = numBytesSent.sumThenReset(); + long currentNumSendFailedMsgs = numSendFailed.sumThenReset(); + long currentNumAcksReceived = numAcksReceived.sumThenReset(); + + totalMsgsSent.add(currentNumMsgsSent); + totalBytesSent.add(currentNumBytesSent); + totalSendFailed.add(currentNumSendFailedMsgs); + totalAcksReceived.add(currentNumAcksReceived); + + synchronized (ds) { + latencyPctValues = ds.getQuantiles(PERCENTILES); + ds.reset(); + } + + sendMsgsRate = currentNumMsgsSent / elapsed; + sendBytesRate = currentNumBytesSent / elapsed; + + if ((currentNumMsgsSent | currentNumSendFailedMsgs | currentNumAcksReceived + | currentNumMsgsSent) != 0) { + + for (int i = 0; i < latencyPctValues.length; i++) { + if (Double.isNaN(latencyPctValues[i])) { + latencyPctValues[i] = 0; + } + } + + log.info("[{}] [{}] Pending messages: {} --- Publish throughput: {} msg/s --- {} Mbit/s --- " + + "Latency: med: {} ms - 95pct: {} ms - 99pct: {} ms - 99.9pct: {} ms - max: {} ms --- " + + "Ack received rate: {} ack/s --- Failed messages: {}", producer.getTopic(), + producer.getProducerName(), producer.getPendingQueueSize(), + THROUGHPUT_FORMAT.format(sendMsgsRate), + THROUGHPUT_FORMAT.format(sendBytesRate / 1024 / 1024 * 8), + DEC.format(latencyPctValues[0]), DEC.format(latencyPctValues[2]), + DEC.format(latencyPctValues[3]), DEC.format(latencyPctValues[4]), + DEC.format(latencyPctValues[5]), + THROUGHPUT_FORMAT.format(currentNumAcksReceived / elapsed), currentNumSendFailedMsgs); + } + } + @Override public void updateNumMsgsSent(long numMsgs, long totalMsgsSize) { numMsgsSent.add(numMsgs); @@ -297,6 +300,7 @@ public class ProducerStatsRecorderImpl implements ProducerStatsRecorder { } public void cancelStatsTimeout() { + this.updateStats(); if (statTimeout != null) { statTimeout.cancel(); statTimeout = null; diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java index 1f9496b..ad2c992 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java @@ -202,4 +202,37 @@ public class PartitionedProducerImplTest { impl.getStats(); } + @Test + public void testGetStatsWithoutArriveUpdateInterval() throws Exception { + String topicName = "test-stats-without-arrive-interval"; + ClientConfigurationData conf = new ClientConfigurationData(); + conf.setServiceUrl("pulsar://localhost:6650"); + conf.setStatsIntervalSeconds(100); + + ThreadFactory threadFactory = + new DefaultThreadFactory("client-test-stats", Thread.currentThread().isDaemon()); + EventLoopGroup eventLoopGroup = EventLoopUtil + .newEventLoopGroup(conf.getNumIoThreads(), false, threadFactory); + + PulsarClientImpl clientImpl = new PulsarClientImpl(conf, eventLoopGroup); + + ProducerConfigurationData producerConfData = new ProducerConfigurationData(); + producerConfData.setMessageRoutingMode(MessageRoutingMode.CustomPartition); + producerConfData.setCustomMessageRouter(new CustomMessageRouter()); + + assertEquals(Long.parseLong("100"), clientImpl.getConfiguration().getStatsIntervalSeconds()); + + PartitionedProducerImpl<byte[]> impl = new PartitionedProducerImpl<>( + clientImpl, topicName, producerConfData, + 1, null, null, null); + + impl.getProducers().get(0).getStats().incrementSendFailed(); + ProducerStatsRecorderImpl stats = impl.getStats(); + assertEquals(stats.getTotalSendFailed(), 0); + // When close producer, the ProducerStatsRecorder will update stats immediately + impl.close(); + stats = impl.getStats(); + assertEquals(stats.getTotalSendFailed(), 1); + } + } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java index d654158..f6e7f28 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java @@ -54,4 +54,24 @@ public class ProducerStatsRecorderImplTest { Thread.sleep(1200); assertEquals(1000.0, recorder.getSendLatencyMillisMax(), 0.5); } + + @Test + public void testGetStatsAndCancelStatsTimeoutWithoutArriveUpdateInterval() { + ClientConfigurationData conf = new ClientConfigurationData(); + conf.setStatsIntervalSeconds(60); + PulsarClientImpl client = mock(PulsarClientImpl.class); + when(client.getConfiguration()).thenReturn(conf); + Timer timer = new HashedWheelTimer(); + when(client.timer()).thenReturn(timer); + ProducerImpl<?> producer = mock(ProducerImpl.class); + when(producer.getTopic()).thenReturn("topic-test"); + when(producer.getProducerName()).thenReturn("producer-test"); + when(producer.getPendingQueueSize()).thenReturn(1); + ProducerConfigurationData producerConfigurationData = new ProducerConfigurationData(); + ProducerStatsRecorderImpl recorder = new ProducerStatsRecorderImpl(client, producerConfigurationData, producer); + long latencyNs = TimeUnit.SECONDS.toNanos(1); + recorder.incrementNumAcksReceived(latencyNs); + recorder.cancelStatsTimeout(); + assertEquals(1000.0, recorder.getSendLatencyMillisMax(), 0.5); + } }