This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c724f02e6ab [improve] [broker] PIP-355: Enhancing Broker-Level Metrics
for Pulsar (#22779)
c724f02e6ab is described below
commit c724f02e6ab4f342e805b21cc99c394b31aaf612
Author: Hang Chen <[email protected]>
AuthorDate: Wed Jun 12 11:21:03 2024 +0800
[improve] [broker] PIP-355: Enhancing Broker-Level Metrics for Pulsar
(#22779)
PIP: #22778
---
.../pulsar/broker/service/AbstractTopic.java | 21 ++++++
.../service/nonpersistent/NonPersistentTopic.java | 6 ++
.../broker/service/persistent/PersistentTopic.java | 13 +++-
.../stats/prometheus/AggregatedBrokerStats.java | 12 +++
.../stats/prometheus/NamespaceStatsAggregator.java | 29 ++++++-
.../pulsar/broker/stats/prometheus/TopicStats.java | 2 +
.../pulsar/broker/stats/PrometheusMetricsTest.java | 88 ++++++++++++++++++++++
.../common/policies/data/stats/TopicStatsImpl.java | 6 ++
8 files changed, 170 insertions(+), 7 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index b6ce43b060c..572b54e0d3e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
import static
org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
+import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
import com.google.common.base.MoreObjects;
import java.util.ArrayList;
import java.util.Arrays;
@@ -32,6 +33,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
+import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -128,6 +130,7 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
private LongAdder bytesInCounter = new LongAdder();
private LongAdder msgInCounter = new LongAdder();
+ private LongAdder systemTopicBytesInCounter = new LongAdder();
private final LongAdder filteredEntriesCounter = new LongAdder();
private static final AtomicLongFieldUpdater<AbstractTopic>
RATE_LIMITED_UPDATER =
@@ -157,10 +160,13 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
protected final LongAdder msgOutFromRemovedSubscriptions = new LongAdder();
protected final LongAdder bytesOutFromRemovedSubscriptions = new
LongAdder();
+ protected final LongAdder bytesOutFromRemovedSystemSubscriptions = new
LongAdder();
protected volatile Pair<String, List<EntryFilter>> entryFilters;
protected volatile boolean transferring = false;
private volatile List<PublishRateLimiter> activeRateLimiters;
+ protected Set<String> additionalSystemCursorNames = new TreeSet<>();
+
public AbstractTopic(String topic, BrokerService brokerService) {
this.topic = topic;
this.brokerService = brokerService;
@@ -176,6 +182,8 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
this.preciseTopicPublishRateLimitingEnable =
config.isPreciseTopicPublishRateLimiterEnable();
topicPublishRateLimiter = new
PublishRateLimiterImpl(brokerService.getPulsar().getMonotonicSnapshotClock());
updateActiveRateLimiters();
+
+ additionalSystemCursorNames =
brokerService.pulsar().getConfiguration().getAdditionalSystemCursorNames();
}
public SubscribeRate getSubscribeRate() {
@@ -921,6 +929,10 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
// increase counters
bytesInCounter.add(msgSizeInBytes);
msgInCounter.add(numOfMessages);
+
+ if (isSystemTopic()) {
+ systemTopicBytesInCounter.add(msgSizeInBytes);
+ }
}
private void handlePublishThrottling(Producer producer, int numOfMessages,
long msgSizeInBytes) {
@@ -1184,6 +1196,10 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
+ sumSubscriptions(AbstractSubscription::getMsgOutCounter);
}
+    /**
+     * Returns the inbound bytes accumulated in the system-topic counter.
+     *
+     * <p>The counter is incremented together with {@code bytesInCounter}, but only while
+     * {@link #isSystemTopic()} is true, so it only grows for system topics.
+     *
+     * @return total inbound bytes attributed to this topic as system-topic traffic
+     */
+    public long getSystemTopicBytesInCounter() {
+        final long total = systemTopicBytesInCounter.sum();
+        return total;
+    }
+
public long getBytesOutCounter() {
return bytesOutFromRemovedSubscriptions.longValue()
+ sumSubscriptions(AbstractSubscription::getBytesOutCounter);
@@ -1369,4 +1385,9 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
}
return Optional.empty();
}
+
+    /**
+     * Tells whether a subscription name denotes a broker-internal ("system") cursor.
+     *
+     * <p>A cursor is internal if it is the compaction subscription, or if its name appears in the
+     * broker's {@code additionalSystemCursorNames} configuration captured at construction time.
+     * Outbound bytes for such subscriptions are tallied in {@code bytesOutInternalCounter}.
+     *
+     * @param sub subscription (cursor) name to classify
+     * @return {@code true} for the compaction subscription or a configured additional system cursor
+     */
+    public boolean isSystemCursor(String sub) {
+        if (COMPACTION_SUBSCRIPTION.equals(sub)) {
+            return true;
+        }
+        // The configured set may be missing; that simply means "no additional system cursors".
+        final Set<String> extraNames = additionalSystemCursorNames;
+        return extraNames != null && extraNames.contains(sub);
+    }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index ad09e7b756d..a6f65f6da32 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -940,9 +940,11 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
stats.averageMsgSize = stats.msgRateIn == 0.0 ? 0.0 :
(stats.msgThroughputIn / stats.msgRateIn);
stats.msgInCounter = getMsgInCounter();
stats.bytesInCounter = getBytesInCounter();
+ stats.systemTopicBytesInCounter = getSystemTopicBytesInCounter();
stats.waitingPublishers = getWaitingProducersCount();
stats.bytesOutCounter = bytesOutFromRemovedSubscriptions.longValue();
stats.msgOutCounter = msgOutFromRemovedSubscriptions.longValue();
+ stats.bytesOutInternalCounter =
bytesOutFromRemovedSystemSubscriptions.longValue();
subscriptions.forEach((name, subscription) -> {
NonPersistentSubscriptionStatsImpl subStats =
subscription.getStats(getStatsOptions);
@@ -952,6 +954,10 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
stats.bytesOutCounter += subStats.bytesOutCounter;
stats.msgOutCounter += subStats.msgOutCounter;
stats.getSubscriptions().put(name, subStats);
+
+ if (isSystemCursor(name)) {
+ stats.bytesOutInternalCounter += subStats.bytesOutCounter;
+ }
});
replicators.forEach((cluster, replicator) -> {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index d78dac899b7..d9f9c4689f6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -40,7 +40,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
-import java.util.TreeSet;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -287,7 +286,6 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
private final ExecutorService orderedExecutor;
private volatile CloseFutures closeFutures;
- private Set<String> additionalSystemCursorNames = new TreeSet<>();
@Getter
private final PersistentTopicMetrics persistentTopicMetrics = new
PersistentTopicMetrics();
@@ -431,7 +429,6 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
} else {
shadowSourceTopic = null;
}
- additionalSystemCursorNames =
brokerService.pulsar().getConfiguration().getAdditionalSystemCursorNames();
}
@Override
@@ -1401,6 +1398,10 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
SubscriptionStatsImpl stats = sub.getStats(new
GetStatsOptions(false, false, false, false, false));
bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
+
+ if (isSystemCursor(subscriptionName)) {
+
bytesOutFromRemovedSystemSubscriptions.add(stats.bytesOutCounter);
+ }
}
}
@@ -2566,10 +2567,12 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
stats.averageMsgSize = stats.msgRateIn == 0.0 ? 0.0 :
(stats.msgThroughputIn / stats.msgRateIn);
stats.msgInCounter = getMsgInCounter();
stats.bytesInCounter = getBytesInCounter();
+ stats.systemTopicBytesInCounter = getSystemTopicBytesInCounter();
stats.msgChunkPublished = this.msgChunkPublished;
stats.waitingPublishers = getWaitingProducersCount();
stats.bytesOutCounter = bytesOutFromRemovedSubscriptions.longValue();
stats.msgOutCounter = msgOutFromRemovedSubscriptions.longValue();
+ stats.bytesOutInternalCounter =
bytesOutFromRemovedSystemSubscriptions.longValue();
stats.publishRateLimitedTimes = publishRateLimitedTimes;
TransactionBuffer txnBuffer = getTransactionBuffer();
stats.ongoingTxnCount = txnBuffer.getOngoingTxnCount();
@@ -2596,6 +2599,10 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
topicMetricBean.labelsAndValues = v.labelsAndValues;
topicMetricBean.value += v.value;
});
+
+ if (isSystemCursor(name)) {
+ stats.bytesOutInternalCounter += subStats.bytesOutCounter;
+ }
});
replicators.forEach((cluster, replicator) -> {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedBrokerStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedBrokerStats.java
index 037fb29a999..85096be9b00 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedBrokerStats.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedBrokerStats.java
@@ -35,6 +35,10 @@ public class AggregatedBrokerStats {
public long msgBacklog;
public long sizeBasedBacklogQuotaExceededEvictionCount;
public long timeBasedBacklogQuotaExceededEvictionCount;
+ public long bytesInCounter;
+ public long bytesOutCounter;
+ public long systemTopicBytesInCounter;
+ public long bytesOutInternalCounter;
@SuppressWarnings("DuplicatedCode")
void updateStats(TopicStats stats) {
@@ -54,6 +58,10 @@ public class AggregatedBrokerStats {
msgBacklog += stats.msgBacklog;
timeBasedBacklogQuotaExceededEvictionCount +=
stats.timeBasedBacklogQuotaExceededEvictionCount;
sizeBasedBacklogQuotaExceededEvictionCount +=
stats.sizeBasedBacklogQuotaExceededEvictionCount;
+ bytesInCounter += stats.bytesInCounter;
+ bytesOutCounter += stats.bytesOutCounter;
+ systemTopicBytesInCounter += stats.systemTopicBytesInCounter;
+ bytesOutInternalCounter += stats.bytesOutInternalCounter;
}
@SuppressWarnings("DuplicatedCode")
@@ -74,5 +82,9 @@ public class AggregatedBrokerStats {
msgBacklog = 0;
sizeBasedBacklogQuotaExceededEvictionCount = 0;
timeBasedBacklogQuotaExceededEvictionCount = 0;
+ bytesInCounter = 0;
+ bytesOutCounter = 0;
+ systemTopicBytesInCounter = 0;
+ bytesOutInternalCounter = 0;
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 3728c3edd1e..3bbc9100b36 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -211,6 +211,8 @@ public class NamespaceStatsAggregator {
stats.msgInCounter = tStatus.msgInCounter;
stats.bytesInCounter = tStatus.bytesInCounter;
stats.msgOutCounter = tStatus.msgOutCounter;
+ stats.systemTopicBytesInCounter = tStatus.systemTopicBytesInCounter;
+ stats.bytesOutInternalCounter = tStatus.getBytesOutInternalCounter();
stats.bytesOutCounter = tStatus.bytesOutCounter;
stats.averageMsgSize = tStatus.averageMsgSize;
stats.publishRateLimitedTimes = tStatus.publishRateLimitedTimes;
@@ -358,6 +360,16 @@ public class NamespaceStatsAggregator {
brokerStats.timeBasedBacklogQuotaExceededEvictionCount,
cluster, BacklogQuotaType.message_age);
writeMetric(stream, "pulsar_broker_msg_backlog",
brokerStats.msgBacklog, cluster);
+ long userOutBytes = brokerStats.bytesOutCounter -
brokerStats.bytesOutInternalCounter;
+ writeMetric(stream, "pulsar_broker_out_bytes_total",
+ userOutBytes, cluster, "system_subscription", "false");
+ writeMetric(stream, "pulsar_broker_out_bytes_total",
+ brokerStats.bytesOutInternalCounter, cluster,
"system_subscription", "true");
+ long userTopicInBytes = brokerStats.bytesInCounter -
brokerStats.systemTopicBytesInCounter;
+ writeMetric(stream, "pulsar_broker_in_bytes_total",
+ userTopicInBytes, cluster, "system_topic", "false");
+ writeMetric(stream, "pulsar_broker_in_bytes_total",
+ brokerStats.systemTopicBytesInCounter, cluster,
"system_topic", "true");
}
private static void printTopicsCountStats(PrometheusMetricStreams stream,
Map<String, Long> namespaceTopicsCount,
@@ -412,7 +424,8 @@ public class NamespaceStatsAggregator {
namespace);
stats.bucketDelayedIndexStats.forEach((k, metric) -> {
- writeMetric(stream, metric.name, metric.value, cluster, namespace,
metric.labelsAndValues);
+ String[] labels = ArrayUtils.addAll(new String[]{"namespace",
namespace}, metric.labelsAndValues);
+ writeMetric(stream, metric.name, metric.value, cluster, labels);
});
writePulsarMsgBacklog(stream, stats.msgBacklog, cluster, namespace);
@@ -534,13 +547,21 @@ public class NamespaceStatsAggregator {
stream.writeSample(metricName, value, "cluster", cluster);
}
+    /**
+     * Writes one sample whose label list starts with the {@code cluster} label and continues with
+     * {@code extraLabelsAndValues}, given as alternating label names and values.
+     *
+     * <p>NOTE(review): for exactly five arguments, overload resolution prefers the fixed-arity
+     * {@code (cluster, namespace)} overload. Any longer call lands here, so a caller that still
+     * passes a bare namespace value as the fifth argument would get mis-paired labels. Such
+     * callers must prepend {@code "namespace", namespace} to the extra labels instead.
+     *
+     * @param stream destination metric streams
+     * @param metricName metric to write
+     * @param value sample value
+     * @param cluster value for the {@code cluster} label
+     * @param extraLabelsAndValues additional label name/value pairs appended after {@code cluster}
+     */
+    private static void writeMetric(PrometheusMetricStreams stream, String metricName, Number value,
+                                    String cluster, String... extraLabelsAndValues) {
+        final String[] clusterLabel = {"cluster", cluster};
+        stream.writeSample(metricName, value, ArrayUtils.addAll(clusterLabel, extraLabelsAndValues));
+    }
+
+    /**
+     * Writes one sample labelled with exactly {@code cluster} and {@code namespace}.
+     *
+     * <p>Samples that need further labels go through the {@code (cluster, String...)} overload
+     * and pass the {@code "namespace", namespace} pair explicitly at the front of their extra labels.
+     */
private static void writeMetric(PrometheusMetricStreams stream, String
metricName, Number value, String cluster,
- String namespace, String...
extraLabelsAndValues) {
- String[] labelsAndValues = new String[]{"cluster", cluster,
"namespace", namespace};
- String[] labels = ArrayUtils.addAll(labelsAndValues,
extraLabelsAndValues);
+ String namespace) {
+ String[] labels = new String[]{"cluster", cluster, "namespace",
namespace};
stream.writeSample(metricName, value, labels);
}
+
+
private static void writeReplicationStat(PrometheusMetricStreams stream,
String metricName,
AggregatedNamespaceStats
namespaceStats,
Function<AggregatedReplicationStats, Number> sampleValueFunction,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index e8ab7b095dc..9eb4077225c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -56,6 +56,8 @@ class TopicStats {
long msgOutCounter;
@PulsarDeprecatedMetric(newMetricName =
OpenTelemetryTopicStats.BYTES_OUT_COUNTER)
long bytesOutCounter;
+ long systemTopicBytesInCounter;
+ long bytesOutInternalCounter;
@PulsarDeprecatedMetric // Can be derived from MESSAGE_IN_COUNTER and
BYTES_IN_COUNTER
double averageMsgSize;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index 1fe0e99b498..0d7f8eb0aa3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -205,6 +205,94 @@ public class PrometheusMetricsTest extends BrokerTestBase {
producer3.close();
}
+    /**
+     * Checks the broker-level {@code pulsar_broker_in_bytes_total} and
+     * {@code pulsar_broker_out_bytes_total} metrics. They are split by the {@code system_topic}
+     * and {@code system_subscription} labels respectively.
+     *
+     * <p>Setup:
+     * <ul>
+     *   <li>Two user topics and one system topic ({@code __change_events}) each receive the same
+     *       messages.</li>
+     *   <li>Each topic gets one subscription, and {@code test-cursor} is registered as an
+     *       additional system cursor.</li>
+     *   <li>Two subscriptions are removed before the metrics are generated, which exercises the
+     *       removed-subscription counters.</li>
+     * </ul>
+     *
+     * <p>Expected: system bytes are half of user bytes on both the in and out side, and total in
+     * equals total out.
+     */
+    @Test
+    public void testBrokerMetrics() throws Exception {
+        cleanup();
+        conf.setAdditionalSystemCursorNames(Set.of("test-cursor"));
+        setup();
+
+        final String userTopic1 = "persistent://my-property/use/my-ns/my-topic1";
+        final String userTopic2 = "persistent://my-property/use/my-ns/my-topic2";
+        // __change_events is a system topic
+        final String systemTopic = "persistent://my-property/use/my-ns/__change_events";
+
+        Producer<byte[]> p1 = pulsarClient.newProducer().topic(userTopic1).create();
+        Producer<byte[]> p2 = pulsarClient.newProducer().topic(userTopic2).create();
+        Producer<byte[]> p3 = pulsarClient.newProducer().topic(systemTopic).create();
+
+        // regular (user) subscription on a user topic
+        Consumer<byte[]> c1 = pulsarClient.newConsumer()
+                .topic(userTopic1)
+                .subscriptionName("test")
+                .subscribe();
+
+        // additional system cursor on a user topic
+        Consumer<byte[]> c2 = pulsarClient.newConsumer()
+                .topic(userTopic2)
+                .subscriptionName("test-cursor")
+                .subscribe();
+
+        // regular (user) subscription on the system topic
+        Consumer<byte[]> c3 = pulsarClient.newConsumer()
+                .topic(systemTopic)
+                .subscriptionName("test-v1")
+                .subscribe();
+
+        final int messages = 10;
+        for (int i = 0; i < messages; i++) {
+            String message = "my-message-" + i;
+            p1.send(message.getBytes());
+            p2.send(message.getBytes());
+            p3.send(message.getBytes());
+        }
+
+        for (int i = 0; i < messages; i++) {
+            c1.acknowledge(c1.receive());
+            c2.acknowledge(c2.receive());
+            c3.acknowledge(c3.receive());
+        }
+
+        // unsubscribe to test remove cursor impact on metric
+        c1.unsubscribe();
+        c2.unsubscribe();
+
+        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+        PrometheusMetricsTestUtil.generate(pulsar, true, false, false, statsOut);
+        String metricsStr = statsOut.toString();
+        Multimap<String, Metric> metrics = parseMetrics(metricsStr);
+
+        // Metrics are captured; release the remaining client resources.
+        p1.close();
+        p2.close();
+        p3.close();
+        c3.close();
+
+        List<Metric> bytesOutTotal = (List<Metric>) metrics.get("pulsar_broker_out_bytes_total");
+        List<Metric> bytesInTotal = (List<Metric>) metrics.get("pulsar_broker_in_bytes_total");
+        assertEquals(bytesOutTotal.size(), 2);
+        assertEquals(bytesInTotal.size(), 2);
+
+        // Select each sample by its label value, not by its position in the output.
+        double systemOutBytes = 0.0;
+        double userOutBytes = 0.0;
+        for (Metric metric : bytesOutTotal) {
+            String systemSubscription = String.valueOf(metric.tags.get("system_subscription"));
+            if ("true".equals(systemSubscription)) {
+                systemOutBytes = metric.value;
+            } else if ("false".equals(systemSubscription)) {
+                userOutBytes = metric.value;
+            } else {
+                throw new AssertionError("unexpected system_subscription label: " + systemSubscription);
+            }
+        }
+
+        double systemInBytes = 0.0;
+        double userInBytes = 0.0;
+        for (Metric metric : bytesInTotal) {
+            String systemTopicLabel = String.valueOf(metric.tags.get("system_topic"));
+            if ("true".equals(systemTopicLabel)) {
+                systemInBytes = metric.value;
+            } else if ("false".equals(systemTopicLabel)) {
+                userInBytes = metric.value;
+            } else {
+                throw new AssertionError("unexpected system_topic label: " + systemTopicLabel);
+            }
+        }
+
+        assertEquals(userOutBytes / 2, systemOutBytes);
+        assertEquals(userInBytes / 2, systemInBytes);
+        assertEquals(userOutBytes + systemOutBytes, userInBytes + systemInBytes);
+    }
+
@Test
public void testMetricsTopicCount() throws Exception {
String ns1 = "prop/ns-abc1";
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
index 70cf4cd3414..022fffd3a7e 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
@@ -66,12 +66,18 @@ public class TopicStatsImpl implements TopicStats {
/** Total messages published to the topic (msg). */
public long msgInCounter;
+ /** Total bytes published to the system topic (bytes). */
+ public long systemTopicBytesInCounter;
+
/** Total bytes delivered to consumer (bytes). */
public long bytesOutCounter;
/** Total messages delivered to consumer (msg). */
public long msgOutCounter;
+ /** Total bytes delivered to internal cursors. */
+ public long bytesOutInternalCounter;
+
/** Average size of published messages (bytes). */
public double averageMsgSize;