This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b6cb13a52f1 [FLINK-39126][metrics] Support exporting metrics through 
grpc in batches
b6cb13a52f1 is described below

commit b6cb13a52f17e33b9e47c532c24025b52eca4632
Author: Aleksandr Iushmanov <[email protected]>
AuthorDate: Thu Feb 26 18:33:08 2026 +0000

    [FLINK-39126][metrics] Support exporting metrics through grpc in batches
    
    Add support for batched metric export in the OpenTelemetry metric reporter. 
When exporting a large number of metrics, sending them all in a single request 
can cause timeouts or memory pressure on both the Flink reporter and the OTel 
collector. This change introduces a configurable batch.size option that 
partitions metrics into smaller batches before exporting, with async completion 
tracking and per-batch failure handling.
---
 .../content.zh/docs/deployment/metric_reporters.md |  30 +++
 docs/content/docs/deployment/metric_reporters.md   |   9 +
 .../open_telemetry_reporter_configuration.html     |  12 +
 flink-metrics/flink-metrics-otel/pom.xml           |   6 +
 .../metrics/otel/OpenTelemetryMetricReporter.java  | 184 +++++++++++---
 .../metrics/otel/OpenTelemetryReporterOptions.java |  23 ++
 .../otel/OpenTelemetryMetricReporterITCase.java    |  55 ++++
 .../otel/OpenTelemetryMetricReporterTest.java      | 276 +++++++++++++++++++++
 .../flink/metrics/otel/OpenTelemetryTestBase.java  |  41 +++
 9 files changed, 607 insertions(+), 29 deletions(-)

diff --git a/docs/content.zh/docs/deployment/metric_reporters.md 
b/docs/content.zh/docs/deployment/metric_reporters.md
index 9c02675b872..bd8b1b5577e 100644
--- a/docs/content.zh/docs/deployment/metric_reporters.md
+++ b/docs/content.zh/docs/deployment/metric_reporters.md
@@ -305,6 +305,36 @@ metrics.reporter.dghttp.interval: 60 SECONDS
 metrics.reporter.dghttp.useLogicalIdentifier: true
 ```
 
+### OpenTelemetry
+#### (org.apache.flink.metrics.otel.OpenTelemetryMetricReporterFactory)
+
+Parameters:
+
+{{< include_reporter_config 
"layouts/shortcodes/generated/open_telemetry_reporter_configuration.html" >}}
+
+Example configurations:
+
+```yaml
+metrics.reporter.otel.factory.class: 
org.apache.flink.metrics.otel.OpenTelemetryMetricReporterFactory
+metrics.reporter.otel.exporter.endpoint: http://127.0.0.1:1337
+metrics.reporter.otel.exporter.protocol: gRPC
+```
+
+```yaml
+metrics.reporter.otel.factory.class: 
org.apache.flink.metrics.otel.OpenTelemetryMetricReporterFactory
+metrics.reporter.otel.exporter.endpoint: http://127.0.0.1:9090
+metrics.reporter.otel.exporter.protocol: HTTP
+```
+
+```yaml
+# With batching enabled (1500 metrics per export request)
+metrics.reporter.otel.factory.class: 
org.apache.flink.metrics.otel.OpenTelemetryMetricReporterFactory
+metrics.reporter.otel.exporter.endpoint: http://127.0.0.1:1337
+metrics.reporter.otel.exporter.protocol: gRPC
+metrics.reporter.otel.batch.size: 1500
+metrics.reporter.otel.export-completion-timeout-millis: 60000
+```
+
 <a name="slf4j"></a>
 
 ### Slf4j
diff --git a/docs/content/docs/deployment/metric_reporters.md 
b/docs/content/docs/deployment/metric_reporters.md
index d0b7246f274..3bc5d0d336f 100644
--- a/docs/content/docs/deployment/metric_reporters.md
+++ b/docs/content/docs/deployment/metric_reporters.md
@@ -310,6 +310,15 @@ metrics.reporter.otel.exporter.endpoint: 
http://127.0.0.1:9090
 metrics.reporter.otel.exporter.protocol: HTTP
 ```
 
