This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 642461c Expose the receiver queue size to client consumer stats
(#8663)
642461c is described below
commit 642461cf658c1f0ed7eed350a990fcaa39182703
Author: feynmanlin <[email protected]>
AuthorDate: Sun Nov 22 09:59:54 2020 +0800
Expose the receiver queue size to client consumer stats (#8663)
Fixes #8650
### Motivation
Currently, we log the receiver queue size. But we don't expose the receiver
queue size to the client consumer stats. We should expose it.
### Modifications
add API for `ConsumerStats.java`
---
.../client/api/SimpleProducerConsumerTest.java | 56 ++++++++++++++++++++--
.../apache/pulsar/client/api/ConsumerStats.java | 14 ++++++
.../pulsar/client/impl/ConsumerStatsDisabled.java | 11 +++++
.../client/impl/ConsumerStatsRecorderImpl.java | 55 +++++++++++++++++----
.../client/impl/MultiTopicsConsumerImpl.java | 2 +-
5 files changed, 125 insertions(+), 13 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index b4d32d5..55dab8b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -36,7 +36,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.google.common.reflect.Reflection;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@@ -72,11 +71,9 @@ import java.util.stream.Collectors;
import lombok.Cleanup;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import org.apache.bookkeeper.common.util.ReflectionUtils;
import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.ClientCnx;
@@ -84,7 +81,6 @@ import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
-import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
@@ -100,6 +96,7 @@ import
org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
+import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -3647,4 +3644,55 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
Assert.assertTrue(producer.getLastDisconnectedTimestamp() > 0);
Assert.assertTrue(consumer.getLastDisconnectedTimestamp() > 0);
}
+
+ @Test
+ public void testGetStats() throws Exception {
+ final String topicName = "persistent://my-property/my-ns/testGetStats"
+ UUID.randomUUID();
+ final String subName = "my-sub";
+ final int receiveQueueSize = 100;
+ PulsarClient client = newPulsarClient(lookupUrl.toString(), 100);
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .enableBatching(false).topic(topicName).create();
+ ConsumerImpl<String> consumer = (ConsumerImpl<String>)
client.newConsumer(Schema.STRING)
+
.topic(topicName).receiverQueueSize(receiveQueueSize).subscriptionName(subName).subscribe();
+ Assert.assertNull(consumer.getStats().getMsgNumInSubReceiverQueue());
+
Assert.assertEquals(consumer.getStats().getMsgNumInReceiverQueue().intValue(),
0);
+
+ for (int i = 0; i < receiveQueueSize; i++) {
+ producer.sendAsync("msg" + i);
+ }
+ //Give some time to consume
+ Awaitility.await().atMost(5, TimeUnit.SECONDS)
+ .untilAsserted(() ->
Assert.assertEquals(consumer.getStats().getMsgNumInReceiverQueue().intValue(),
receiveQueueSize));
+ consumer.close();
+ producer.close();
+ }
+
+ @Test
+ public void testGetStatsForPartitionedTopic() throws Exception {
+ final String topicName =
"persistent://my-property/my-ns/testGetStatsForPartitionedTopic";
+ final String subName = "my-sub";
+ final int receiveQueueSize = 100;
+
+ admin.topics().createPartitionedTopic(topicName, 3);
+ PulsarClient client = newPulsarClient(lookupUrl.toString(), 100);
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .enableBatching(false).topic(topicName).create();
+ MultiTopicsConsumerImpl<String> consumer =
(MultiTopicsConsumerImpl<String>) client.newConsumer(Schema.STRING)
+
.topic(topicName).receiverQueueSize(receiveQueueSize).subscriptionName(subName).subscribe();
+
Assert.assertEquals(consumer.getStats().getMsgNumInSubReceiverQueue().size(),
3);
+
Assert.assertEquals(consumer.getStats().getMsgNumInReceiverQueue().intValue(),
0);
+
+ consumer.getStats().getMsgNumInSubReceiverQueue()
+ .forEach((key, value) -> Assert.assertEquals((int) value, 0));
+
+ for (int i = 0; i < receiveQueueSize; i++) {
+ producer.sendAsync("msg" + i);
+ }
+ //Give some time to consume
+ Awaitility.await().atMost(5, TimeUnit.SECONDS)
+ .untilAsserted(() ->
Assert.assertEquals(consumer.getStats().getMsgNumInReceiverQueue().intValue(),
receiveQueueSize));
+ consumer.close();
+ producer.close();
+ }
}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java
index c34c348..04b2ee1 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java
@@ -19,6 +19,8 @@
package org.apache.pulsar.client.api;
import java.io.Serializable;
+import java.util.Map;
+
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
@@ -101,4 +103,16 @@ public interface ConsumerStats extends Serializable {
* @return Total number of message acknowledgments failures on this
consumer
*/
long getTotalAcksFailed();
+
+ /**
+ * Get the size of receiver queue.
+ * @return
+ */
+ Integer getMsgNumInReceiverQueue();
+
+ /**
+ * Get the receiver queue size of sub-consumers.
+ * @return
+ */
+ Map<Long, Integer> getMsgNumInSubReceiverQueue();
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java
index 8ced7a6..7304548 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;
+import java.util.Map;
import java.util.Optional;
import org.apache.pulsar.client.api.ConsumerStats;
@@ -116,6 +117,16 @@ public class ConsumerStatsDisabled implements
ConsumerStatsRecorder {
}
@Override
+ public Integer getMsgNumInReceiverQueue() {
+ return null;
+ }
+
+ @Override
+ public Map<Long, Integer> getMsgNumInSubReceiverQueue() {
+ return null;
+ }
+
+ @Override
public double getRateMsgsReceived() {
return 0;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
index c1a2559..c9b2c49 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
@@ -20,10 +20,14 @@ package org.apache.pulsar.client.impl;
import java.io.IOException;
import java.text.DecimalFormat;
+import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
+import java.util.stream.Collectors;
+import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerStats;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
@@ -42,7 +46,7 @@ public class ConsumerStatsRecorderImpl implements
ConsumerStatsRecorder {
private static final long serialVersionUID = 1L;
private TimerTask stat;
private Timeout statTimeout;
- private ConsumerImpl<?> consumer;
+ private Consumer<?> consumer;
private PulsarClientImpl pulsarClient;
private long oldTime;
private long statsIntervalSeconds;
@@ -65,6 +69,11 @@ public class ConsumerStatsRecorderImpl implements
ConsumerStatsRecorder {
private static final DecimalFormat THROUGHPUT_FORMAT = new
DecimalFormat("0.00");
public ConsumerStatsRecorderImpl() {
+ this(null);
+ }
+
+ public ConsumerStatsRecorderImpl(Consumer<?> consumer) {
+ this.consumer = consumer;
numMsgsReceived = new LongAdder();
numBytesReceived = new LongAdder();
numReceiveFailed = new LongAdder();
@@ -80,7 +89,7 @@ public class ConsumerStatsRecorderImpl implements
ConsumerStatsRecorder {
}
public ConsumerStatsRecorderImpl(PulsarClientImpl pulsarClient,
ConsumerConfigurationData<?> conf,
- ConsumerImpl<?> consumer) {
+ Consumer<?> consumer) {
this.pulsarClient = pulsarClient;
this.consumer = consumer;
this.statsIntervalSeconds =
pulsarClient.getConfiguration().getStatsIntervalSeconds();
@@ -112,9 +121,10 @@ public class ConsumerStatsRecorderImpl implements
ConsumerStatsRecorder {
}
stat = (timeout) -> {
- if (timeout.isCancelled()) {
+ if (timeout.isCancelled() || !(consumer instanceof ConsumerImpl)) {
return;
}
+ ConsumerImpl<?> consumerImpl = (ConsumerImpl<?>) consumer;
try {
long now = System.nanoTime();
double elapsed = (now - oldTime) / 1e9;
@@ -135,7 +145,6 @@ public class ConsumerStatsRecorderImpl implements
ConsumerStatsRecorder {
receivedMsgsRate = currentNumMsgsReceived / elapsed;
receivedBytesRate = currentNumBytesReceived / elapsed;
-
if ((currentNumMsgsReceived | currentNumBytesReceived |
currentNumReceiveFailed | currentNumAcksSent
| currentNumAcksFailed) != 0) {
log.info(
@@ -143,15 +152,15 @@ public class ConsumerStatsRecorderImpl implements
ConsumerStatsRecorder {
+ "Consume throughput received: {} msgs/s
--- {} Mbit/s --- "
+ "Ack sent rate: {} ack/s --- " + "Failed
messages: {} --- batch messages: {} ---"
+ "Failed acks: {}",
- consumer.getTopic(), consumer.getSubscription(),
consumer.consumerName,
- consumer.incomingMessages.size(),
THROUGHPUT_FORMAT.format(receivedMsgsRate),
+ consumerImpl.getTopic(),
consumerImpl.getSubscription(), consumerImpl.consumerName,
+ consumerImpl.incomingMessages.size(),
THROUGHPUT_FORMAT.format(receivedMsgsRate),
THROUGHPUT_FORMAT.format(receivedBytesRate * 8 /
1024 / 1024),
THROUGHPUT_FORMAT.format(currentNumAcksSent /
elapsed), currentNumReceiveFailed,
currentNumBatchReceiveFailed,
currentNumAcksFailed);
}
} catch (Exception e) {
- log.error("[{}] [{}] [{}]: {}", consumer.getTopic(),
consumer.subscription, consumer.consumerName,
- e.getMessage());
+ log.error("[{}] [{}] [{}]: {}", consumerImpl.getTopic(),
consumerImpl.subscription
+ , consumerImpl.consumerName, e.getMessage());
} finally {
// schedule the next stat info
statTimeout = pulsarClient.timer().newTimeout(stat,
statsIntervalSeconds, TimeUnit.SECONDS);
@@ -230,22 +239,47 @@ public class ConsumerStatsRecorderImpl implements
ConsumerStatsRecorder {
totalAcksFailed.add(stats.getTotalAcksFailed());
}
+ @Override
+ public Integer getMsgNumInReceiverQueue() {
+ if (consumer instanceof ConsumerBase) {
+ return ((ConsumerBase<?>) consumer).incomingMessages.size();
+ }
+ return null;
+ }
+
+ @Override
+ public Map<Long, Integer> getMsgNumInSubReceiverQueue() {
+ if (consumer instanceof MultiTopicsConsumerImpl) {
+ List<ConsumerImpl<?>> consumerList = ((MultiTopicsConsumerImpl)
consumer).getConsumers();
+ return consumerList.stream().collect(
+ Collectors.toMap((consumerImpl) -> consumerImpl.consumerId
+ , (consumerImpl) ->
consumerImpl.incomingMessages.size())
+ );
+ }
+ return null;
+ }
+
+ @Override
public long getNumMsgsReceived() {
return numMsgsReceived.longValue();
}
+ @Override
public long getNumBytesReceived() {
return numBytesReceived.longValue();
}
+ @Override
public long getNumAcksSent() {
return numAcksSent.longValue();
}
+ @Override
public long getNumAcksFailed() {
return numAcksFailed.longValue();
}
+ @Override
public long getNumReceiveFailed() {
return numReceiveFailed.longValue();
}
@@ -255,14 +289,17 @@ public class ConsumerStatsRecorderImpl implements
ConsumerStatsRecorder {
return numBatchReceiveFailed.longValue();
}
+ @Override
public long getTotalMsgsReceived() {
return totalMsgsReceived.longValue();
}
+ @Override
public long getTotalBytesReceived() {
return totalBytesReceived.longValue();
}
+ @Override
public long getTotalReceivedFailed() {
return totalReceiveFailed.longValue();
}
@@ -272,10 +309,12 @@ public class ConsumerStatsRecorderImpl implements
ConsumerStatsRecorder {
return totalBatchReceiveFailed.longValue();
}
+ @Override
public long getTotalAcksSent() {
return totalAcksSent.longValue();
}
+ @Override
public long getTotalAcksFailed() {
return totalAcksFailed.longValue();
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 26e82dd..49e31f4 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -157,7 +157,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
}
this.internalConfig = getInternalConsumerConfig();
- this.stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ?
new ConsumerStatsRecorderImpl() : null;
+ this.stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ?
new ConsumerStatsRecorderImpl(this) : null;
// start track and auto subscribe partition increasement
if (conf.isAutoUpdatePartitions()) {