This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c724f02e6ab [improve] [broker] PIP-355: Enhancing Broker-Level Metrics 
for Pulsar (#22779)
c724f02e6ab is described below

commit c724f02e6ab4f342e805b21cc99c394b31aaf612
Author: Hang Chen <[email protected]>
AuthorDate: Wed Jun 12 11:21:03 2024 +0800

    [improve] [broker] PIP-355: Enhancing Broker-Level Metrics for Pulsar 
(#22779)
    
    PIP: #22778
---
 .../pulsar/broker/service/AbstractTopic.java       | 21 ++++++
 .../service/nonpersistent/NonPersistentTopic.java  |  6 ++
 .../broker/service/persistent/PersistentTopic.java | 13 +++-
 .../stats/prometheus/AggregatedBrokerStats.java    | 12 +++
 .../stats/prometheus/NamespaceStatsAggregator.java | 29 ++++++-
 .../pulsar/broker/stats/prometheus/TopicStats.java |  2 +
 .../pulsar/broker/stats/PrometheusMetricsTest.java | 88 ++++++++++++++++++++++
 .../common/policies/data/stats/TopicStatsImpl.java |  6 ++
 8 files changed, 170 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index b6ce43b060c..572b54e0d3e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
 import static 
org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
+import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
 import com.google.common.base.MoreObjects;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -32,6 +33,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -128,6 +130,7 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
 
     private LongAdder bytesInCounter = new LongAdder();
     private LongAdder msgInCounter = new LongAdder();
+    private LongAdder systemTopicBytesInCounter = new LongAdder();
     private final LongAdder filteredEntriesCounter = new LongAdder();
 
     private static final AtomicLongFieldUpdater<AbstractTopic> 
RATE_LIMITED_UPDATER =
@@ -157,10 +160,13 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
 
     protected final LongAdder msgOutFromRemovedSubscriptions = new LongAdder();
     protected final LongAdder bytesOutFromRemovedSubscriptions = new 
LongAdder();
+    protected final LongAdder bytesOutFromRemovedSystemSubscriptions = new 
LongAdder();
     protected volatile Pair<String, List<EntryFilter>> entryFilters;
     protected volatile boolean transferring = false;
     private volatile List<PublishRateLimiter> activeRateLimiters;
 
+    protected Set<String> additionalSystemCursorNames = new TreeSet<>();
+
     public AbstractTopic(String topic, BrokerService brokerService) {
         this.topic = topic;
         this.brokerService = brokerService;
@@ -176,6 +182,8 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
         this.preciseTopicPublishRateLimitingEnable = 
config.isPreciseTopicPublishRateLimiterEnable();
         topicPublishRateLimiter = new 
PublishRateLimiterImpl(brokerService.getPulsar().getMonotonicSnapshotClock());
         updateActiveRateLimiters();
+
+        additionalSystemCursorNames = 
brokerService.pulsar().getConfiguration().getAdditionalSystemCursorNames();
     }
 
     public SubscribeRate getSubscribeRate() {
@@ -921,6 +929,10 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
         // increase counters
         bytesInCounter.add(msgSizeInBytes);
         msgInCounter.add(numOfMessages);
+
+        if (isSystemTopic()) {
+            systemTopicBytesInCounter.add(msgSizeInBytes);
+        }
     }
 
     private void handlePublishThrottling(Producer producer, int numOfMessages, 
long msgSizeInBytes) {
@@ -1184,6 +1196,10 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
                 + sumSubscriptions(AbstractSubscription::getMsgOutCounter);
     }
 
+    public long getSystemTopicBytesInCounter() {
+        return systemTopicBytesInCounter.longValue();
+    }
+
     public long getBytesOutCounter() {
         return bytesOutFromRemovedSubscriptions.longValue()
                 + sumSubscriptions(AbstractSubscription::getBytesOutCounter);
@@ -1369,4 +1385,9 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
         }
         return Optional.empty();
     }
+
+    public boolean isSystemCursor(String sub) {
+        return COMPACTION_SUBSCRIPTION.equals(sub)
+                || (additionalSystemCursorNames != null && 
additionalSystemCursorNames.contains(sub));
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index ad09e7b756d..a6f65f6da32 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -940,9 +940,11 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
         stats.averageMsgSize = stats.msgRateIn == 0.0 ? 0.0 : 
(stats.msgThroughputIn / stats.msgRateIn);
         stats.msgInCounter = getMsgInCounter();
         stats.bytesInCounter = getBytesInCounter();
+        stats.systemTopicBytesInCounter = getSystemTopicBytesInCounter();
         stats.waitingPublishers = getWaitingProducersCount();
         stats.bytesOutCounter = bytesOutFromRemovedSubscriptions.longValue();
         stats.msgOutCounter = msgOutFromRemovedSubscriptions.longValue();
+        stats.bytesOutInternalCounter = 
bytesOutFromRemovedSystemSubscriptions.longValue();
 
         subscriptions.forEach((name, subscription) -> {
             NonPersistentSubscriptionStatsImpl subStats = 
subscription.getStats(getStatsOptions);
@@ -952,6 +954,10 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
             stats.bytesOutCounter += subStats.bytesOutCounter;
             stats.msgOutCounter += subStats.msgOutCounter;
             stats.getSubscriptions().put(name, subStats);
+
+            if (isSystemCursor(name)) {
+                stats.bytesOutInternalCounter += subStats.bytesOutCounter;
+            }
         });
 
         replicators.forEach((cluster, replicator) -> {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index d78dac899b7..d9f9c4689f6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -40,7 +40,6 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.TreeSet;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -287,7 +286,6 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     private final ExecutorService orderedExecutor;
 
     private volatile CloseFutures closeFutures;
-    private Set<String> additionalSystemCursorNames = new TreeSet<>();
 
     @Getter
     private final PersistentTopicMetrics persistentTopicMetrics = new 
PersistentTopicMetrics();
@@ -431,7 +429,6 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         } else {
             shadowSourceTopic = null;
         }
-        additionalSystemCursorNames = 
brokerService.pulsar().getConfiguration().getAdditionalSystemCursorNames();
     }
 
     @Override
@@ -1401,6 +1398,10 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             SubscriptionStatsImpl stats = sub.getStats(new 
GetStatsOptions(false, false, false, false, false));
             bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
             msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
+
+            if (isSystemCursor(subscriptionName)) {
+                
bytesOutFromRemovedSystemSubscriptions.add(stats.bytesOutCounter);
+            }
         }
     }
 
