This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 8f2ed20689 Flink: Backport: DynamicSink: Report writer records/bytes 
send metrics (#14971)
8f2ed20689 is described below

commit 8f2ed2068967e9cadb7ae4e263cfef490c844b8e
Author: aiborodin <[email protected]>
AuthorDate: Tue Jan 6 20:06:56 2026 +1100

    Flink: Backport: DynamicSink: Report writer records/bytes send metrics 
(#14971)
    
    Backports #14878
---
 .../iceberg/flink/sink/dynamic/DynamicWriter.java  |  1 +
 .../flink/sink/dynamic/DynamicWriterMetrics.java   | 31 +++++++++++++++++++---
 .../flink/sink/dynamic/TestDynamicWriter.java      |  2 +-
 .../iceberg/flink/sink/dynamic/DynamicWriter.java  |  1 +
 .../flink/sink/dynamic/DynamicWriterMetrics.java   | 31 +++++++++++++++++++---
 .../flink/sink/dynamic/TestDynamicWriter.java      |  2 +-
 6 files changed, 60 insertions(+), 8 deletions(-)

diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
index c2a3032858..9073857974 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
@@ -145,6 +145,7 @@ class DynamicWriter implements 
CommittingSinkWriter<DynamicRecordInternal, Dynam
               return taskWriterFactory.create();
             })
         .write(element.rowData());
+    metrics.mainMetricsGroup().getNumRecordsSendCounter().inc();
   }
 
   @Override
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriterMetrics.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriterMetrics.java
index 2e1f82df9d..d50a41512a 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriterMetrics.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriterMetrics.java
@@ -18,24 +18,37 @@
  */
 package org.apache.iceberg.flink.sink.dynamic;
 
+import java.util.Arrays;
 import java.util.Map;
-import org.apache.flink.metrics.MetricGroup;
+import java.util.function.ToLongFunction;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.flink.sink.IcebergStreamWriterMetrics;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.ScanTaskUtil;
 
 class DynamicWriterMetrics {
 
   private final Map<String, IcebergStreamWriterMetrics> metrics;
-  private final MetricGroup mainMetricsGroup;
+  private final SinkWriterMetricGroup mainMetricsGroup;
 
-  DynamicWriterMetrics(MetricGroup mainMetricsGroup) {
+  DynamicWriterMetrics(SinkWriterMetricGroup mainMetricsGroup) {
     this.mainMetricsGroup = mainMetricsGroup;
     this.metrics = Maps.newHashMap();
   }
 
+  SinkWriterMetricGroup mainMetricsGroup() {
+    return this.mainMetricsGroup;
+  }
+
   public void updateFlushResult(String fullTableName, WriteResult result) {
     writerMetrics(fullTableName).updateFlushResult(result);
+
+    long bytesOutTotal = sum(result.dataFiles()) + sum(result.deleteFiles());
+    this.mainMetricsGroup.getNumBytesSendCounter().inc(bytesOutTotal);
   }
 
   public void flushDuration(String fullTableName, long flushDurationMs) {
@@ -46,4 +59,16 @@ class DynamicWriterMetrics {
     return metrics.computeIfAbsent(
         fullTableName, tableName -> new 
IcebergStreamWriterMetrics(mainMetricsGroup, tableName));
   }
+
+  private static long sum(DataFile[] files) {
+    return sum(files, DataFile::fileSizeInBytes);
+  }
+
+  private static long sum(DeleteFile[] files) {
+    return sum(files, ScanTaskUtil::contentSizeInBytes);
+  }
+
+  private static <T extends ContentFile<T>> long sum(T[] files, 
ToLongFunction<T> sizeExtractor) {
+    return Arrays.stream(files).mapToLong(sizeExtractor).sum();
+  }
 }
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
index 689fd20483..d17848225f 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
@@ -243,7 +243,7 @@ class TestDynamicWriter extends TestFlinkIcebergSinkBase {
             1024L,
             properties,
             100,
-            new DynamicWriterMetrics(new UnregisteredMetricsGroup()),
+            new 
DynamicWriterMetrics(UnregisteredMetricsGroup.createSinkWriterMetricGroup()),
             0,
             0);
     return dynamicWriter;
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
index c2a3032858..9073857974 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
@@ -145,6 +145,7 @@ class DynamicWriter implements 
CommittingSinkWriter<DynamicRecordInternal, Dynam
               return taskWriterFactory.create();
             })
         .write(element.rowData());
+    metrics.mainMetricsGroup().getNumRecordsSendCounter().inc();
   }
 
   @Override
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriterMetrics.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriterMetrics.java
index 2e1f82df9d..d50a41512a 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriterMetrics.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriterMetrics.java
@@ -18,24 +18,37 @@
  */
 package org.apache.iceberg.flink.sink.dynamic;
 
+import java.util.Arrays;
 import java.util.Map;
-import org.apache.flink.metrics.MetricGroup;
+import java.util.function.ToLongFunction;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.flink.sink.IcebergStreamWriterMetrics;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.ScanTaskUtil;
 
 class DynamicWriterMetrics {
 
   private final Map<String, IcebergStreamWriterMetrics> metrics;
-  private final MetricGroup mainMetricsGroup;
+  private final SinkWriterMetricGroup mainMetricsGroup;
 
-  DynamicWriterMetrics(MetricGroup mainMetricsGroup) {
+  DynamicWriterMetrics(SinkWriterMetricGroup mainMetricsGroup) {
     this.mainMetricsGroup = mainMetricsGroup;
     this.metrics = Maps.newHashMap();
   }
 
+  SinkWriterMetricGroup mainMetricsGroup() {
+    return this.mainMetricsGroup;
+  }
+
   public void updateFlushResult(String fullTableName, WriteResult result) {
     writerMetrics(fullTableName).updateFlushResult(result);
+
+    long bytesOutTotal = sum(result.dataFiles()) + sum(result.deleteFiles());
+    this.mainMetricsGroup.getNumBytesSendCounter().inc(bytesOutTotal);
   }
 
   public void flushDuration(String fullTableName, long flushDurationMs) {
@@ -46,4 +59,16 @@ class DynamicWriterMetrics {
     return metrics.computeIfAbsent(
         fullTableName, tableName -> new 
IcebergStreamWriterMetrics(mainMetricsGroup, tableName));
   }
+
+  private static long sum(DataFile[] files) {
+    return sum(files, DataFile::fileSizeInBytes);
+  }
+
+  private static long sum(DeleteFile[] files) {
+    return sum(files, ScanTaskUtil::contentSizeInBytes);
+  }
+
+  private static <T extends ContentFile<T>> long sum(T[] files, 
ToLongFunction<T> sizeExtractor) {
+    return Arrays.stream(files).mapToLong(sizeExtractor).sum();
+  }
 }
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
index 689fd20483..d17848225f 100644
--- 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
@@ -243,7 +243,7 @@ class TestDynamicWriter extends TestFlinkIcebergSinkBase {
             1024L,
             properties,
             100,
-            new DynamicWriterMetrics(new UnregisteredMetricsGroup()),
+            new 
DynamicWriterMetrics(UnregisteredMetricsGroup.createSinkWriterMetricGroup()),
             0,
             0);
     return dynamicWriter;

Reply via email to