This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 4f8a11c4883 [FLINK-30020][prometheus] Use separate CollectorRegistry 4f8a11c4883 is described below commit 4f8a11c48832dd7f8f0804e34aeedd7abd806ef6 Author: Chesnay Schepler <ches...@apache.org> AuthorDate: Mon Nov 14 10:25:52 2022 +0100 [FLINK-30020][prometheus] Use separate CollectorRegistry --- .../prometheus/AbstractPrometheusReporter.java | 8 ++++--- .../prometheus/PrometheusPushGatewayReporter.java | 3 +-- .../metrics/prometheus/PrometheusReporter.java | 4 ++-- .../PrometheusReporterTaskScopeTest.java | 25 +++++++++++----------- 4 files changed, 20 insertions(+), 20 deletions(-) diff --git a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java index e612de49fdc..7e2dab464dc 100644 --- a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java +++ b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java @@ -75,6 +75,8 @@ public abstract class AbstractPrometheusReporter implements MetricReporter { private CharacterFilter labelValueCharactersFilter = CHARACTER_FILTER; + @VisibleForTesting final CollectorRegistry registry = new CollectorRegistry(true); + @Override public void open(MetricConfig config) { boolean filterLabelValueCharacters = @@ -89,7 +91,7 @@ public abstract class AbstractPrometheusReporter implements MetricReporter { @Override public void close() { - CollectorRegistry.defaultRegistry.clear(); + registry.clear(); } @Override @@ -126,7 +128,7 @@ public abstract class AbstractPrometheusReporter implements MetricReporter { scopedMetricName, helpString); try { - collector.register(); + collector.register(registry); } catch (Exception e) { log.warn("There was a 
problem registering metric {}.", metricName, e); } @@ -245,7 +247,7 @@ public abstract class AbstractPrometheusReporter implements MetricReporter { if (count == 1) { try { - CollectorRegistry.defaultRegistry.unregister(collector); + registry.unregister(collector); } catch (Exception e) { log.warn("There was a problem unregistering metric {}.", scopedMetricName, e); } diff --git a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java index 3a2e6782de6..226858a3182 100644 --- a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java +++ b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporter.java @@ -25,7 +25,6 @@ import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.metrics.reporter.Scheduled; import org.apache.flink.util.Preconditions; -import io.prometheus.client.CollectorRegistry; import io.prometheus.client.exporter.PushGateway; import java.io.IOException; @@ -59,7 +58,7 @@ public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter im @Override public void report() { try { - pushGateway.push(CollectorRegistry.defaultRegistry, jobName, groupingKey); + pushGateway.push(registry, jobName, groupingKey); } catch (Exception e) { log.warn( "Failed to push metrics to PushGateway with jobName {}, groupingKey {}.", diff --git a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java index 2547a3057c8..50ce79b5a7a 100644 --- 
a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java +++ b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java @@ -27,6 +27,7 @@ import org.apache.flink.util.Preconditions; import io.prometheus.client.exporter.HTTPServer; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.Iterator; /** {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus. */ @@ -46,8 +47,7 @@ public class PrometheusReporter extends AbstractPrometheusReporter { while (ports.hasNext()) { port = ports.next(); try { - // internally accesses CollectorRegistry.defaultRegistry - httpServer = new HTTPServer(port); + httpServer = new HTTPServer(new InetSocketAddress(port), this.registry); log.info("Started PrometheusReporter HTTP server on port {}.", port); break; } catch (IOException ioe) { // assume port conflict diff --git a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java index a522d20738e..29b8805d5e6 100644 --- a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java +++ b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java @@ -28,7 +28,6 @@ import org.apache.flink.metrics.util.TestMeter; import org.apache.flink.util.NetUtils; import com.mashape.unirest.http.exceptions.UnirestException; -import io.prometheus.client.CollectorRegistry; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -81,11 +80,11 @@ class PrometheusReporterTaskScopeTest { reporter.notifyOfAddedMetric(counter2, METRIC_NAME, metricGroup2); assertThat( - 
CollectorRegistry.defaultRegistry.getSampleValue( + reporter.registry.getSampleValue( getLogicalScope(METRIC_NAME), LABEL_NAMES, LABEL_VALUES_1)) .isEqualTo(1.); assertThat( - CollectorRegistry.defaultRegistry.getSampleValue( + reporter.registry.getSampleValue( getLogicalScope(METRIC_NAME), LABEL_NAMES, LABEL_VALUES_2)) .isEqualTo(2.); } @@ -99,11 +98,11 @@ class PrometheusReporterTaskScopeTest { reporter.notifyOfAddedMetric(gauge2, METRIC_NAME, metricGroup2); assertThat( - CollectorRegistry.defaultRegistry.getSampleValue( + reporter.registry.getSampleValue( getLogicalScope(METRIC_NAME), LABEL_NAMES, LABEL_VALUES_1)) .isEqualTo(3.); assertThat( - CollectorRegistry.defaultRegistry.getSampleValue( + reporter.registry.getSampleValue( getLogicalScope(METRIC_NAME), LABEL_NAMES, LABEL_VALUES_2)) .isEqualTo(4.); } @@ -117,11 +116,11 @@ class PrometheusReporterTaskScopeTest { reporter.notifyOfAddedMetric(meter2, METRIC_NAME, metricGroup2); assertThat( - CollectorRegistry.defaultRegistry.getSampleValue( + reporter.registry.getSampleValue( getLogicalScope(METRIC_NAME), LABEL_NAMES, LABEL_VALUES_1)) .isEqualTo(meter1.getRate()); assertThat( - CollectorRegistry.defaultRegistry.getSampleValue( + reporter.registry.getSampleValue( getLogicalScope(METRIC_NAME), LABEL_NAMES, LABEL_VALUES_2)) .isEqualTo(meter2.getRate()); } @@ -143,13 +142,13 @@ class PrometheusReporterTaskScopeTest { final String[] labelNamesWithQuantile = addToArray(LABEL_NAMES, "quantile"); for (Double quantile : PrometheusReporter.HistogramSummaryProxy.QUANTILES) { assertThat( - CollectorRegistry.defaultRegistry.getSampleValue( + reporter.registry.getSampleValue( getLogicalScope(METRIC_NAME), labelNamesWithQuantile, addToArray(LABEL_VALUES_1, "" + quantile))) .isEqualTo(quantile); assertThat( - CollectorRegistry.defaultRegistry.getSampleValue( + reporter.registry.getSampleValue( getLogicalScope(METRIC_NAME), labelNamesWithQuantile, addToArray(LABEL_VALUES_2, "" + quantile))) @@ -168,23 +167,23 @@ class 
PrometheusReporterTaskScopeTest { reporter.notifyOfAddedMetric(counter2, METRIC_NAME, metricGroup2); assertThat( - CollectorRegistry.defaultRegistry.getSampleValue( + reporter.registry.getSampleValue( getLogicalScope(METRIC_NAME), LABEL_NAMES, LABEL_VALUES_1)) .isEqualTo(1.); assertThat( - CollectorRegistry.defaultRegistry.getSampleValue( + reporter.registry.getSampleValue( getLogicalScope(METRIC_NAME), LABEL_NAMES, LABEL_VALUES_2)) .isEqualTo(2.); reporter.notifyOfRemovedMetric(counter2, METRIC_NAME, metricGroup2); assertThat( - CollectorRegistry.defaultRegistry.getSampleValue( + reporter.registry.getSampleValue( getLogicalScope(METRIC_NAME), LABEL_NAMES, LABEL_VALUES_1)) .isEqualTo(1.); reporter.notifyOfRemovedMetric(counter1, METRIC_NAME, metricGroup1); assertThat( - CollectorRegistry.defaultRegistry.getSampleValue( + reporter.registry.getSampleValue( getLogicalScope(METRIC_NAME), LABEL_NAMES, LABEL_VALUES_1)) .isNull(); }