+```yaml
+# With batching enabled (1500 metrics per export request)
+metrics.reporter.otel.factory.class: 
org.apache.flink.metrics.otel.OpenTelemetryMetricReporterFactory
+metrics.reporter.otel.exporter.endpoint: http://127.0.0.1:1337
+metrics.reporter.otel.exporter.protocol: gRPC
+metrics.reporter.otel.batch.size: 1500
+metrics.reporter.otel.export-completion-timeout-millis: 60000
+```
+
 ### Slf4j
 #### (org.apache.flink.metrics.slf4j.Slf4jReporter)
 
diff --git 
a/docs/layouts/shortcodes/generated/open_telemetry_reporter_configuration.html 
b/docs/layouts/shortcodes/generated/open_telemetry_reporter_configuration.html
index 1c1193f19bd..df9032e4e79 100644
--- 
a/docs/layouts/shortcodes/generated/open_telemetry_reporter_configuration.html
+++ 
b/docs/layouts/shortcodes/generated/open_telemetry_reporter_configuration.html
@@ -8,6 +8,18 @@
         </tr>
     </thead>
     <tbody>
+        <tr>
+            <td><h5>metrics.reporter.OpenTelemetry.batch.size</h5></td>
+            <td style="word-wrap: break-word;">0</td>
+            <td>Integer</td>
+            <td>Number of metrics per export batch. Values &lt;= 0 disable 
batching and all metrics are exported in a single request.</td>
+        </tr>
+        <tr>
+            
<td><h5>metrics.reporter.OpenTelemetry.export-completion-timeout-millis</h5></td>
+            <td style="word-wrap: break-word;">300000</td>
+            <td>Long</td>
+            <td>Timeout in milliseconds for waiting on async export 
completion.</td>
+        </tr>
         <tr>
             
<td><h5>metrics.reporter.OpenTelemetry.exporter.compression</h5></td>
             <td style="word-wrap: break-word;">"none"</td>
diff --git a/flink-metrics/flink-metrics-otel/pom.xml 
b/flink-metrics/flink-metrics-otel/pom.xml
index 86ec9b3b63c..0f3e10e4895 100644
--- a/flink-metrics/flink-metrics-otel/pom.xml
+++ b/flink-metrics/flink-metrics-otel/pom.xml
@@ -51,6 +51,12 @@ under the License.
                        <scope>provided</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-shaded-guava</artifactId>
+                       <optional>${flink.markBundledAsOptional}</optional>
+               </dependency>
+
                <!-- OpenTelemetry -->
 
                <dependency>
diff --git 
a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java
 
b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java
index e0e03db71ce..33215ab3678 100644
--- 
a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java
+++ 
b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java
@@ -32,6 +32,8 @@ import org.apache.flink.metrics.reporter.AbstractReporter;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.reporter.Scheduled;
 
+import org.apache.flink.shaded.guava33.com.google.common.collect.Lists;
+
 import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter;
 import 
io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporterBuilder;
 import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
@@ -44,6 +46,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
 
 import java.time.Clock;
 import java.time.Instant;
@@ -55,6 +58,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -72,16 +77,34 @@ public class OpenTelemetryMetricReporter extends 
OpenTelemetryReporterBase
 
     private static final String LOGICAL_SCOPE_PREFIX = "flink.";
 
+    @GuardedBy("this")
     private final Map<Gauge<?>, MetricMetadata> gauges = new HashMap<>();
+
+    @GuardedBy("this")
     private final Map<Counter, MetricMetadata> counters = new HashMap<>();
+
+    @GuardedBy("this")
     private final Map<Histogram, MetricMetadata> histograms = new HashMap<>();
+
+    @GuardedBy("this")
     private final Map<Meter, MetricMetadata> meters = new HashMap<>();
+
     private final Clock clock;
 
     // In order to produce deltas, we keep a snapshot of the previous counter 
