This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ea90f4d2ae [FLINK-38372][Runtime / Metrics] Introduces whitelist to the flink-metrics-prometheus
7ea90f4d2ae is described below

commit 7ea90f4d2aef3abc3ce6c09dd67513c52089ed3f
Author: Myracle <[email protected]>
AuthorDate: Wed Sep 17 17:30:53 2025 +0800

    [FLINK-38372][Runtime / Metrics] Introduces whitelist to the flink-metrics-prometheus
---
 .../content.zh/docs/deployment/metric_reporters.md |  1 +
 docs/content/docs/deployment/metric_reporters.md   |  1 +
 ...etheus_push_gateway_reporter_configuration.html |  6 +++++
 .../prometheus/AbstractPrometheusReporter.java     | 28 ++++++++++++++++++++
 .../PrometheusPushGatewayReporterOptions.java      |  7 +++++
 .../metrics/prometheus/PrometheusReporterTest.java | 30 ++++++++++++++++++++++
 6 files changed, 73 insertions(+)

diff --git a/docs/content.zh/docs/deployment/metric_reporters.md b/docs/content.zh/docs/deployment/metric_reporters.md
index bd8b1b5577e..947b97742db 100644
--- a/docs/content.zh/docs/deployment/metric_reporters.md
+++ b/docs/content.zh/docs/deployment/metric_reporters.md
@@ -225,6 +225,7 @@ metrics.reporter.promgateway.randomJobNameSuffix: true
 metrics.reporter.promgateway.deleteOnShutdown: false
 metrics.reporter.promgateway.groupingKey: k1=v1;k2=v2
 metrics.reporter.promgateway.interval: 60 SECONDS
+metrics.reporter.promgateway.allowList: metricA,metricB
 ```
 
 PrometheusPushGatewayReporter 发送器将运行指标发送给 
[Pushgateway](https://github.com/prometheus/pushgateway),Prometheus 再从 
Pushgateway 拉取、解析运行指标。
diff --git a/docs/content/docs/deployment/metric_reporters.md b/docs/content/docs/deployment/metric_reporters.md
index 3bc5d0d336f..0c7fe48d942 100644
--- a/docs/content/docs/deployment/metric_reporters.md
+++ b/docs/content/docs/deployment/metric_reporters.md
@@ -213,6 +213,7 @@ metrics.reporter.promgateway.randomJobNameSuffix: true
 metrics.reporter.promgateway.deleteOnShutdown: false
 metrics.reporter.promgateway.groupingKey: k1=v1;k2=v2
 metrics.reporter.promgateway.interval: 60 SECONDS
+metrics.reporter.promgateway.allowList: metricA,metricB
 ```
 
 The PrometheusPushGatewayReporter pushes metrics to a 
