This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 67e1975ac6 Flink: Fix writeDataFiles with hardcoded formatVersion 
(#14570)
67e1975ac6 is described below

commit 67e1975ac65e2690fc9599d0f5a91c2f0563e337
Author: GuoYu <[email protected]>
AuthorDate: Wed Nov 12 21:10:58 2025 +0800

    Flink: Fix writeDataFiles with hardcoded formatVersion (#14570)
---
 .../org/apache/iceberg/flink/sink/FlinkManifestUtil.java     | 12 ++++++++----
 .../org/apache/iceberg/flink/sink/TestFlinkManifest.java     |  3 ++-
 .../org/apache/iceberg/flink/sink/FlinkManifestUtil.java     | 12 ++++++++----
 .../org/apache/iceberg/flink/sink/TestFlinkManifest.java     |  3 ++-
 .../org/apache/iceberg/flink/sink/FlinkManifestUtil.java     | 12 ++++++++----
 .../org/apache/iceberg/flink/sink/TestFlinkManifest.java     |  3 ++-
 6 files changed, 30 insertions(+), 15 deletions(-)

diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
index 1736d91b1b..c7e79ed8db 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
@@ -41,15 +41,15 @@ import org.slf4j.LoggerFactory;
 public class FlinkManifestUtil {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkManifestUtil.class);
-  private static final int FORMAT_V2 = 2;
   private static final Long DUMMY_SNAPSHOT_ID = 0L;
 
   private FlinkManifestUtil() {}
 
   static ManifestFile writeDataFiles(
-      OutputFile outputFile, PartitionSpec spec, List<DataFile> dataFiles) 
throws IOException {
+      OutputFile outputFile, PartitionSpec spec, List<DataFile> dataFiles, int 
formatVersion)
+      throws IOException {
     ManifestWriter<DataFile> writer =
-        ManifestFiles.write(FORMAT_V2, spec, outputFile, DUMMY_SNAPSHOT_ID);
+        ManifestFiles.write(formatVersion, spec, outputFile, 
DUMMY_SNAPSHOT_ID);
 
     try (ManifestWriter<DataFile> closeableWriter = writer) {
       closeableWriter.addAll(dataFiles);
@@ -108,7 +108,11 @@ public class FlinkManifestUtil {
     // Write the completed data files into a newly created data manifest file.
     if (result.dataFiles() != null && result.dataFiles().length > 0) {
       dataManifest =
-          writeDataFiles(outputFileSupplier.get(), spec, 
Lists.newArrayList(result.dataFiles()));
+          writeDataFiles(
+              outputFileSupplier.get(),
+              spec,
+              Lists.newArrayList(result.dataFiles()),
+              formatVersion);
     }
 
     // Write the completed delete files into a newly created delete manifest 
file.
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
index 4bbd523ec0..9ae435c263 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
@@ -213,7 +213,8 @@ public class TestFlinkManifest {
 
     List<DataFile> dataFiles = generateDataFiles(10);
     ManifestFile manifest =
-        FlinkManifestUtil.writeDataFiles(factory.create(checkpointId), 
table.spec(), dataFiles);
+        FlinkManifestUtil.writeDataFiles(
+            factory.create(checkpointId), table.spec(), dataFiles, 
TableUtil.formatVersion(table));
     byte[] dataV1 =
         SimpleVersionedSerialization.writeVersionAndSerialize(new 
V1Serializer(), manifest);
 
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
index 1736d91b1b..c7e79ed8db 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
@@ -41,15 +41,15 @@ import org.slf4j.LoggerFactory;
 public class FlinkManifestUtil {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkManifestUtil.class);
-  private static final int FORMAT_V2 = 2;
   private static final Long DUMMY_SNAPSHOT_ID = 0L;
 
   private FlinkManifestUtil() {}
 
   static ManifestFile writeDataFiles(
-      OutputFile outputFile, PartitionSpec spec, List<DataFile> dataFiles) 
throws IOException {
+      OutputFile outputFile, PartitionSpec spec, List<DataFile> dataFiles, int 
formatVersion)
+      throws IOException {
     ManifestWriter<DataFile> writer =
-        ManifestFiles.write(FORMAT_V2, spec, outputFile, DUMMY_SNAPSHOT_ID);
+        ManifestFiles.write(formatVersion, spec, outputFile, 
DUMMY_SNAPSHOT_ID);
 
     try (ManifestWriter<DataFile> closeableWriter = writer) {
       closeableWriter.addAll(dataFiles);
@@ -108,7 +108,11 @@ public class FlinkManifestUtil {
     // Write the completed data files into a newly created data manifest file.
     if (result.dataFiles() != null && result.dataFiles().length > 0) {
       dataManifest =
-          writeDataFiles(outputFileSupplier.get(), spec, 
Lists.newArrayList(result.dataFiles()));
+          writeDataFiles(
+              outputFileSupplier.get(),
+              spec,
+              Lists.newArrayList(result.dataFiles()),
+              formatVersion);
     }
 
     // Write the completed delete files into a newly created delete manifest 
file.
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
index 4bbd523ec0..9ae435c263 100644
--- 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
@@ -213,7 +213,8 @@ public class TestFlinkManifest {
 
     List<DataFile> dataFiles = generateDataFiles(10);
     ManifestFile manifest =
-        FlinkManifestUtil.writeDataFiles(factory.create(checkpointId), 
table.spec(), dataFiles);
+        FlinkManifestUtil.writeDataFiles(
+            factory.create(checkpointId), table.spec(), dataFiles, 
TableUtil.formatVersion(table));
     byte[] dataV1 =
         SimpleVersionedSerialization.writeVersionAndSerialize(new 
V1Serializer(), manifest);
 
diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
index 1736d91b1b..c7e79ed8db 100644
--- 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
@@ -41,15 +41,15 @@ import org.slf4j.LoggerFactory;
 public class FlinkManifestUtil {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkManifestUtil.class);
-  private static final int FORMAT_V2 = 2;
   private static final Long DUMMY_SNAPSHOT_ID = 0L;
 
   private FlinkManifestUtil() {}
 
   static ManifestFile writeDataFiles(
-      OutputFile outputFile, PartitionSpec spec, List<DataFile> dataFiles) 
throws IOException {
+      OutputFile outputFile, PartitionSpec spec, List<DataFile> dataFiles, int 
formatVersion)
+      throws IOException {
     ManifestWriter<DataFile> writer =
-        ManifestFiles.write(FORMAT_V2, spec, outputFile, DUMMY_SNAPSHOT_ID);
+        ManifestFiles.write(formatVersion, spec, outputFile, 
DUMMY_SNAPSHOT_ID);
 
     try (ManifestWriter<DataFile> closeableWriter = writer) {
       closeableWriter.addAll(dataFiles);
@@ -108,7 +108,11 @@ public class FlinkManifestUtil {
     // Write the completed data files into a newly created data manifest file.
     if (result.dataFiles() != null && result.dataFiles().length > 0) {
       dataManifest =
-          writeDataFiles(outputFileSupplier.get(), spec, 
Lists.newArrayList(result.dataFiles()));
+          writeDataFiles(
+              outputFileSupplier.get(),
+              spec,
+              Lists.newArrayList(result.dataFiles()),
+              formatVersion);
     }
 
     // Write the completed delete files into a newly created delete manifest 
file.
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
index 4bbd523ec0..9ae435c263 100644
--- 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
+++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
@@ -213,7 +213,8 @@ public class TestFlinkManifest {
 
     List<DataFile> dataFiles = generateDataFiles(10);
     ManifestFile manifest =
-        FlinkManifestUtil.writeDataFiles(factory.create(checkpointId), 
table.spec(), dataFiles);
+        FlinkManifestUtil.writeDataFiles(
+            factory.create(checkpointId), table.spec(), dataFiles, 
TableUtil.formatVersion(table));
     byte[] dataV1 =
         SimpleVersionedSerialization.writeVersionAndSerialize(new 
V1Serializer(), manifest);
 

Reply via email to