This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b6cb13a52f1 [FLINK-39126][metrics] Support exporting metrics through
grpc in batches
b6cb13a52f1 is described below
commit b6cb13a52f17e33b9e47c532c24025b52eca4632
Author: Aleksandr Iushmanov <[email protected]>
AuthorDate: Thu Feb 26 18:33:08 2026 +0000
[FLINK-39126][metrics] Support exporting metrics through grpc in batches
Add support for batched metric export in the OpenTelemetry metric reporter.
When exporting a large number of metrics, sending them all in a single request
can cause timeouts or memory pressure on both the Flink reporter and the OTel
collector. This change introduces a configurable batch.size option that
partitions metrics into smaller batches before exporting, with async completion
tracking and per-batch failure handling.
---
.../content.zh/docs/deployment/metric_reporters.md | 30 +++
docs/content/docs/deployment/metric_reporters.md | 9 +
.../open_telemetry_reporter_configuration.html | 12 +
flink-metrics/flink-metrics-otel/pom.xml | 6 +
.../metrics/otel/OpenTelemetryMetricReporter.java | 184 +++++++++++---
.../metrics/otel/OpenTelemetryReporterOptions.java | 23 ++
.../otel/OpenTelemetryMetricReporterITCase.java | 55 ++++
.../otel/OpenTelemetryMetricReporterTest.java | 276 +++++++++++++++++++++
.../flink/metrics/otel/OpenTelemetryTestBase.java | 41 +++
9 files changed, 607 insertions(+), 29 deletions(-)
diff --git a/docs/content.zh/docs/deployment/metric_reporters.md
b/docs/content.zh/docs/deployment/metric_reporters.md
index 9c02675b872..bd8b1b5577e 100644
--- a/docs/content.zh/docs/deployment/metric_reporters.md
+++ b/docs/content.zh/docs/deployment/metric_reporters.md
@@ -305,6 +305,36 @@ metrics.reporter.dghttp.interval: 60 SECONDS
metrics.reporter.dghttp.useLogicalIdentifier: true
```
+### OpenTelemetry
+#### (org.apache.flink.metrics.otel.OpenTelemetryMetricReporterFactory)
+
+Parameters:
+
+{{< include_reporter_config
"layouts/shortcodes/generated/open_telemetry_reporter_configuration.html" >}}
+
+Example configurations:
+
+```yaml
+metrics.reporter.otel.factory.class:
org.apache.flink.metrics.otel.OpenTelemetryMetricReporterFactory
+metrics.reporter.otel.exporter.endpoint: http://127.0.0.1:1337
+metrics.reporter.otel.exporter.protocol: gRPC
+```
+
+```yaml
+metrics.reporter.otel.factory.class:
org.apache.flink.metrics.otel.OpenTelemetryMetricReporterFactory
+metrics.reporter.otel.exporter.endpoint: http://127.0.0.1:9090
+metrics.reporter.otel.exporter.protocol: HTTP
+```
+
+```yaml
+# With batching enabled (1500 metrics per export request)
+metrics.reporter.otel.factory.class:
org.apache.flink.metrics.otel.OpenTelemetryMetricReporterFactory
+metrics.reporter.otel.exporter.endpoint: http://127.0.0.1:1337
+metrics.reporter.otel.exporter.protocol: gRPC
+metrics.reporter.otel.batch.size: 1500
+metrics.reporter.otel.export-completion-timeout-millis: 60000
+```
+
<a name="slf4j"></a>
### Slf4j
diff --git a/docs/content/docs/deployment/metric_reporters.md
b/docs/content/docs/deployment/metric_reporters.md
index d0b7246f274..3bc5d0d336f 100644
--- a/docs/content/docs/deployment/metric_reporters.md
+++ b/docs/content/docs/deployment/metric_reporters.md
@@ -310,6 +310,15 @@ metrics.reporter.otel.exporter.endpoint:
http://127.0.0.1:9090
metrics.reporter.otel.exporter.protocol: HTTP
```
+```yaml
+# With batching enabled (1500 metrics per export request)
+metrics.reporter.otel.factory.class:
org.apache.flink.metrics.otel.OpenTelemetryMetricReporterFactory
+metrics.reporter.otel.exporter.endpoint: http://127.0.0.1:1337
+metrics.reporter.otel.exporter.protocol: gRPC
+metrics.reporter.otel.batch.size: 1500
+metrics.reporter.otel.export-completion-timeout-millis: 60000
+```
+
### Slf4j
#### (org.apache.flink.metrics.slf4j.Slf4jReporter)
diff --git
a/docs/layouts/shortcodes/generated/open_telemetry_reporter_configuration.html
b/docs/layouts/shortcodes/generated/open_telemetry_reporter_configuration.html
index 1c1193f19bd..df9032e4e79 100644
---
a/docs/layouts/shortcodes/generated/open_telemetry_reporter_configuration.html
+++
b/docs/layouts/shortcodes/generated/open_telemetry_reporter_configuration.html
@@ -8,6 +8,18 @@
</tr>
</thead>
<tbody>
+ <tr>
+ <td><h5>metrics.reporter.OpenTelemetry.batch.size</h5></td>
+ <td style="word-wrap: break-word;">0</td>
+ <td>Integer</td>
+ <td>Number of metrics per export batch. Values <= 0 disable
batching and all metrics are exported in a single request.</td>
+ </tr>
+ <tr>
+
<td><h5>metrics.reporter.OpenTelemetry.export-completion-timeout-millis</h5></td>
+ <td style="word-wrap: break-word;">300000</td>
+ <td>Long</td>
+ <td>Timeout in milliseconds for waiting on async export
completion.</td>
+ </tr>
<tr>
<td><h5>metrics.reporter.OpenTelemetry.exporter.compression</h5></td>
<td style="word-wrap: break-word;">"none"</td>
diff --git a/flink-metrics/flink-metrics-otel/pom.xml
b/flink-metrics/flink-metrics-otel/pom.xml
index 86ec9b3b63c..0f3e10e4895 100644
--- a/flink-metrics/flink-metrics-otel/pom.xml
+++ b/flink-metrics/flink-metrics-otel/pom.xml
@@ -51,6 +51,12 @@ under the License.
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-guava</artifactId>
+ <optional>${flink.markBundledAsOptional}</optional>
+ </dependency>
+
<!-- OpenTelemetry -->
<dependency>
diff --git
a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java
b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java
index e0e03db71ce..33215ab3678 100644
---
a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java
+++
b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java
@@ -32,6 +32,8 @@ import org.apache.flink.metrics.reporter.AbstractReporter;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.shaded.guava33.com.google.common.collect.Lists;
+
import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter;
import
io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporterBuilder;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
@@ -44,6 +46,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
import java.time.Clock;
import java.time.Instant;
@@ -55,6 +58,8 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -72,16 +77,34 @@ public class OpenTelemetryMetricReporter extends
OpenTelemetryReporterBase
private static final String LOGICAL_SCOPE_PREFIX = "flink.";
+ @GuardedBy("this")
private final Map<Gauge<?>, MetricMetadata> gauges = new HashMap<>();
+
+ @GuardedBy("this")
private final Map<Counter, MetricMetadata> counters = new HashMap<>();
+
+ @GuardedBy("this")
private final Map<Histogram, MetricMetadata> histograms = new HashMap<>();
+
+ @GuardedBy("this")
private final Map<Meter, MetricMetadata> meters = new HashMap<>();
+
private final Clock clock;
// In order to produce deltas, we keep a snapshot of the previous counter
collection.
+ @GuardedBy("this")
private Map<Metric, Long> lastValueSnapshots = Collections.emptyMap();
+
+ @GuardedBy("this")
private long lastCollectTimeNanos = 0;
+ @GuardedBy("this")
+ private int batchSize =
OpenTelemetryReporterOptions.BATCH_SIZE.defaultValue();
+
+ @GuardedBy("this")
+ private long exportCompletionTimeoutMillis =
+
OpenTelemetryReporterOptions.EXPORT_COMPLETION_TIMEOUT_MILLIS.defaultValue();
+
public OpenTelemetryMetricReporter() {
this(Clock.systemUTC());
}
@@ -92,10 +115,18 @@ public class OpenTelemetryMetricReporter extends
OpenTelemetryReporterBase
}
@Override
- public void open(MetricConfig metricConfig) {
+ public void open(final MetricConfig metricConfig) {
LOG.info("Starting OpenTelemetryMetricReporter");
super.open(metricConfig);
+ synchronized (this) {
+ exporter = createExporter(metricConfig);
+ configureBatching(metricConfig);
+ }
+ }
+
+ /** Creates the {@link MetricExporter} based on the configured protocol. */
+ protected MetricExporter createExporter(final MetricConfig metricConfig) {
final String protocol =
Optional.ofNullable(
metricConfig.getProperty(
@@ -104,29 +135,27 @@ public class OpenTelemetryMetricReporter extends
OpenTelemetryReporterBase
switch (protocol.toLowerCase()) {
case "http":
- OtlpHttpMetricExporterBuilder httpBuilder =
OtlpHttpMetricExporter.builder();
+ final OtlpHttpMetricExporterBuilder httpBuilder =
OtlpHttpMetricExporter.builder();
tryConfigureEndpoint(metricConfig, httpBuilder::setEndpoint);
tryConfigureTimeout(metricConfig, httpBuilder::setTimeout);
tryConfigureCompression(metricConfig,
httpBuilder::setCompression);
- exporter = httpBuilder.build();
- break;
+ return httpBuilder.build();
default:
LOG.warn(
"Unknown protocol '{}' for
OpenTelemetryMetricReporter, defaulting to gRPC",
protocol);
// Fall through to the "gRPC" case
case "grpc":
- OtlpGrpcMetricExporterBuilder grpcBuilder =
OtlpGrpcMetricExporter.builder();
+ final OtlpGrpcMetricExporterBuilder grpcBuilder =
OtlpGrpcMetricExporter.builder();
tryConfigureEndpoint(metricConfig, grpcBuilder::setEndpoint);
tryConfigureTimeout(metricConfig, grpcBuilder::setTimeout);
tryConfigureCompression(metricConfig,
grpcBuilder::setCompression);
- exporter = grpcBuilder.build();
- break;
+ return grpcBuilder.build();
}
}
@Override
- public void close() {
+ public synchronized void close() {
if (exporter != null) {
exporter.flush();
waitForLastReportToComplete();
@@ -269,39 +298,136 @@ public class OpenTelemetryMetricReporter extends
OpenTelemetryReporterBase
return map;
}
- private @Nullable CompletableResultCode lastResult;
+ private volatile @Nullable CompletableFuture<Void> lastReportFuture;
@Override
public void report() {
- Collection<MetricData> metricData = collectAllMetrics();
+ final List<MetricData> metricData = List.copyOf(collectAllMetrics());
+ final int totalMetrics = metricData.size();
+ if (totalMetrics == 0) {
+ return;
+ }
+
+ // Cap the batch size at the metric count to avoid potentially large memory allocations in the
`.partition()` call.
+ // Reading batchSize needs no additional synchronization because it comes after
collectAllMetrics, which
+ // is synchronized.
+ final int localBatchSize = Math.min(batchSize, totalMetrics);
+
+ final Iterable<List<MetricData>> batches = Lists.partition(metricData,
localBatchSize);
+ final int totalBatches = (totalMetrics + localBatchSize - 1) /
localBatchSize;
+ final List<CompletableResultCode> results = new
ArrayList<>(totalBatches);
+ for (final List<MetricData> batch : batches) {
+ results.add(exportBatch(batch));
+ }
+
+ lastReportFuture =
+ CompletableFuture.runAsync(
+ () ->
+ reportWhenExportIsComplete(
+ totalMetrics, results,
localBatchSize))
+ .whenComplete(
+ (ignored, throwable) -> {
+ if (throwable != null) {
+ LOG.warn("Error while reporting
metrics", throwable);
+ }
+ });
+ }
+
+ private CompletableResultCode exportBatch(final Collection<MetricData>
batch) {
try {
- lastResult = exporter.export(metricData);
- lastResult.whenComplete(
- () -> {
- if (lastResult.isSuccess()) {
- LOG.debug(
- "Exported {} metrics using {}",
- metricData.size(),
- exporter.getClass().getName());
- } else {
- LOG.warn(
- "Failed to export {} metrics using {}",
- metricData.size(),
- exporter.getClass().getName());
- }
- });
+ // exporter used without synchronize block, because `exportBatch`
happens
+ // after `collectAllMetrics` which adds memory barrier.
+ return exporter.export(batch);
} catch (Exception e) {
LOG.error(
"Failed to call export for {} metrics using {}",
- metricData.size(),
- exporter.getClass().getName());
+ batch.size(),
+ exporter.getClass().getName(),
+ e);
+ return CompletableResultCode.ofFailure();
+ }
+ }
+
+ private void reportWhenExportIsComplete(
+ final int totalMetrics,
+ final List<CompletableResultCode> results,
+ final int localBatchSize) {
+ final CountDownLatch completedResults = new
CountDownLatch(results.size());
+ results.forEach(result ->
result.whenComplete(completedResults::countDown));
+
+ try {
+ final boolean isCompleteReport =
+ completedResults.await(exportCompletionTimeoutMillis,
TimeUnit.MILLISECONDS);
+
+ final int successfulBatches =
+ (int)
results.stream().filter(CompletableResultCode::isSuccess).count();
+ final int failedBatches = results.size() - successfulBatches;
+
+ logFinalStatistics(
+ localBatchSize,
+ successfulBatches,
+ failedBatches,
+ totalMetrics,
+ isCompleteReport);
+ } catch (final InterruptedException e) {
+ LOG.warn(
+ "Thread waiting for metrics export results have been
interrupted. "
+ + "Exports results are unknown",
+ e);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ private void logFinalStatistics(
+ int localBatchSize,
+ int successfulBatches,
+ int failedBatches,
+ int totalMetrics,
+ boolean isCompleteReport) {
+ if (failedBatches > 0 || !isCompleteReport) {
+ LOG.warn(
+ "Metric export completed with issues: totalMetrics={},
batchSize={}, "
+ + "successfulBatches={}, failedBatches={},
completedInTime={}",
+ totalMetrics,
+ localBatchSize,
+ successfulBatches,
+ failedBatches,
+ isCompleteReport);
+ } else {
+ LOG.debug(
+ "Metric export completed successfully: totalMetrics={},
batchSize={}, "
+ + "successfulBatches={}",
+ totalMetrics,
+ localBatchSize,
+ successfulBatches);
}
}
+ private void configureBatching(MetricConfig metricConfig) {
+ int configuredBatchSize =
+ metricConfig.getInteger(
+ OpenTelemetryReporterOptions.BATCH_SIZE.key(),
+
OpenTelemetryReporterOptions.BATCH_SIZE.defaultValue());
+ batchSize = configuredBatchSize <= 0 ? Integer.MAX_VALUE :
configuredBatchSize;
+ exportCompletionTimeoutMillis =
+ metricConfig.getLong(
+
OpenTelemetryReporterOptions.EXPORT_COMPLETION_TIMEOUT_MILLIS.key(),
+
OpenTelemetryReporterOptions.EXPORT_COMPLETION_TIMEOUT_MILLIS
+ .defaultValue());
+ LOG.info(
+ "Configured batching: batchSize={},
exportCompletionTimeoutMillis={}",
+ batchSize,
+ exportCompletionTimeoutMillis);
+ }
+
@VisibleForTesting
void waitForLastReportToComplete() {
- if (lastResult != null) {
- lastResult.join(1, TimeUnit.MINUTES);
+ if (lastReportFuture != null) {
+ try {
+ lastReportFuture.get(1, TimeUnit.MINUTES);
+ } catch (Exception e) {
+ LOG.warn("Waiting for last report to complete failed", e);
+ }
}
}
}
diff --git
a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryReporterOptions.java
b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryReporterOptions.java
index 94362ec8ec7..f810750b985 100644
---
a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryReporterOptions.java
+++
b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryReporterOptions.java
@@ -109,6 +109,29 @@ public final class OpenTelemetryReporterOptions {
.text("service.version passed to
OpenTelemetry Reporters.")
.build());
+ @PublicEvolving
+ public static final ConfigOption<Integer> BATCH_SIZE =
+ ConfigOptions.key("batch.size")
+ .intType()
+ .defaultValue(0)
+ .withDescription(
+ Description.builder()
+ .text(
+ "Number of metrics per export
batch. "
+ + "Values <= 0 disable
batching and all metrics are exported in a single request.")
+ .build());
+
+ @PublicEvolving
+ public static final ConfigOption<Long> EXPORT_COMPLETION_TIMEOUT_MILLIS =
+ ConfigOptions.key("export-completion-timeout-millis")
+ .longType()
+ .defaultValue(300_000L)
+ .withDescription(
+ Description.builder()
+ .text(
+ "Timeout in milliseconds for
waiting on async export completion.")
+ .build());
+
@Internal
public static void tryConfigureTimeout(MetricConfig metricConfig,
Consumer<Duration> builder) {
final String timeoutConfKey = EXPORTER_TIMEOUT.key();
diff --git
a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporterITCase.java
b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporterITCase.java
index 96b935fec36..e7a96940f95 100644
---
a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporterITCase.java
+++
b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporterITCase.java
@@ -266,6 +266,61 @@ public class OpenTelemetryMetricReporterITCase extends
OpenTelemetryTestBase {
});
}
+ @Test
+ public void testReportWithBatching() throws Exception {
+ MetricConfig metricConfig = createMetricConfig();
+
metricConfig.setProperty(OpenTelemetryReporterOptions.BATCH_SIZE.key(),
String.valueOf(2));
+ MetricGroup group = new TestMetricGroup();
+
+ reporter.open(metricConfig);
+
+ SimpleCounter counter1 = new SimpleCounter();
+ SimpleCounter counter2 = new SimpleCounter();
+ SimpleCounter counter3 = new SimpleCounter();
+ reporter.notifyOfAddedMetric(counter1, "batch.counter1", group);
+ reporter.notifyOfAddedMetric(counter2, "batch.counter2", group);
+ reporter.notifyOfAddedMetric(counter3, "batch.counter3", group);
+
+ reporter.report();
+ reporter.waitForLastReportToComplete();
+ reporter.close();
+
+ // With batching, each batch produces a separate output line in the
collector.
+ // Verify that we get exactly 2 batches (sizes 2+1) containing all 3
metrics.
+ eventuallyConsumeAllJson(
+ jsonLines -> {
+ final List<List<String>> batches =
+ jsonLines.stream()
+
.map(OpenTelemetryTestBase::extractMetricNames)
+ .filter(names -> !names.isEmpty())
+ .filter(
+ names ->
+ names.stream()
+ .anyMatch(
+ n ->
+
n.contains(
+
"batch.counter")))
+ .collect(Collectors.toList());
+
+ assertThat(batches).hasSize(2);
+
assertThat(batches.stream().mapToInt(List::size).sum()).isEqualTo(3);
+ assertThat(
+ batches.stream()
+ .map(List::size)
+ .sorted()
+ .collect(Collectors.toList()))
+ .containsExactly(1, 2);
+
+ final List<String> allNames =
+
batches.stream().flatMap(List::stream).collect(Collectors.toList());
+ assertThat(allNames)
+ .containsExactlyInAnyOrder(
+ "flink.logical.scope.batch.counter1",
+ "flink.logical.scope.batch.counter2",
+ "flink.logical.scope.batch.counter3");
+ });
+ }
+
static class TestMetricGroup extends UnregisteredMetricsGroup implements
LogicalScopeProvider {
@Override
diff --git
a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporterTest.java
b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporterTest.java
new file mode 100644
index 00000000000..caaff990100
--- /dev/null
+++
b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporterTest.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.otel;
+
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.LogicalScopeProvider;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
+import io.opentelemetry.sdk.common.CompletableResultCode;
+import io.opentelemetry.sdk.metrics.InstrumentType;
+import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.export.MetricExporter;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link OpenTelemetryMetricReporter} batching logic. */
+class OpenTelemetryMetricReporterTest {
+
+ private TestingOpenTelemetryMetricReporter reporter;
+ private TestExporter testExporter;
+
+ @BeforeEach
+ void setUp() {
+ testExporter = new TestExporter();
+ reporter = new TestingOpenTelemetryMetricReporter(testExporter);
+ }
+
+ @AfterEach
+ void tearDown() {
+ reporter.close();
+ }
+
+ @Test
+ void testReportingWithBatchingEnabled() {
+ final MetricConfig config = new MetricConfig();
+ config.put(OpenTelemetryReporterOptions.BATCH_SIZE.key(), 2);
+ reporter.open(config);
+
+ final MetricGroup group = new TestMetricGroup();
+ reporter.notifyOfAddedMetric(new SimpleCounter(), "counter1", group);
+ reporter.notifyOfAddedMetric(new SimpleCounter(), "counter2", group);
+ reporter.notifyOfAddedMetric(new SimpleCounter(), "counter3", group);
+
+ reporter.report();
+ reporter.waitForLastReportToComplete();
+
+ // 3 metrics with batch size 2 -> 2 batches (2 + 1)
+ assertThat(testExporter.getExportedBatches()).hasSize(2);
+ assertThat(testExporter.getExportedBatches().get(0)).hasSize(2);
+ assertThat(testExporter.getExportedBatches().get(1)).hasSize(1);
+ }
+
+ @Test
+ void testReportingWithBatchingDisabled() {
+ final MetricConfig config = new MetricConfig();
+ // Default batch.size=0 disables batching
+ reporter.open(config);
+
+ final MetricGroup group = new TestMetricGroup();
+ reporter.notifyOfAddedMetric(new SimpleCounter(), "counter1", group);
+ reporter.notifyOfAddedMetric(new SimpleCounter(), "counter2", group);
+ reporter.notifyOfAddedMetric(new SimpleCounter(), "counter3", group);
+
+ reporter.report();
+ reporter.waitForLastReportToComplete();
+
+ // All metrics in a single export call
+ assertThat(testExporter.getExportedBatches()).hasSize(1);
+ assertThat(testExporter.getExportedBatches().get(0)).hasSize(3);
+ }
+
+ @Test
+ void testReportingWithNegativeBatchSize() {
+ final MetricConfig config = new MetricConfig();
+ config.put(OpenTelemetryReporterOptions.BATCH_SIZE.key(), -1);
+ reporter.open(config);
+
+ final MetricGroup group = new TestMetricGroup();
+ reporter.notifyOfAddedMetric(new SimpleCounter(), "counter1", group);
+ reporter.notifyOfAddedMetric(new SimpleCounter(), "counter2", group);
+
+ reporter.report();
+ reporter.waitForLastReportToComplete();
+
+ // Negative batch size disables batching -> single export
+ assertThat(testExporter.getExportedBatches()).hasSize(1);
+ assertThat(testExporter.getExportedBatches().get(0)).hasSize(2);
+ }
+
+ @Test
+ void testReportEmptyMetrics() {
+ final MetricConfig config = new MetricConfig();
+ reporter.open(config);
+
+ reporter.report();
+
+ // No metrics registered -> no export calls
+ assertThat(testExporter.getExportedBatches()).isEmpty();
+ }
+
+ @Test
+ void testReportingWithPartialBatchFailure() {
+ final PartialFailureExporter failureExporter = new
PartialFailureExporter(1);
+ final TestingOpenTelemetryMetricReporter failureReporter =
+ new TestingOpenTelemetryMetricReporter(failureExporter);
+
+ final MetricConfig config = new MetricConfig();
+ config.put(OpenTelemetryReporterOptions.BATCH_SIZE.key(), 2);
+ failureReporter.open(config);
+
+ final MetricGroup group = new TestMetricGroup();
+ failureReporter.notifyOfAddedMetric(new SimpleCounter(), "counter1",
group);
+ failureReporter.notifyOfAddedMetric(new SimpleCounter(), "counter2",
group);
+ failureReporter.notifyOfAddedMetric(new SimpleCounter(), "counter3",
group);
+
+ failureReporter.report();
+ failureReporter.waitForLastReportToComplete();
+ failureReporter.close();
+
+ // 2 batches attempted, first succeeds, second fails
+ assertThat(failureExporter.getExportedBatches()).hasSize(2);
+ assertThat(failureExporter.getExportedBatches().get(0)).hasSize(2);
+ assertThat(failureExporter.getExportedBatches().get(1)).hasSize(1);
+ assertThat(failureExporter.getSuccessCount()).isEqualTo(1);
+ assertThat(failureExporter.getFailureCount()).isEqualTo(1);
+ }
+
+ private static final class TestingOpenTelemetryMetricReporter
+ extends OpenTelemetryMetricReporter {
+
+ private final MetricExporter testExporter;
+
+ TestingOpenTelemetryMetricReporter(MetricExporter testExporter) {
+ super(Clock.fixed(Instant.ofEpochMilli(1234),
Clock.systemUTC().getZone()));
+ this.testExporter = testExporter;
+ }
+
+ @Override
+ protected MetricExporter createExporter(MetricConfig metricConfig) {
+ return testExporter;
+ }
+ }
+
+ /** A test {@link MetricExporter} that records all export calls. */
+ private static final class TestExporter implements MetricExporter {
+
+ private final List<List<MetricData>> exportedBatches =
+ Collections.synchronizedList(new ArrayList<>());
+
+ @Override
+ public CompletableResultCode export(Collection<MetricData> metrics) {
+ exportedBatches.add(new ArrayList<>(metrics));
+ return CompletableResultCode.ofSuccess();
+ }
+
+ @Override
+ public CompletableResultCode flush() {
+ return CompletableResultCode.ofSuccess();
+ }
+
+ @Override
+ public CompletableResultCode shutdown() {
+ return CompletableResultCode.ofSuccess();
+ }
+
+ @Override
+ public AggregationTemporality getAggregationTemporality(InstrumentType
instrumentType) {
+ return AggregationTemporality.DELTA;
+ }
+
+ List<List<MetricData>> getExportedBatches() {
+ return exportedBatches;
+ }
+ }
+
+ /** A test {@link MetricExporter} that fails after a specified number of
successful exports. */
+ private static final class PartialFailureExporter implements
MetricExporter {
+
+ private final List<List<MetricData>> exportedBatches =
+ Collections.synchronizedList(new ArrayList<>());
+ private final int failAfter;
+ private int successCount;
+ private int failureCount;
+
+ PartialFailureExporter(final int failAfter) {
+ this.failAfter = failAfter;
+ }
+
+ @Override
+ public CompletableResultCode export(final Collection<MetricData>
metrics) {
+ exportedBatches.add(new ArrayList<>(metrics));
+ if (exportedBatches.size() > failAfter) {
+ failureCount++;
+ return CompletableResultCode.ofFailure();
+ }
+ successCount++;
+ return CompletableResultCode.ofSuccess();
+ }
+
+ @Override
+ public CompletableResultCode flush() {
+ return CompletableResultCode.ofSuccess();
+ }
+
+ @Override
+ public CompletableResultCode shutdown() {
+ return CompletableResultCode.ofSuccess();
+ }
+
+ @Override
+ public AggregationTemporality getAggregationTemporality(
+ final InstrumentType instrumentType) {
+ return AggregationTemporality.DELTA;
+ }
+
+ List<List<MetricData>> getExportedBatches() {
+ return exportedBatches;
+ }
+
+ int getSuccessCount() {
+ return successCount;
+ }
+
+ int getFailureCount() {
+ return failureCount;
+ }
+ }
+
+ static class TestMetricGroup extends UnregisteredMetricsGroup implements
LogicalScopeProvider {
+
+ @Override
+ public String getLogicalScope(CharacterFilter characterFilter) {
+ return "test.scope";
+ }
+
+ @Override
+ public String getLogicalScope(CharacterFilter characterFilter, char c)
{
+ return "test.scope";
+ }
+
+ @Override
+ public MetricGroup getWrappedMetricGroup() {
+ return this;
+ }
+ }
+}
diff --git
a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryTestBase.java
b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryTestBase.java
index 461a1683573..accd21a8acd 100644
---
a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryTestBase.java
+++
b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryTestBase.java
@@ -41,7 +41,11 @@ import org.testcontainers.containers.output.BaseConsumer;
import org.testcontainers.containers.output.OutputFrame;
import org.testcontainers.containers.output.Slf4jLogConsumer;
+import javax.annotation.Nonnull;
+
import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.file.Path;
import java.time.Duration;
@@ -117,6 +121,43 @@ public class OpenTelemetryTestBase {
});
}
+ public static void eventuallyConsumeAllJson(
+ final ThrowingConsumer<List<JsonNode>, Exception> jsonConsumer)
throws Exception {
+ eventually(
+ () -> {
+ getOtelContainer()
+ .copyFileFromContainer(
+
getOtelContainer().getOutputLogPath().toString(),
+ inputStream -> {
+ final List<String> rawLines =
readRawLines(inputStream);
+
+ final ObjectMapper mapper = new
ObjectMapper();
+ final List<JsonNode> jsonLines = new
ArrayList<>();
+ for (final String rawLine : rawLines) {
+ jsonLines.add(
+ mapper.readValue(rawLine,
JsonNode.class));
+ }
+ try {
+ jsonConsumer.accept(jsonLines);
+ } catch (Throwable t) {
+ throw new
ConsumeDataLogException(t, rawLines);
+ }
+ return null;
+ });
+ });
+ }
+
+ private static @Nonnull List<String> readRawLines(final InputStream
inputStream)
+ throws IOException {
+ final List<String> rawLines = new ArrayList<>();
+ final BufferedReader input = new BufferedReader(new
InputStreamReader(inputStream));
+ String line;
+ while ((line = input.readLine()) != null) {
+ rawLines.add(line);
+ }
+ return rawLines;
+ }
+
public static void eventually(ThrowingRunnable<Exception> runnable) throws
Exception {
eventually(Deadline.fromNow(TIME_OUT), runnable);
}