[Pushgateway](https://github.com/prometheus/pushgateway), which can be scraped 
by Prometheus.
diff --git a/docs/layouts/shortcodes/generated/prometheus_push_gateway_reporter_configuration.html b/docs/layouts/shortcodes/generated/prometheus_push_gateway_reporter_configuration.html
index 8d3fd370f37..8e1f406a011 100644
--- a/docs/layouts/shortcodes/generated/prometheus_push_gateway_reporter_configuration.html
+++ b/docs/layouts/shortcodes/generated/prometheus_push_gateway_reporter_configuration.html
@@ -8,6 +8,12 @@
         </tr>
     </thead>
     <tbody>
+        <tr>
+            <td><h5>metrics.reporter.prometheus.allowList</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>The allow-list of metric name. The default is to report all 
metrics</td>
+        </tr>
         <tr>
             <td><h5>metrics.reporter.prometheus.deleteOnShutdown</h5></td>
             <td style="word-wrap: break-word;">true</td>
diff --git a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java
index 7e2dab464dc..272a64eeb5a 100644
--- a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java
+++ b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java
@@ -47,6 +47,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;
 
+import static 
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.ALLOW_LIST;
 import static 
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.FILTER_LABEL_VALUE_CHARACTER;
 
 /** base prometheus reporter for prometheus metrics. */
@@ -61,10 +62,13 @@ public abstract class AbstractPrometheusReporter implements MetricReporter {
 
     @VisibleForTesting static final char SCOPE_SEPARATOR = '_';
     @VisibleForTesting static final String SCOPE_PREFIX = "flink" + 
SCOPE_SEPARATOR;
+    @VisibleForTesting static final char DEFAULT_SCOPE_SEPARATOR = '.';
 
     private final Map<String, AbstractMap.SimpleImmutableEntry<Collector, 
Integer>>
             collectorsWithCountByMetricName = new HashMap<>();
 
+    private final List<String> allowLists = new ArrayList<>();
+
     @VisibleForTesting
     static String replaceInvalidChars(final String input) {
         // https://prometheus.io/docs/instrumenting/writing_exporters/
@@ -87,6 +91,13 @@ public abstract class AbstractPrometheusReporter implements MetricReporter {
         if (!filterLabelValueCharacters) {
             labelValueCharactersFilter = input -> input;
         }
+
+        String allowList = config.getString(ALLOW_LIST.key(), 
ALLOW_LIST.defaultValue());
+        for (String ele : allowList.split(",")) {
+            if (!ele.trim().isEmpty()) {
+                allowLists.add(ele.trim());
+            }
+        }
     }
 
     @Override
@@ -97,6 +108,14 @@ public abstract class AbstractPrometheusReporter implements MetricReporter {
     @Override
     public void notifyOfAddedMetric(
             final Metric metric, final String metricName, final MetricGroup 
group) {
+        if (!allowLists.isEmpty()) {
+            String metricScope =
+                    
LogicalScopeProvider.castFrom(group).getLogicalScope(CHARACTER_FILTER);
+            String fullMetricName = metricScope + DEFAULT_SCOPE_SEPARATOR + 
metricName;
+            if (!isInAllowList(fullMetricName, allowLists)) {
+                return;
+            }
+        }
 
         List<String> dimensionKeys = new LinkedList<>();
         List<String> dimensionValues = new LinkedList<>();
@@ -390,4 +409,13 @@ public abstract class AbstractPrometheusReporter implements MetricReporter {
     private static String[] toArray(List<String> list) {
         return list.toArray(new String[list.size()]);
     }
+
+    public static boolean isInAllowList(String metricScopeName, List<String> 
patternList) {
+        for (String pattern : patternList) {
+            if (metricScopeName.contains(pattern)) {
+                return true;
+            }
+        }
+        return false;
+    }
 }
diff --git a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterOptions.java b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterOptions.java
index bdb48e9c611..df75c7de923 100644
--- a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterOptions.java
+++ b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterOptions.java
@@ -112,4 +112,11 @@ public class PrometheusPushGatewayReporterOptions {
                     .noDefaultValue()
                     .withDescription(
                             "(Optional) The password for HTTP Basic 
Authentication with the PushGateway.");
+
+    public static final ConfigOption<String> ALLOW_LIST =
+            ConfigOptions.key("allowList")
+                    .stringType()
+                    .defaultValue("")
+                    .withDescription(
+                            "The allow-list of metric name. The default is to 
report all metrics");
 }
diff --git a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
index 8491dc0b834..8759ab0db07 100644
--- a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
+++ b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.metrics.util.TestHistogram;
@@ -43,6 +44,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.NoSuchElementException;
 
+import static 
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.ALLOW_LIST;
+import static 
org.apache.flink.metrics.prometheus.PrometheusReporterFactory.ARG_PORT;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -243,6 +246,33 @@ class PrometheusReporterTest {
         new PrometheusReporter(portRange).close();
     }
 
+    @Test
+    public void testMetricFilter() throws Exception {
+        MetricConfig metricConfig = new MetricConfig();
+        metricConfig.setProperty(ALLOW_LIST.key(), LOGICAL_SCOPE + 
".metricName1");
+        metricConfig.setProperty(ARG_PORT, portRangeProvider.nextRange());
+
+        PrometheusReporterFactory prometheusReporterFactory = new 
PrometheusReporterFactory();
+        PrometheusReporter metricReporter =
+                prometheusReporterFactory.createMetricReporter(metricConfig);
+        metricReporter.open(metricConfig);
+
+        final Map<String, String> variables = new 
HashMap<>(metricGroup.getAllVariables());
+        final MetricGroup metricGroup2 = 
TestUtils.createTestMetricGroup(LOGICAL_SCOPE, variables);
+
+        Counter metric1 = new SimpleCounter();
+        metric1.inc(7);
+        Counter metric2 = new SimpleCounter();
+        metric2.inc(2);
+        metricReporter.notifyOfAddedMetric(metric1, "metricName1", 
metricGroup2);
+        metricReporter.notifyOfAddedMetric(metric2, "metricName2", 
metricGroup2);
+
+        String response = pollMetrics(metricReporter.getPort()).body();
+
+        assertThat(response).contains("metricName1");
+        assertThat(response).doesNotContain("metricName2");
+    }
+
     private String addMetricAndPollResponse(Metric metric, String metricName)
             throws IOException, InterruptedException {
         reporter.notifyOfAddedMetric(metric, metricName, metricGroup);

Reply via email to