This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 79ef292 Convert latency to millisecond in producer stats (#5096) 79ef292 is described below commit 79ef292c943d08ed7133890f9b33232e9669d529 Author: Like <ke.l...@outlook.com> AuthorDate: Fri Sep 6 05:50:32 2019 +0800 Convert latency to millisecond in producer stats (#5096) * Convert latency to millisecond in producer stats * Format header --- .../client/impl/ProducerStatsRecorderImpl.java | 2 +- .../client/impl/ProducerStatsRecorderImplTest.java | 57 ++++++++++++++++++++++ 2 files changed, 58 insertions(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java index 25cb724..fde2d75 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java @@ -191,7 +191,7 @@ public class ProducerStatsRecorderImpl implements ProducerStatsRecorder { public void incrementNumAcksReceived(long latencyNs) { numAcksReceived.increment(); synchronized (ds) { - ds.update(TimeUnit.NANOSECONDS.toMicros(latencyNs)); + ds.update(TimeUnit.NANOSECONDS.toMillis(latencyNs)); } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java new file mode 100644 index 0000000..d654158 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timer; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; +import org.testng.annotations.Test; + +import java.util.concurrent.TimeUnit; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; + +/** + * Unit tests of {@link ProducerStatsRecorderImpl}. + */ +public class ProducerStatsRecorderImplTest { + + @Test + public void testIncrementNumAcksReceived() throws Exception { + ClientConfigurationData conf = new ClientConfigurationData(); + conf.setStatsIntervalSeconds(1); + PulsarClientImpl client = mock(PulsarClientImpl.class); + when(client.getConfiguration()).thenReturn(conf); + Timer timer = new HashedWheelTimer(); + when(client.timer()).thenReturn(timer); + ProducerImpl<?> producer = mock(ProducerImpl.class); + when(producer.getTopic()).thenReturn("topic-test"); + when(producer.getProducerName()).thenReturn("producer-test"); + when(producer.getPendingQueueSize()).thenReturn(1); + ProducerConfigurationData producerConfigurationData = new ProducerConfigurationData(); + ProducerStatsRecorderImpl recorder = new ProducerStatsRecorderImpl(client, producerConfigurationData, producer); + long latencyNs = TimeUnit.SECONDS.toNanos(1); + recorder.incrementNumAcksReceived(latencyNs); + Thread.sleep(1200); + assertEquals(1000.0, recorder.getSendLatencyMillisMax(), 0.5); + } +}