@@ -2566,10 +2567,12 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         stats.averageMsgSize = stats.msgRateIn == 0.0 ? 0.0 : 
(stats.msgThroughputIn / stats.msgRateIn);
         stats.msgInCounter = getMsgInCounter();
         stats.bytesInCounter = getBytesInCounter();
+        stats.systemTopicBytesInCounter = getSystemTopicBytesInCounter();
         stats.msgChunkPublished = this.msgChunkPublished;
         stats.waitingPublishers = getWaitingProducersCount();
         stats.bytesOutCounter = bytesOutFromRemovedSubscriptions.longValue();
         stats.msgOutCounter = msgOutFromRemovedSubscriptions.longValue();
+        stats.bytesOutInternalCounter = 
bytesOutFromRemovedSystemSubscriptions.longValue();
         stats.publishRateLimitedTimes = publishRateLimitedTimes;
         TransactionBuffer txnBuffer = getTransactionBuffer();
         stats.ongoingTxnCount = txnBuffer.getOngoingTxnCount();
@@ -2596,6 +2599,10 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                 topicMetricBean.labelsAndValues = v.labelsAndValues;
                 topicMetricBean.value += v.value;
             });
+
+            if (isSystemCursor(name)) {
+                stats.bytesOutInternalCounter += subStats.bytesOutCounter;
+            }
         });
 
         replicators.forEach((cluster, replicator) -> {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedBrokerStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedBrokerStats.java
index 037fb29a999..85096be9b00 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedBrokerStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedBrokerStats.java
@@ -35,6 +35,10 @@ public class AggregatedBrokerStats {
     public long msgBacklog;
     public long sizeBasedBacklogQuotaExceededEvictionCount;
     public long timeBasedBacklogQuotaExceededEvictionCount;
+    public long bytesInCounter;
+    public long bytesOutCounter;
+    public long systemTopicBytesInCounter;
+    public long bytesOutInternalCounter;
 
     @SuppressWarnings("DuplicatedCode")
     void updateStats(TopicStats stats) {
@@ -54,6 +58,10 @@ public class AggregatedBrokerStats {
         msgBacklog += stats.msgBacklog;
         timeBasedBacklogQuotaExceededEvictionCount += 
stats.timeBasedBacklogQuotaExceededEvictionCount;
         sizeBasedBacklogQuotaExceededEvictionCount += 
stats.sizeBasedBacklogQuotaExceededEvictionCount;
+        bytesInCounter += stats.bytesInCounter;
+        bytesOutCounter += stats.bytesOutCounter;
+        systemTopicBytesInCounter += stats.systemTopicBytesInCounter;
+        bytesOutInternalCounter += stats.bytesOutInternalCounter;
     }
 
     @SuppressWarnings("DuplicatedCode")
@@ -74,5 +82,9 @@ public class AggregatedBrokerStats {
         msgBacklog = 0;
         sizeBasedBacklogQuotaExceededEvictionCount = 0;
         timeBasedBacklogQuotaExceededEvictionCount = 0;
+        bytesInCounter = 0;
+        bytesOutCounter = 0;
+        systemTopicBytesInCounter = 0;
+        bytesOutInternalCounter = 0;
     }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 3728c3edd1e..3bbc9100b36 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -211,6 +211,8 @@ public class NamespaceStatsAggregator {
         stats.msgInCounter = tStatus.msgInCounter;
         stats.bytesInCounter = tStatus.bytesInCounter;
         stats.msgOutCounter = tStatus.msgOutCounter;
+        stats.systemTopicBytesInCounter = tStatus.systemTopicBytesInCounter;
+        stats.bytesOutInternalCounter = tStatus.getBytesOutInternalCounter();
         stats.bytesOutCounter = tStatus.bytesOutCounter;
         stats.averageMsgSize = tStatus.averageMsgSize;
         stats.publishRateLimitedTimes = tStatus.publishRateLimitedTimes;
@@ -358,6 +360,16 @@ public class NamespaceStatsAggregator {
                 brokerStats.timeBasedBacklogQuotaExceededEvictionCount, 
cluster, BacklogQuotaType.message_age);
 
         writeMetric(stream, "pulsar_broker_msg_backlog", 
brokerStats.msgBacklog, cluster);
+        long userOutBytes = brokerStats.bytesOutCounter - 
brokerStats.bytesOutInternalCounter;
+        writeMetric(stream, "pulsar_broker_out_bytes_total",
+                userOutBytes, cluster, "system_subscription", "false");
+        writeMetric(stream, "pulsar_broker_out_bytes_total",
+                brokerStats.bytesOutInternalCounter, cluster, 
"system_subscription", "true");
+        long userTopicInBytes = brokerStats.bytesInCounter - 
brokerStats.systemTopicBytesInCounter;
+        writeMetric(stream, "pulsar_broker_in_bytes_total",
+                userTopicInBytes, cluster, "system_topic", "false");
+        writeMetric(stream, "pulsar_broker_in_bytes_total",
+                brokerStats.systemTopicBytesInCounter, cluster, 
"system_topic", "true");
     }
 
     private static void printTopicsCountStats(PrometheusMetricStreams stream, 
Map<String, Long> namespaceTopicsCount,
@@ -412,7 +424,8 @@ public class NamespaceStatsAggregator {
                 namespace);
 
         stats.bucketDelayedIndexStats.forEach((k, metric) -> {
-            writeMetric(stream, metric.name, metric.value, cluster, namespace, 
metric.labelsAndValues);
+            String[] labels = ArrayUtils.addAll(new String[]{"namespace", 
namespace}, metric.labelsAndValues);
+            writeMetric(stream, metric.name, metric.value, cluster, labels);
         });
 
         writePulsarMsgBacklog(stream, stats.msgBacklog, cluster, namespace);
@@ -534,13 +547,21 @@ public class NamespaceStatsAggregator {
         stream.writeSample(metricName, value, "cluster", cluster);
     }
 
+    private static void writeMetric(PrometheusMetricStreams stream, String 
metricName, Number value,
+                                    String cluster, String... 
extraLabelsAndValues) {
+        String[] labels = ArrayUtils.addAll(new String[]{"cluster", cluster}, 
extraLabelsAndValues);
+        stream.writeSample(metricName, value, labels);
+    }
+
+
     private static void writeMetric(PrometheusMetricStreams stream, String 
metricName, Number value, String cluster,
-                                    String namespace, String... 
extraLabelsAndValues) {
-        String[] labelsAndValues = new String[]{"cluster", cluster, 
"namespace", namespace};
-        String[] labels = ArrayUtils.addAll(labelsAndValues, 
extraLabelsAndValues);
+                                    String namespace) {
+        String[] labels = new String[]{"cluster", cluster, "namespace", 
namespace};
         stream.writeSample(metricName, value, labels);
     }
 
+
+
     private static void writeReplicationStat(PrometheusMetricStreams stream, 
String metricName,
                                              AggregatedNamespaceStats 
namespaceStats,
                                              
Function<AggregatedReplicationStats, Number> sampleValueFunction,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index e8ab7b095dc..9eb4077225c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -56,6 +56,8 @@ class TopicStats {
     long msgOutCounter;
     @PulsarDeprecatedMetric(newMetricName = 
OpenTelemetryTopicStats.BYTES_OUT_COUNTER)
     long bytesOutCounter;
+    long systemTopicBytesInCounter;
+    long bytesOutInternalCounter;
     @PulsarDeprecatedMetric // Can be derived from MESSAGE_IN_COUNTER and 
BYTES_IN_COUNTER
     double averageMsgSize;
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index 1fe0e99b498..0d7f8eb0aa3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -205,6 +205,94 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         producer3.close();
     }
 
+    @Test
+    public void testBrokerMetrics() throws Exception {
+        cleanup();
+        conf.setAdditionalSystemCursorNames(Set.of("test-cursor"));
+        setup();
+
+        Producer<byte[]> p1 = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
+        Producer<byte[]> p2 = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
+        // system topic
+        Producer<byte[]> p3 = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/__change_events").create();
+
+        Consumer<byte[]> c1 = pulsarClient.newConsumer()
+                .topic("persistent://my-property/use/my-ns/my-topic1")
+                .subscriptionName("test")
+                .subscribe();
+
+        // additional system cursor
+        Consumer<byte[]> c2 = pulsarClient.newConsumer()
+                .topic("persistent://my-property/use/my-ns/my-topic2")
+                .subscriptionName("test-cursor")
+                .subscribe();
+
+        Consumer<byte[]> c3 = pulsarClient.newConsumer()
+                .topic("persistent://my-property/use/my-ns/__change_events")
+                .subscriptionName("test-v1")
+                .subscribe();
+
+        final int messages = 10;
+        for (int i = 0; i < messages; i++) {
+            String message = "my-message-" + i;
+            p1.send(message.getBytes());
+            p2.send(message.getBytes());
+            p3.send(message.getBytes());
+        }
+
+        for (int i = 0; i < messages; i++) {
+            c1.acknowledge(c1.receive());
+            c2.acknowledge(c2.receive());
+            c3.acknowledge(c3.receive());
+        }
+
+        // unsubscribe to test remove cursor impact on metric
+        c1.unsubscribe();
+        c2.unsubscribe();
+
+        
//admin.topics().unload("persistent://my-property/use/my-ns/my-topic1");
+
+        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
+        String metricsStr = statsOut.toString();
+        Multimap<String, Metric> metrics = parseMetrics(metricsStr);
+
+        metrics.entries().forEach(e -> {
+            System.out.println(e.getKey() + ": " + e.getValue());
+        });
+
+        List<Metric> bytesOutTotal = (List<Metric>) 
metrics.get("pulsar_broker_out_bytes_total");
+        List<Metric> bytesInTotal = (List<Metric>) 
metrics.get("pulsar_broker_in_bytes_total");
+        assertEquals(bytesOutTotal.size(), 2);
+        assertEquals(bytesInTotal.size(), 2);
+
+        double systemOutBytes = 0.0;
+        double userOutBytes = 0.0;
+        switch 
(bytesOutTotal.get(0).tags.get("system_subscription").toString()) {
+            case "true":
+                systemOutBytes = bytesOutTotal.get(0).value;
+                userOutBytes = bytesOutTotal.get(1).value;
+            case "false":
+                systemOutBytes = bytesOutTotal.get(1).value;
+                userOutBytes = bytesOutTotal.get(0).value;
+        }
+
+        double systemInBytes = 0.0;
+        double userInBytes = 0.0;
+        switch (bytesInTotal.get(0).tags.get("system_topic").toString()) {
+            case "true":
+                systemInBytes = bytesInTotal.get(0).value;
+                userInBytes = bytesInTotal.get(1).value;
+            case "false":
+                systemInBytes = bytesInTotal.get(1).value;
+                userInBytes = bytesInTotal.get(0).value;
+        }
+
+        assertEquals(userOutBytes / 2, systemOutBytes);
+        assertEquals(userInBytes / 2, systemInBytes);
+        assertEquals(userOutBytes + systemOutBytes, userInBytes + 
systemInBytes);
+    }
+
     @Test
     public void testMetricsTopicCount() throws Exception {
         String ns1 = "prop/ns-abc1";
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
index 70cf4cd3414..022fffd3a7e 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
@@ -66,12 +66,18 @@ public class TopicStatsImpl implements TopicStats {
     /** Total messages published to the topic (msg). */
     public long msgInCounter;
 
+    /** Total bytes published to the system topic (bytes). */
+    public long systemTopicBytesInCounter;
+
     /** Total bytes delivered to consumer (bytes). */
     public long bytesOutCounter;
 
     /** Total messages delivered to consumer (msg). */
     public long msgOutCounter;
 
+    /** Total bytes delivered to internal cursors. */
+    public long bytesOutInternalCounter;
+
     /** Average size of published messages (bytes). */
     public double averageMsgSize;
 

Reply via email to