collection.
+    @GuardedBy("this")
     private Map<Metric, Long> lastValueSnapshots = Collections.emptyMap();
+
+    @GuardedBy("this")
     private long lastCollectTimeNanos = 0;
 
+    @GuardedBy("this")
+    private int batchSize = 
OpenTelemetryReporterOptions.BATCH_SIZE.defaultValue();
+
+    @GuardedBy("this")
+    private long exportCompletionTimeoutMillis =
+            
OpenTelemetryReporterOptions.EXPORT_COMPLETION_TIMEOUT_MILLIS.defaultValue();
+
     public OpenTelemetryMetricReporter() {
         this(Clock.systemUTC());
     }
@@ -92,10 +115,18 @@ public class OpenTelemetryMetricReporter extends 
OpenTelemetryReporterBase
     }
 
     @Override
-    public void open(MetricConfig metricConfig) {
+    public void open(final MetricConfig metricConfig) {
         LOG.info("Starting OpenTelemetryMetricReporter");
         super.open(metricConfig);
 
+        synchronized (this) {
+            exporter = createExporter(metricConfig);
+            configureBatching(metricConfig);
+        }
+    }
+
+    /** Creates the {@link MetricExporter} based on the configured protocol. */
+    protected MetricExporter createExporter(final MetricConfig metricConfig) {
         final String protocol =
                 Optional.ofNullable(
                                 metricConfig.getProperty(
@@ -104,29 +135,27 @@ public class OpenTelemetryMetricReporter extends 
OpenTelemetryReporterBase
 
         switch (protocol.toLowerCase()) {
             case "http":
-                OtlpHttpMetricExporterBuilder httpBuilder = 
OtlpHttpMetricExporter.builder();
+                final OtlpHttpMetricExporterBuilder httpBuilder = 
OtlpHttpMetricExporter.builder();
                 tryConfigureEndpoint(metricConfig, httpBuilder::setEndpoint);
                 tryConfigureTimeout(metricConfig, httpBuilder::setTimeout);
                 tryConfigureCompression(metricConfig, 
httpBuilder::setCompression);
-                exporter = httpBuilder.build();
-                break;
+                return httpBuilder.build();
             default:
                 LOG.warn(
                         "Unknown protocol '{}' for 
OpenTelemetryMetricReporter, defaulting to gRPC",
                         protocol);
             // Fall through to the "gRPC" case
             case "grpc":
-                OtlpGrpcMetricExporterBuilder grpcBuilder = 
OtlpGrpcMetricExporter.builder();
+                final OtlpGrpcMetricExporterBuilder grpcBuilder = 
OtlpGrpcMetricExporter.builder();
                 tryConfigureEndpoint(metricConfig, grpcBuilder::setEndpoint);
                 tryConfigureTimeout(metricConfig, grpcBuilder::setTimeout);
                 tryConfigureCompression(metricConfig, 
grpcBuilder::setCompression);
-                exporter = grpcBuilder.build();
-                break;
+                return grpcBuilder.build();
         }
     }
 
     @Override
-    public void close() {
+    public synchronized void close() {
         if (exporter != null) {
             exporter.flush();
             waitForLastReportToComplete();
@@ -269,39 +298,136 @@ public class OpenTelemetryMetricReporter extends 
OpenTelemetryReporterBase
         return map;
     }
 
-    private @Nullable CompletableResultCode lastResult;
+    private volatile @Nullable CompletableFuture<Void> lastReportFuture;
 
     @Override
     public void report() {
-        Collection<MetricData> metricData = collectAllMetrics();
+        final List<MetricData> metricData = List.copyOf(collectAllMetrics());
+        final int totalMetrics = metricData.size();
+        if (totalMetrics == 0) {
+            return;
+        }
+
+        // Cap the batch size at the number of metrics in order to avoid potentially
+        // large memory allocations on the `.partition()` call. Reading batchSize here
+        // doesn't require additional synchronization, as it comes after
+        // collectAllMetrics, which is synchronized.
+        final int localBatchSize = Math.min(batchSize, totalMetrics);
+
+        final Iterable<List<MetricData>> batches = Lists.partition(metricData, 
localBatchSize);
+        final int totalBatches = (totalMetrics + localBatchSize - 1) / 
localBatchSize;
+        final List<CompletableResultCode> results = new 
ArrayList<>(totalBatches);
+        for (final List<MetricData> batch : batches) {
+            results.add(exportBatch(batch));
+        }
+
+        lastReportFuture =
+                CompletableFuture.runAsync(
+                                () ->
+                                        reportWhenExportIsComplete(
+                                                totalMetrics, results, 
localBatchSize))
+                        .whenComplete(
+                                (ignored, throwable) -> {
+                                    if (throwable != null) {
+                                        LOG.warn("Error while reporting 
metrics", throwable);
+                                    }
+                                });
+    }
+
+    private CompletableResultCode exportBatch(final Collection<MetricData> 
batch) {
         try {
-            lastResult = exporter.export(metricData);
-            lastResult.whenComplete(
-                    () -> {
-                        if (lastResult.isSuccess()) {
-                            LOG.debug(
-                                    "Exported {} metrics using {}",
-                                    metricData.size(),
-                                    exporter.getClass().getName());
-                        } else {
-                            LOG.warn(
-                                    "Failed to export {} metrics using {}",
-                                    metricData.size(),
-                                    exporter.getClass().getName());
-                        }
-                    });
+            // The exporter is used without a synchronized block because `exportBatch`
+            // happens after `collectAllMetrics`, which adds a memory barrier.
+            return exporter.export(batch);
         } catch (Exception e) {
             LOG.error(
                     "Failed to call export for {} metrics using {}",
-                    metricData.size(),
-                    exporter.getClass().getName());
+                    batch.size(),
+                    exporter.getClass().getName(),
+                    e);
+            return CompletableResultCode.ofFailure();
+        }
+    }
+
+    private void reportWhenExportIsComplete(
+            final int totalMetrics,
+            final List<CompletableResultCode> results,
+            final int localBatchSize) {
+        final CountDownLatch completedResults = new 
CountDownLatch(results.size());
+        results.forEach(result -> 
result.whenComplete(completedResults::countDown));
+
+        try {
+            final boolean isCompleteReport =
+                    completedResults.await(exportCompletionTimeoutMillis, 
TimeUnit.MILLISECONDS);
+
+            final int successfulBatches =
+                    (int) 
results.stream().filter(CompletableResultCode::isSuccess).count();
+            final int failedBatches = results.size() - successfulBatches;
+
+            logFinalStatistics(
+                    localBatchSize,
+                    successfulBatches,
+                    failedBatches,
+                    totalMetrics,
+                    isCompleteReport);
+        } catch (final InterruptedException e) {
+            LOG.warn(
+                    "Thread waiting for metrics export results have been 
interrupted. "
+                            + "Exports results are unknown",
+                    e);
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    private void logFinalStatistics(
+            int localBatchSize,
+            int successfulBatches,
+            int failedBatches,
+            int totalMetrics,
+            boolean isCompleteReport) {
+        if (failedBatches > 0 || !isCompleteReport) {
+            LOG.warn(
+                    "Metric export completed with issues: totalMetrics={}, 
batchSize={}, "
+                            + "successfulBatches={}, failedBatches={}, 
completedInTime={}",
+                    totalMetrics,
+                    localBatchSize,
+                    successfulBatches,
+                    failedBatches,
+                    isCompleteReport);
+        } else {
+            LOG.debug(
+                    "Metric export completed successfully: totalMetrics={}, 
batchSize={}, "
+                            + "successfulBatches={}",
+                    totalMetrics,
+                    localBatchSize,
+                    successfulBatches);
         }
     }
 
+    private void configureBatching(MetricConfig metricConfig) {
+        int configuredBatchSize =
+                metricConfig.getInteger(
+                        OpenTelemetryReporterOptions.BATCH_SIZE.key(),
+                        
OpenTelemetryReporterOptions.BATCH_SIZE.defaultValue());
+        batchSize = configuredBatchSize <= 0 ? Integer.MAX_VALUE : 
configuredBatchSize;
+        exportCompletionTimeoutMillis =
+                metricConfig.getLong(
+                        
OpenTelemetryReporterOptions.EXPORT_COMPLETION_TIMEOUT_MILLIS.key(),
+                        
OpenTelemetryReporterOptions.EXPORT_COMPLETION_TIMEOUT_MILLIS
+                                .defaultValue());
+        LOG.info(
+                "Configured batching: batchSize={}, 
exportCompletionTimeoutMillis={}",
+                batchSize,
+                exportCompletionTimeoutMillis);
+    }
+
     @VisibleForTesting
     void waitForLastReportToComplete() {
-        if (lastResult != null) {
-            lastResult.join(1, TimeUnit.MINUTES);
+        if (lastReportFuture != null) {
+            try {
+                lastReportFuture.get(1, TimeUnit.MINUTES);
+            } catch (Exception e) {
+                LOG.warn("Waiting for last report to complete failed", e);
+            }
         }
     }
 }
diff --git 
a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryReporterOptions.java
 
b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryReporterOptions.java
index 94362ec8ec7..f810750b985 100644
--- 
a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryReporterOptions.java
+++ 
b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryReporterOptions.java
@@ -109,6 +109,29 @@ public final class OpenTelemetryReporterOptions {
                                     .text("service.version passed to 
OpenTelemetry Reporters.")
                                     .build());
 
+    @PublicEvolving
+    public static final ConfigOption<Integer> BATCH_SIZE =
+            ConfigOptions.key("batch.size")
+                    .intType()
+                    .defaultValue(0)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Number of metrics per export 
batch. "
+                                                    + "Values <= 0 disable 
batching and all metrics are exported in a single request.")
+                                    .build());
+
+    @PublicEvolving
+    public static final ConfigOption<Long> EXPORT_COMPLETION_TIMEOUT_MILLIS =
+            ConfigOptions.key("export-completion-timeout-millis")
+                    .longType()
+                    .defaultValue(300_000L)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Timeout in milliseconds for 
waiting on async export completion.")
+                                    .build());
+
     @Internal
     public static void tryConfigureTimeout(MetricConfig metricConfig, 
Consumer<Duration> builder) {
         final String timeoutConfKey = EXPORTER_TIMEOUT.key();
diff --git 
a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporterITCase.java
 
b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporterITCase.java
index 96b935fec36..e7a96940f95 100644
--- 
a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporterITCase.java
+++ 
b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporterITCase.java
@@ -266,6 +266,61 @@ public class OpenTelemetryMetricReporterITCase extends 
OpenTelemetryTestBase {
                 });
     }
 
+    @Test
+    public void testReportWithBatching() throws Exception {
+        MetricConfig metricConfig = createMetricConfig();
+        
metricConfig.setProperty(OpenTelemetryReporterOptions.BATCH_SIZE.key(), 
String.valueOf(2));
+        MetricGroup group = new TestMetricGroup();
+
+        reporter.open(metricConfig);
+
+        SimpleCounter counter1 = new SimpleCounter();
+        SimpleCounter counter2 = new SimpleCounter();
+        SimpleCounter counter3 = new SimpleCounter();
+        reporter.notifyOfAddedMetric(counter1, "batch.counter1", group);
+        reporter.notifyOfAddedMetric(counter2, "batch.counter2", group);
+        reporter.notifyOfAddedMetric(counter3, "batch.counter3", group);
+
+        reporter.report();
+        reporter.waitForLastReportToComplete();
+        reporter.close();
+
+        // With batching, each batch produces a separate output line in the 
collector.
+        // Verify that we get exactly 2 batches (sizes 2+1) containing all 3 
metrics.
+        eventuallyConsumeAllJson(
+                jsonLines -> {
+                    final List<List<String>> batches =
+                            jsonLines.stream()
+                                    
.map(OpenTelemetryTestBase::extractMetricNames)
+                                    .filter(names -> !names.isEmpty())
+                                    .filter(
+                                            names ->
+                                                    names.stream()
+                                                            .anyMatch(
+                                                                    n ->
+                                                                            
n.contains(
+                                                                               
     "batch.counter")))
+                                    .collect(Collectors.toList());
+
+                    assertThat(batches).hasSize(2);
+                    
assertThat(batches.stream().mapToInt(List::size).sum()).isEqualTo(3);
+                    assertThat(
+                                    batches.stream()
+                                            .map(List::size)
+                                            .sorted()
+                                            .collect(Collectors.toList()))
+                            .containsExactly(1, 2);
+
+                    final List<String> allNames =
+                            
batches.stream().flatMap(List::stream).collect(Collectors.toList());
+                    assertThat(allNames)
+                            .containsExactlyInAnyOrder(
+                                    "flink.logical.scope.batch.counter1",
+                                    "flink.logical.scope.batch.counter2",
+                                    "flink.logical.scope.batch.counter3");
+                });
+    }
+
     static class TestMetricGroup extends UnregisteredMetricsGroup implements 
LogicalScopeProvider {
 
         @Override
diff --git 
a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporterTest.java
 
b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporterTest.java
new file mode 100644
index 00000000000..caaff990100
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporterTest.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.otel;
+
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.LogicalScopeProvider;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
+import io.opentelemetry.sdk.common.CompletableResultCode;
+import io.opentelemetry.sdk.metrics.InstrumentType;
+import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.export.MetricExporter;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link OpenTelemetryMetricReporter} batching logic. */
+class OpenTelemetryMetricReporterTest {
+
+    private TestingOpenTelemetryMetricReporter reporter;
+    private TestExporter testExporter;
+
+    @BeforeEach
+    void setUp() {
+        testExporter = new TestExporter();
+        reporter = new TestingOpenTelemetryMetricReporter(testExporter);
+    }
+
+    @AfterEach
+    void tearDown() {
+        reporter.close();
+    }
+
+    @Test
+    void testReportingWithBatchingEnabled() {
+        final MetricConfig config = new MetricConfig();
+        config.put(OpenTelemetryReporterOptions.BATCH_SIZE.key(), 2);
+        reporter.open(config);
+
+        final MetricGroup group = new TestMetricGroup();
+        reporter.notifyOfAddedMetric(new SimpleCounter(), "counter1", group);
+        reporter.notifyOfAddedMetric(new SimpleCounter(), "counter2", group);
+        reporter.notifyOfAddedMetric(new SimpleCounter(), "counter3", group);
+
+        reporter.report();
+        reporter.waitForLastReportToComplete();
+
+        // 3 metrics with batch size 2 -> 2 batches (2 + 1)
+        assertThat(testExporter.getExportedBatches()).hasSize(2);
+        assertThat(testExporter.getExportedBatches().get(0)).hasSize(2);
+        assertThat(testExporter.getExportedBatches().get(1)).hasSize(1);
+    }
+
+    @Test
+    void testReportingWithBatchingDisabled() {
+        final MetricConfig config = new MetricConfig();
+        // Default batch.size=0 disables batching
+        reporter.open(config);
+
+        final MetricGroup group = new TestMetricGroup();
+        reporter.notifyOfAddedMetric(new SimpleCounter(), "counter1", group);
+        reporter.notifyOfAddedMetric(new SimpleCounter(), "counter2", group);
+        reporter.notifyOfAddedMetric(new SimpleCounter(), "counter3", group);
+
+        reporter.report();
+        reporter.waitForLastReportToComplete();
+
+        // All metrics in a single export call
+        assertThat(testExporter.getExportedBatches()).hasSize(1);
+        assertThat(testExporter.getExportedBatches().get(0)).hasSize(3);
+    }
+
+    @Test
+    void testReportingWithNegativeBatchSize() {
+        final MetricConfig config = new MetricConfig();
+        config.put(OpenTelemetryReporterOptions.BATCH_SIZE.key(), -1);
+        reporter.open(config);
+
+        final MetricGroup group = new TestMetricGroup();
+        reporter.notifyOfAddedMetric(new SimpleCounter(), "counter1", group);
+        reporter.notifyOfAddedMetric(new SimpleCounter(), "counter2", group);
+
+        reporter.report();
+        reporter.waitForLastReportToComplete();
+
+        // Negative batch size disables batching -> single export
+        assertThat(testExporter.getExportedBatches()).hasSize(1);
+        assertThat(testExporter.getExportedBatches().get(0)).hasSize(2);
+    }
+
+    @Test
+    void testReportEmptyMetrics() {
+        final MetricConfig config = new MetricConfig();
+        reporter.open(config);
+
+        reporter.report();
+
+        // No metrics registered -> no export calls
+        assertThat(testExporter.getExportedBatches()).isEmpty();
+    }
+
+    @Test
+    void testReportingWithPartialBatchFailure() {
+        final PartialFailureExporter failureExporter = new 
PartialFailureExporter(1);
+        final TestingOpenTelemetryMetricReporter failureReporter =
+                new TestingOpenTelemetryMetricReporter(failureExporter);
+
+        final MetricConfig config = new MetricConfig();
+        config.put(OpenTelemetryReporterOptions.BATCH_SIZE.key(), 2);
+        failureReporter.open(config);
+
+        final MetricGroup group = new TestMetricGroup();
+        failureReporter.notifyOfAddedMetric(new SimpleCounter(), "counter1", 
group);
+        failureReporter.notifyOfAddedMetric(new SimpleCounter(), "counter2", 
group);
+        failureReporter.notifyOfAddedMetric(new SimpleCounter(), "counter3", 
group);
+
+        failureReporter.report();
+        failureReporter.waitForLastReportToComplete();
+        failureReporter.close();
+
+        // 2 batches attempted, first succeeds, second fails
+        assertThat(failureExporter.getExportedBatches()).hasSize(2);
+        assertThat(failureExporter.getExportedBatches().get(0)).hasSize(2);
+        assertThat(failureExporter.getExportedBatches().get(1)).hasSize(1);
+        assertThat(failureExporter.getSuccessCount()).isEqualTo(1);
+        assertThat(failureExporter.getFailureCount()).isEqualTo(1);
+    }
+
+    private static final class TestingOpenTelemetryMetricReporter
+            extends OpenTelemetryMetricReporter {
+
+        private final MetricExporter testExporter;
+
+        TestingOpenTelemetryMetricReporter(MetricExporter testExporter) {
+            super(Clock.fixed(Instant.ofEpochMilli(1234), 
Clock.systemUTC().getZone()));
+            this.testExporter = testExporter;
+        }
+
+        @Override
+        protected MetricExporter createExporter(MetricConfig metricConfig) {
+            return testExporter;
+        }
+    }
+
+    /** A test {@link MetricExporter} that records all export calls. */
+    private static final class TestExporter implements MetricExporter {
+
+        private final List<List<MetricData>> exportedBatches =
+                Collections.synchronizedList(new ArrayList<>());
+
+        @Override
+        public CompletableResultCode export(Collection<MetricData> metrics) {
+            exportedBatches.add(new ArrayList<>(metrics));
+            return CompletableResultCode.ofSuccess();
+        }
+
+        @Override
+        public CompletableResultCode flush() {
+            return CompletableResultCode.ofSuccess();
+        }
+
+        @Override
+        public CompletableResultCode shutdown() {
+            return CompletableResultCode.ofSuccess();
+        }
+
+        @Override
+        public AggregationTemporality getAggregationTemporality(InstrumentType 
instrumentType) {
+            return AggregationTemporality.DELTA;
+        }
+
+        List<List<MetricData>> getExportedBatches() {
+            return exportedBatches;
+        }
+    }
+
+    /** A test {@link MetricExporter} that fails after a specified number of 
successful exports. */
+    private static final class PartialFailureExporter implements 
MetricExporter {
+
+        private final List<List<MetricData>> exportedBatches =
+                Collections.synchronizedList(new ArrayList<>());
+        private final int failAfter;
+        private int successCount;
+        private int failureCount;
+
+        PartialFailureExporter(final int failAfter) {
+            this.failAfter = failAfter;
+        }
+
+        @Override
+        public CompletableResultCode export(final Collection<MetricData> 
metrics) {
+            exportedBatches.add(new ArrayList<>(metrics));
+            if (exportedBatches.size() > failAfter) {
+                failureCount++;
+                return CompletableResultCode.ofFailure();
+            }
+            successCount++;
+            return CompletableResultCode.ofSuccess();
+        }
+
+        @Override
+        public CompletableResultCode flush() {
+            return CompletableResultCode.ofSuccess();
+        }
+
+        @Override
+        public CompletableResultCode shutdown() {
+            return CompletableResultCode.ofSuccess();
+        }
+
+        @Override
+        public AggregationTemporality getAggregationTemporality(
+                final InstrumentType instrumentType) {
+            return AggregationTemporality.DELTA;
+        }
+
+        List<List<MetricData>> getExportedBatches() {
+            return exportedBatches;
+        }
+
+        int getSuccessCount() {
+            return successCount;
+        }
+
+        int getFailureCount() {
+            return failureCount;
+        }
+    }
+
+    static class TestMetricGroup extends UnregisteredMetricsGroup implements 
LogicalScopeProvider {
+
+        @Override
+        public String getLogicalScope(CharacterFilter characterFilter) {
+            return "test.scope";
+        }
+
+        @Override
+        public String getLogicalScope(CharacterFilter characterFilter, char c) 
{
+            return "test.scope";
+        }
+
+        @Override
+        public MetricGroup getWrappedMetricGroup() {
+            return this;
+        }
+    }
+}
diff --git 
a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryTestBase.java
 
b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryTestBase.java
index 461a1683573..accd21a8acd 100644
--- 
a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryTestBase.java
+++ 
b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryTestBase.java
@@ -41,7 +41,11 @@ import org.testcontainers.containers.output.BaseConsumer;
 import org.testcontainers.containers.output.OutputFrame;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 
+import javax.annotation.Nonnull;
+
 import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.nio.file.Path;
 import java.time.Duration;
@@ -117,6 +121,43 @@ public class OpenTelemetryTestBase {
                 });
     }
 
+    public static void eventuallyConsumeAllJson(
+            final ThrowingConsumer<List<JsonNode>, Exception> jsonConsumer) throws Exception {
+        eventually(
+                () -> {
+                    getOtelContainer()
+                            .copyFileFromContainer(
+                                    getOtelContainer().getOutputLogPath().toString(),
+                                    inputStream -> {
+                                        final List<String> rawLines = readRawLines(inputStream);
+
+                                        final ObjectMapper mapper = new ObjectMapper();
+                                        final List<JsonNode> jsonLines = new ArrayList<>();
+                                        for (final String rawLine : rawLines) {
+                                            jsonLines.add(
+                                                    mapper.readValue(rawLine, JsonNode.class));
+                                        }
+                                        try {
+                                            jsonConsumer.accept(jsonLines);
+                                        } catch (Throwable t) {
+                                            throw new ConsumeDataLogException(t, rawLines);
+                                        }
+                                        return null;
+                                    });
+                });
+    }
+
+    private static @Nonnull List<String> readRawLines(final InputStream inputStream)
+            throws IOException {
+        final List<String> rawLines = new ArrayList<>();
+        final BufferedReader input = new BufferedReader(new InputStreamReader(inputStream));
+        String line;
+        while ((line = input.readLine()) != null) {
+            rawLines.add(line);
+        }
+        return rawLines;
+    }
+
     public static void eventually(ThrowingRunnable<Exception> runnable) throws Exception {
         eventually(Deadline.fromNow(TIME_OUT), runnable);
     }


Reply via email to