asafm commented on code in PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#discussion_r977968050
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java:
##########
@@ -1632,6 +1639,35 @@ public static Multimap<String, Metric>
parseMetrics(String metrics) {
return parsed;
}
+ @Test
+ public void testRawMetricsProvider() throws IOException {
+ PrometheusMetricsProvider rawMetricsProvider = new
PrometheusMetricsProvider();
+ try {
+ rawMetricsProvider.start(new PropertiesConfiguration());
+ rawMetricsProvider.getStatsLogger("test_raw")
+ .scopeLabel("topic", "persistent://public/default/test-v1")
+ .getOpStatsLogger("writeLatency")
+ .registerSuccessfulEvent(100, TimeUnit.NANOSECONDS);
+
+ getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider);
+ HttpClient httpClient = HttpClientBuilder.create().build();
+ final String metricsEndPoint = getPulsar().getWebServiceAddress()
+ "/metrics";
+ HttpResponse response = httpClient.execute(new
HttpGet(metricsEndPoint));
+ Multimap<String, Metric> metrics =
parseMetrics(EntityUtils.toString(response.getEntity()));
+ if (((List<Metric>) metrics.get("test_raw_writeLatency_count"))
Review Comment:
Let's extract `((List<Metric>)
metrics.get("test_raw_writeLatency_count")).get(0)` to a variable to make
the next lines easier to read.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java:
##########
@@ -24,37 +24,44 @@
import io.prometheus.client.Collector;
import java.io.IOException;
import java.io.Writer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.CachingStatsProvider;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
/**
- * A <i>Prometheus</i> based {@link StatsProvider} implementation.
+ * A <i>Prometheus</i> based {@link PrometheusRawMetricsProvider}
implementation.
*/
-public class PrometheusMetricsProvider implements StatsProvider {
+public class PrometheusMetricsProvider implements StatsProvider,
PrometheusRawMetricsProvider {
Review Comment:
I don't see a reason to implement `PrometheusRawMetricsProvider`
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/CachingStatsProvider.java:
##########
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus.metrics;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.commons.configuration.Configuration;
+
+/**
+ * A {@code CachingStatsProvider} adds the caching functionality to an
existing {@code StatsProvider}.
+ *
+ * <p>The stats provider will cache the stats objects created by the other
{@code StatsProvider} to allow
+ * the reusability of stats objects and avoid creating a lot of stats objects.
+ */
+public class CachingStatsProvider implements StatsProvider {
+
+ protected final StatsProvider underlying;
+ protected final ConcurrentMap<String, StatsLogger> statsLoggers;
+
+ public CachingStatsProvider(StatsProvider provider) {
+ this.underlying = provider;
+ this.statsLoggers = new ConcurrentHashMap<String, StatsLogger>();
+ }
+
+ @Override
+ public void start(Configuration conf) {
+ this.underlying.start(conf);
+ }
+
+ @Override
+ public void stop() {
+ this.underlying.stop();
+ }
+
+ @Override
+ public StatsLogger getStatsLogger(String scope) {
Review Comment:
Now that we have labels, should `String` -> `ScopeContext`? The scope is
just the metric prefix, and in theory you can have the same metric prefix
with different labels.
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java:
##########
@@ -1632,6 +1639,35 @@ public static Multimap<String, Metric>
parseMetrics(String metrics) {
return parsed;
}
+ @Test
+ public void testRawMetricsProvider() throws IOException {
+ PrometheusMetricsProvider rawMetricsProvider = new
PrometheusMetricsProvider();
+ try {
+ rawMetricsProvider.start(new PropertiesConfiguration());
+ rawMetricsProvider.getStatsLogger("test_raw")
+ .scopeLabel("topic", "persistent://public/default/test-v1")
+ .getOpStatsLogger("writeLatency")
+ .registerSuccessfulEvent(100, TimeUnit.NANOSECONDS);
+
+ getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider);
Review Comment:
Why are you doing that test?
The whole feature is about adding support of labels to BK client metrics.
You changed `PrometheusMetricsProvider` to do that.
You changed the integration point of `PrometheusMetricsProvider` to
`PrometheusMetricsGenerator`.
I don't see any relation to `PrometheusRawMetricsProvider`.
I reiterate the last comment - you should test that BK client metrics now
have labels, unless I'm missing something big here.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormat.java:
##########
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus.metrics;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.Collector.MetricFamilySamples;
+import io.prometheus.client.Collector.MetricFamilySamples.Sample;
+import io.prometheus.client.CollectorRegistry;
+import java.util.Enumeration;
+import java.util.Map;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
+
+/**
+ * Logic to write metrics in Prometheus text format.
+ */
+public class PrometheusTextFormat {
+ public static void writeGauge(SimpleTextOutputStream w, String name,
SimpleGauge<? extends Number> gauge) {
+ // Example:
+ // # TYPE bookie_storage_entries_count gauge
+ // bookie_storage_entries_count 519
+ w.write("# TYPE ").write(name).write(" gauge\n");
+ w.write(name);
+ writeLabels(w, gauge.getLabels());
+ w.write(' ').write(gauge.getSample().toString()).write('\n');
+
+ }
+
+ public static void writeCounter(SimpleTextOutputStream w, String name,
LongAdderCounter counter) {
+ // Example:
+ // # TYPE jvm_threads_started_total counter
+ // jvm_threads_started_total 59
+ w.write("# TYPE ").write(name).write(" counter\n");
+ w.write(name);
+ writeLabels(w, counter.getLabels());
+ w.write(' ').write(counter.get().toString()).write('\n');
+ }
+
+ public static void writeOpStat(SimpleTextOutputStream w, String name,
DataSketchesOpStatsLogger opStat) {
+ // Example:
+ // # TYPE bookie_journal_JOURNAL_ADD_ENTRY summary
+ // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.5",}
NaN
+ // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.75",}
NaN
+ // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.95",}
NaN
+ // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.99",}
NaN
+ // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.999",}
NaN
+ //
bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.9999",} NaN
+ // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="1.0",}
NaN
+ // bookie_journal_JOURNAL_ADD_ENTRY_count{success="false",} 0.0
+ // bookie_journal_JOURNAL_ADD_ENTRY_sum{success="false",} 0.0
+ // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.5",}
1.706
+ // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.75",}
1.89
+ // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.95",}
2.121
+ // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.99",}
10.708
+ // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.999",}
10.902
+ // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.9999",}
10.902
+ // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="1.0",}
10.902
+ // bookie_journal_JOURNAL_ADD_ENTRY_count{success="true",} 658.0
+ // bookie_journal_JOURNAL_ADD_ENTRY_sum{success="true",}
1265.0800000000002
+ w.write("# TYPE ").write(name).write(" summary\n");
+ writeQuantile(w, opStat, name, false, 0.5);
+ writeQuantile(w, opStat, name, false, 0.75);
+ writeQuantile(w, opStat, name, false, 0.95);
+ writeQuantile(w, opStat, name, false, 0.99);
+ writeQuantile(w, opStat, name, false, 0.999);
+ writeQuantile(w, opStat, name, false, 0.9999);
+ writeQuantile(w, opStat, name, false, 1.0);
+ writeCount(w, opStat, name, false);
+ writeSum(w, opStat, name, false);
+
+ writeQuantile(w, opStat, name, true, 0.5);
+ writeQuantile(w, opStat, name, true, 0.75);
+ writeQuantile(w, opStat, name, true, 0.95);
+ writeQuantile(w, opStat, name, true, 0.99);
+ writeQuantile(w, opStat, name, true, 0.999);
+ writeQuantile(w, opStat, name, true, 0.9999);
+ writeQuantile(w, opStat, name, true, 1.0);
+ writeCount(w, opStat, name, true);
+ writeSum(w, opStat, name, true);
+ }
+
+ private static void writeQuantile(SimpleTextOutputStream w,
DataSketchesOpStatsLogger opStat, String name,
+ Boolean success, double quantile) {
+ w.write(name)
+ .write("{success=\"").write(success.toString())
+ .write("\",quantile=\"").write(Double.toString(quantile));
+ if (!opStat.getLabels().isEmpty()) {
+ w.write("\", ");
+ writeLabelsNoBraces(w, opStat.getLabels());
+ } else {
+ w.write("\"");
+ }
+ w.write("} ")
+ .write(Double.toString(opStat.getQuantileValue(success,
quantile))).write('\n');
+ }
+
+ private static void writeCount(SimpleTextOutputStream w,
DataSketchesOpStatsLogger opStat, String name,
+ Boolean success) {
+ w.write(name).write("_count{success=\"").write(success.toString());
+ if (!opStat.getLabels().isEmpty()) {
+ w.write("\", ");
+ writeLabelsNoBraces(w, opStat.getLabels());
+ } else {
+ w.write("\"");
+ }
+ w.write("} ")
+ .write(Long.toString(opStat.getCount(success))).write('\n');
+ }
+
+ private static void writeSum(SimpleTextOutputStream w,
DataSketchesOpStatsLogger opStat, String name,
+ Boolean success) {
+ w.write(name).write("_sum{success=\"").write(success.toString());
+ if (!opStat.getLabels().isEmpty()) {
+ w.write("\", ");
+ writeLabelsNoBraces(w, opStat.getLabels());
+ } else {
+ w.write("\"");
+ }
+ w.write("} ")
+ .write(Double.toString(opStat.getSum(success))).write('\n');
+ }
+
+ public static void
writeMetricsCollectedByPrometheusClient(SimpleTextOutputStream w,
CollectorRegistry registry) {
Review Comment:
Do you need this method?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]