This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 8f2ed20689 Flink: Backport: DynamicSink: Report writer records/bytes
send metrics (#14971)
8f2ed20689 is described below
commit 8f2ed2068967e9cadb7ae4e263cfef490c844b8e
Author: aiborodin <[email protected]>
AuthorDate: Tue Jan 6 20:06:56 2026 +1100
Flink: Backport: DynamicSink: Report writer records/bytes send metrics
(#14971)
Backports #14878
---
.../iceberg/flink/sink/dynamic/DynamicWriter.java | 1 +
.../flink/sink/dynamic/DynamicWriterMetrics.java | 31 +++++++++++++++++++---
.../flink/sink/dynamic/TestDynamicWriter.java | 2 +-
.../iceberg/flink/sink/dynamic/DynamicWriter.java | 1 +
.../flink/sink/dynamic/DynamicWriterMetrics.java | 31 +++++++++++++++++++---
.../flink/sink/dynamic/TestDynamicWriter.java | 2 +-
6 files changed, 60 insertions(+), 8 deletions(-)
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
index c2a3032858..9073857974 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
@@ -145,6 +145,7 @@ class DynamicWriter implements
CommittingSinkWriter<DynamicRecordInternal, Dynam
return taskWriterFactory.create();
})
.write(element.rowData());
+ metrics.mainMetricsGroup().getNumRecordsSendCounter().inc();
}
@Override
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriterMetrics.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriterMetrics.java
index 2e1f82df9d..d50a41512a 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriterMetrics.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriterMetrics.java
@@ -18,24 +18,37 @@
*/
package org.apache.iceberg.flink.sink.dynamic;
+import java.util.Arrays;
import java.util.Map;
-import org.apache.flink.metrics.MetricGroup;
+import java.util.function.ToLongFunction;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.flink.sink.IcebergStreamWriterMetrics;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.ScanTaskUtil;
class DynamicWriterMetrics {
private final Map<String, IcebergStreamWriterMetrics> metrics;
- private final MetricGroup mainMetricsGroup;
+ private final SinkWriterMetricGroup mainMetricsGroup;
- DynamicWriterMetrics(MetricGroup mainMetricsGroup) {
+ DynamicWriterMetrics(SinkWriterMetricGroup mainMetricsGroup) {
this.mainMetricsGroup = mainMetricsGroup;
this.metrics = Maps.newHashMap();
}
+ SinkWriterMetricGroup mainMetricsGroup() {
+ return this.mainMetricsGroup;
+ }
+
public void updateFlushResult(String fullTableName, WriteResult result) {
writerMetrics(fullTableName).updateFlushResult(result);
+
+ long bytesOutTotal = sum(result.dataFiles()) + sum(result.deleteFiles());
+ this.mainMetricsGroup.getNumBytesSendCounter().inc(bytesOutTotal);
}
public void flushDuration(String fullTableName, long flushDurationMs) {
@@ -46,4 +59,16 @@ class DynamicWriterMetrics {
return metrics.computeIfAbsent(
fullTableName, tableName -> new
IcebergStreamWriterMetrics(mainMetricsGroup, tableName));
}
+
+ private static long sum(DataFile[] files) {
+ return sum(files, DataFile::fileSizeInBytes);
+ }
+
+ private static long sum(DeleteFile[] files) {
+ return sum(files, ScanTaskUtil::contentSizeInBytes);
+ }
+
+ private static <T extends ContentFile<T>> long sum(T[] files,
ToLongFunction<T> sizeExtractor) {
+ return Arrays.stream(files).mapToLong(sizeExtractor).sum();
+ }
}
diff --git
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
index 689fd20483..d17848225f 100644
---
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
+++
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
@@ -243,7 +243,7 @@ class TestDynamicWriter extends TestFlinkIcebergSinkBase {
1024L,
properties,
100,
- new DynamicWriterMetrics(new UnregisteredMetricsGroup()),
+ new
DynamicWriterMetrics(UnregisteredMetricsGroup.createSinkWriterMetricGroup()),
0,
0);
return dynamicWriter;
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
index c2a3032858..9073857974 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
@@ -145,6 +145,7 @@ class DynamicWriter implements
CommittingSinkWriter<DynamicRecordInternal, Dynam
return taskWriterFactory.create();
})
.write(element.rowData());
+ metrics.mainMetricsGroup().getNumRecordsSendCounter().inc();
}
@Override
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriterMetrics.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriterMetrics.java
index 2e1f82df9d..d50a41512a 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriterMetrics.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriterMetrics.java
@@ -18,24 +18,37 @@
*/
package org.apache.iceberg.flink.sink.dynamic;
+import java.util.Arrays;
import java.util.Map;
-import org.apache.flink.metrics.MetricGroup;
+import java.util.function.ToLongFunction;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.flink.sink.IcebergStreamWriterMetrics;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.ScanTaskUtil;
class DynamicWriterMetrics {
private final Map<String, IcebergStreamWriterMetrics> metrics;
- private final MetricGroup mainMetricsGroup;
+ private final SinkWriterMetricGroup mainMetricsGroup;
- DynamicWriterMetrics(MetricGroup mainMetricsGroup) {
+ DynamicWriterMetrics(SinkWriterMetricGroup mainMetricsGroup) {
this.mainMetricsGroup = mainMetricsGroup;
this.metrics = Maps.newHashMap();
}
+ SinkWriterMetricGroup mainMetricsGroup() {
+ return this.mainMetricsGroup;
+ }
+
public void updateFlushResult(String fullTableName, WriteResult result) {
writerMetrics(fullTableName).updateFlushResult(result);
+
+ long bytesOutTotal = sum(result.dataFiles()) + sum(result.deleteFiles());
+ this.mainMetricsGroup.getNumBytesSendCounter().inc(bytesOutTotal);
}
public void flushDuration(String fullTableName, long flushDurationMs) {
@@ -46,4 +59,16 @@ class DynamicWriterMetrics {
return metrics.computeIfAbsent(
fullTableName, tableName -> new
IcebergStreamWriterMetrics(mainMetricsGroup, tableName));
}
+
+ private static long sum(DataFile[] files) {
+ return sum(files, DataFile::fileSizeInBytes);
+ }
+
+ private static long sum(DeleteFile[] files) {
+ return sum(files, ScanTaskUtil::contentSizeInBytes);
+ }
+
+ private static <T extends ContentFile<T>> long sum(T[] files,
ToLongFunction<T> sizeExtractor) {
+ return Arrays.stream(files).mapToLong(sizeExtractor).sum();
+ }
}
diff --git
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
index 689fd20483..d17848225f 100644
---
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
+++
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
@@ -243,7 +243,7 @@ class TestDynamicWriter extends TestFlinkIcebergSinkBase {
1024L,
properties,
100,
- new DynamicWriterMetrics(new UnregisteredMetricsGroup()),
+ new
DynamicWriterMetrics(UnregisteredMetricsGroup.createSinkWriterMetricGroup()),
0,
0);
return dynamicWriter;