This is an automated email from the ASF dual-hosted git repository.
guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7ea90f4d2ae [FLINK-38372][Runtime / Metrics] Introduces whitelist to
the flink-metrics-prometheus
7ea90f4d2ae is described below
commit 7ea90f4d2aef3abc3ce6c09dd67513c52089ed3f
Author: Myracle <[email protected]>
AuthorDate: Wed Sep 17 17:30:53 2025 +0800
[FLINK-38372][Runtime / Metrics] Introduces whitelist to the
flink-metrics-prometheus
---
.../content.zh/docs/deployment/metric_reporters.md | 1 +
docs/content/docs/deployment/metric_reporters.md | 1 +
...etheus_push_gateway_reporter_configuration.html | 6 +++++
.../prometheus/AbstractPrometheusReporter.java | 28 ++++++++++++++++++++
.../PrometheusPushGatewayReporterOptions.java | 7 +++++
.../metrics/prometheus/PrometheusReporterTest.java | 30 ++++++++++++++++++++++
6 files changed, 73 insertions(+)
diff --git a/docs/content.zh/docs/deployment/metric_reporters.md
b/docs/content.zh/docs/deployment/metric_reporters.md
index bd8b1b5577e..947b97742db 100644
--- a/docs/content.zh/docs/deployment/metric_reporters.md
+++ b/docs/content.zh/docs/deployment/metric_reporters.md
@@ -225,6 +225,7 @@ metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false
metrics.reporter.promgateway.groupingKey: k1=v1;k2=v2
metrics.reporter.promgateway.interval: 60 SECONDS
+metrics.reporter.promgateway.allowList: metricA,metricB
```
PrometheusPushGatewayReporter 发送器将运行指标发送给
[Pushgateway](https://github.com/prometheus/pushgateway),Prometheus 再从
Pushgateway 拉取、解析运行指标。
diff --git a/docs/content/docs/deployment/metric_reporters.md
b/docs/content/docs/deployment/metric_reporters.md
index 3bc5d0d336f..0c7fe48d942 100644
--- a/docs/content/docs/deployment/metric_reporters.md
+++ b/docs/content/docs/deployment/metric_reporters.md
@@ -213,6 +213,7 @@ metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false
metrics.reporter.promgateway.groupingKey: k1=v1;k2=v2
metrics.reporter.promgateway.interval: 60 SECONDS
+metrics.reporter.promgateway.allowList: metricA,metricB
```
The PrometheusPushGatewayReporter pushes metrics to a
[Pushgateway](https://github.com/prometheus/pushgateway), which can be scraped
by Prometheus.
diff --git
a/docs/layouts/shortcodes/generated/prometheus_push_gateway_reporter_configuration.html
b/docs/layouts/shortcodes/generated/prometheus_push_gateway_reporter_configuration.html
index 8d3fd370f37..8e1f406a011 100644
---
a/docs/layouts/shortcodes/generated/prometheus_push_gateway_reporter_configuration.html
+++
b/docs/layouts/shortcodes/generated/prometheus_push_gateway_reporter_configuration.html
@@ -8,6 +8,12 @@
</tr>
</thead>
<tbody>
+ <tr>
+ <td><h5>metrics.reporter.prometheus.allowList</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The allow-list of metric name. The default is to report all
metrics</td>
+ </tr>
<tr>
<td><h5>metrics.reporter.prometheus.deleteOnShutdown</h5></td>
<td style="word-wrap: break-word;">true</td>
diff --git
a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java
b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java
index 7e2dab464dc..272a64eeb5a 100644
---
a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java
+++
b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/AbstractPrometheusReporter.java
@@ -47,6 +47,7 @@ import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
+import static
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.ALLOW_LIST;
import static
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.FILTER_LABEL_VALUE_CHARACTER;
/** base prometheus reporter for prometheus metrics. */
@@ -61,10 +62,13 @@ public abstract class AbstractPrometheusReporter implements
MetricReporter {
@VisibleForTesting static final char SCOPE_SEPARATOR = '_';
@VisibleForTesting static final String SCOPE_PREFIX = "flink" +
SCOPE_SEPARATOR;
+ @VisibleForTesting static final char DEFAULT_SCOPE_SEPARATOR = '.';
private final Map<String, AbstractMap.SimpleImmutableEntry<Collector,
Integer>>
collectorsWithCountByMetricName = new HashMap<>();
+ private final List<String> allowLists = new ArrayList<>();
+
@VisibleForTesting
static String replaceInvalidChars(final String input) {
// https://prometheus.io/docs/instrumenting/writing_exporters/
@@ -87,6 +91,13 @@ public abstract class AbstractPrometheusReporter implements
MetricReporter {
if (!filterLabelValueCharacters) {
labelValueCharactersFilter = input -> input;
}
+
+ String allowList = config.getString(ALLOW_LIST.key(),
ALLOW_LIST.defaultValue());
+ for (String ele : allowList.split(",")) {
+ if (!ele.trim().isEmpty()) {
+ allowLists.add(ele.trim());
+ }
+ }
}
@Override
@@ -97,6 +108,14 @@ public abstract class AbstractPrometheusReporter implements
MetricReporter {
@Override
public void notifyOfAddedMetric(
final Metric metric, final String metricName, final MetricGroup
group) {
+ if (!allowLists.isEmpty()) {
+ String metricScope =
+
LogicalScopeProvider.castFrom(group).getLogicalScope(CHARACTER_FILTER);
+ String fullMetricName = metricScope + DEFAULT_SCOPE_SEPARATOR +
metricName;
+ if (!isInAllowList(fullMetricName, allowLists)) {
+ return;
+ }
+ }
List<String> dimensionKeys = new LinkedList<>();
List<String> dimensionValues = new LinkedList<>();
@@ -390,4 +409,13 @@ public abstract class AbstractPrometheusReporter
implements MetricReporter {
private static String[] toArray(List<String> list) {
return list.toArray(new String[list.size()]);
}
+
+ public static boolean isInAllowList(String metricScopeName, List<String>
patternList) {
+ for (String pattern : patternList) {
+ if (metricScopeName.contains(pattern)) {
+ return true;
+ }
+ }
+ return false;
+ }
}
diff --git
a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterOptions.java
b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterOptions.java
index bdb48e9c611..df75c7de923 100644
---
a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterOptions.java
+++
b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusPushGatewayReporterOptions.java
@@ -112,4 +112,11 @@ public class PrometheusPushGatewayReporterOptions {
.noDefaultValue()
.withDescription(
"(Optional) The password for HTTP Basic
Authentication with the PushGateway.");
+
+ public static final ConfigOption<String> ALLOW_LIST =
+ ConfigOptions.key("allowList")
+ .stringType()
+ .defaultValue("")
+ .withDescription(
+ "The allow-list of metric name. The default is to
report all metrics");
}
diff --git
a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
index 8491dc0b834..8759ab0db07 100644
---
a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
+++
b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.metrics.util.TestHistogram;
@@ -43,6 +44,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
+import static
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.ALLOW_LIST;
+import static
org.apache.flink.metrics.prometheus.PrometheusReporterFactory.ARG_PORT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -243,6 +246,33 @@ class PrometheusReporterTest {
new PrometheusReporter(portRange).close();
}
+ @Test
+ public void testMetricFilter() throws Exception {
+ MetricConfig metricConfig = new MetricConfig();
+ metricConfig.setProperty(ALLOW_LIST.key(), LOGICAL_SCOPE +
".metricName1");
+ metricConfig.setProperty(ARG_PORT, portRangeProvider.nextRange());
+
+ PrometheusReporterFactory prometheusReporterFactory = new
PrometheusReporterFactory();
+ PrometheusReporter metricReporter =
+ prometheusReporterFactory.createMetricReporter(metricConfig);
+ metricReporter.open(metricConfig);
+
+ final Map<String, String> variables = new
HashMap<>(metricGroup.getAllVariables());
+ final MetricGroup metricGroup2 =
TestUtils.createTestMetricGroup(LOGICAL_SCOPE, variables);
+
+ Counter metric1 = new SimpleCounter();
+ metric1.inc(7);
+ Counter metric2 = new SimpleCounter();
+ metric2.inc(2);
+ metricReporter.notifyOfAddedMetric(metric1, "metricName1",
metricGroup2);
+ metricReporter.notifyOfAddedMetric(metric2, "metricName2",
metricGroup2);
+
+ String response = pollMetrics(metricReporter.getPort()).body();
+
+ assertThat(response).contains("metricName1");
+ assertThat(response).doesNotContain("metricName2");
+ }
+
private String addMetricAndPollResponse(Metric metric, String metricName)
throws IOException, InterruptedException {
reporter.notifyOfAddedMetric(metric, metricName, metricGroup);