asafm commented on code in PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#discussion_r977944721
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java:
##########
@@ -18,109 +18,109 @@
*/
package org.apache.pulsar.broker.stats.prometheus.metrics;
-import static
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.google.common.annotations.VisibleForTesting;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
import java.io.IOException;
import java.io.Writer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.CachingStatsProvider;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
/**
- * A <i>Prometheus</i> based {@link StatsProvider} implementation.
+ * A <i>Prometheus</i> based {@link PrometheusRawMetricsProvider}
implementation.
*/
-public class PrometheusMetricsProvider implements StatsProvider {
+public class PrometheusMetricsProvider implements StatsProvider,
PrometheusRawMetricsProvider {
private ScheduledExecutorService executor;
public static final String PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS =
"prometheusStatsLatencyRolloverSeconds";
public static final int DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS
= 60;
public static final String CLUSTER_NAME = "cluster";
public static final String DEFAULT_CLUSTER_NAME = "pulsar";
- private String cluster;
- private final CachingStatsProvider cachingStatsProvider;
-
+ private final CollectorRegistry registry;
+ private Map<String, String> labels;
/**
* These acts a registry of the metrics defined in this provider.
*/
- public final ConcurrentMap<String, LongAdderCounter> counters = new
ConcurrentSkipListMap<>();
- public final ConcurrentMap<String, SimpleGauge<? extends Number>> gauges =
new ConcurrentSkipListMap<>();
- public final ConcurrentMap<String, DataSketchesOpStatsLogger> opStats =
new ConcurrentSkipListMap<>();
+ public final ConcurrentMap<ScopeContext, LongAdderCounter> counters = new
ConcurrentHashMap<>();
+ public final ConcurrentMap<ScopeContext, SimpleGauge<? extends Number>>
gauges = new ConcurrentHashMap<>();
+ public final ConcurrentMap<ScopeContext, DataSketchesOpStatsLogger>
opStats = new ConcurrentHashMap<>();
+ final ConcurrentMap<ScopeContext, ThreadScopedDataSketchesStatsLogger>
threadScopedOpStats =
+ new ConcurrentHashMap<>();
+ final ConcurrentMap<ScopeContext, ThreadScopedLongAdderCounter>
threadScopedCounters =
+ new ConcurrentHashMap<>();
+
public PrometheusMetricsProvider() {
- this.cachingStatsProvider = new CachingStatsProvider(new
StatsProvider() {
- @Override
- public void start(Configuration conf) {
- // nop
- }
-
- @Override
- public void stop() {
- // nop
- }
-
- @Override
- public StatsLogger getStatsLogger(String scope) {
- return new
PrometheusStatsLogger(PrometheusMetricsProvider.this, scope);
- }
-
- @Override
- public String getStatsName(String... statsComponents) {
- String completeName;
- if (statsComponents.length == 0) {
- return "";
- } else if (statsComponents[0].isEmpty()) {
- completeName = StringUtils.join(statsComponents, '_', 1,
statsComponents.length);
- } else {
- completeName = StringUtils.join(statsComponents, '_');
- }
- return Collector.sanitizeMetricName(completeName);
- }
- });
+ this(CollectorRegistry.defaultRegistry);
+ }
+
+ public PrometheusMetricsProvider(CollectorRegistry registry) {
+ this.registry = registry;
}
- @Override
public void start(Configuration conf) {
- executor = Executors.newSingleThreadScheduledExecutor(new
DefaultThreadFactory("metrics"));
+ executor = Executors.newSingleThreadScheduledExecutor(new
DefaultThreadFactory("metrics"));
int latencyRolloverSeconds =
conf.getInt(PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS,
DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS);
- cluster = conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
+ labels = Collections.singletonMap(CLUSTER_NAME,
conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME));
+
+ executor.scheduleAtFixedRate(() -> {
+ rotateLatencyCollection();
+ }, 1, latencyRolloverSeconds, TimeUnit.SECONDS);
-
executor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::rotateLatencyCollection),
- 1, latencyRolloverSeconds, TimeUnit.SECONDS);
}
- @Override
public void stop() {
- executor.shutdownNow();
+ executor.shutdown();
}
- @Override
public StatsLogger getStatsLogger(String scope) {
- return this.cachingStatsProvider.getStatsLogger(scope);
+ return new PrometheusStatsLogger(PrometheusMetricsProvider.this,
scope, labels);
}
@Override
public void writeAllMetrics(Writer writer) throws IOException {
- gauges.forEach((name, gauge) ->
PrometheusTextFormatUtil.writeGauge(writer, name, cluster, gauge));
- counters.forEach((name, counter) ->
PrometheusTextFormatUtil.writeCounter(writer, name, cluster, counter));
- opStats.forEach((name, opStatLogger) ->
PrometheusTextFormatUtil.writeOpStat(writer, name, cluster,
- opStatLogger));
+ PrometheusTextFormat prometheusTextFormat = new PrometheusTextFormat();
Review Comment:
Reiterating my earlier question, the only caveat I see is this: is there any
chance that we will have two different (scopeContext, gauge/metric/...) pairs
that share the same metric name and differ only in their labels?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]