This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new e160bb89c9 Flink: Backport Support writing DVs in IcebergSink to Flink 
2.0 and 1.20 (#14390)
e160bb89c9 is described below

commit e160bb89c949b73685d908823b439b416bec272e
Author: GuoYu <[email protected]>
AuthorDate: Tue Oct 21 23:49:47 2025 +0800

    Flink: Backport Support writing DVs in IcebergSink to Flink 2.0 and 1.20 
(#14390)
    
    backports #14197
---
 .../iceberg/flink/sink/BaseDeltaTaskWriter.java    | 10 +++++----
 .../iceberg/flink/sink/FlinkManifestUtil.java      |  8 ++++++--
 .../iceberg/flink/sink/IcebergFilesCommitter.java  |  6 +++++-
 .../iceberg/flink/sink/IcebergWriteAggregator.java |  6 +++++-
 .../iceberg/flink/sink/PartitionedDeltaWriter.java |  9 +++++---
 .../flink/sink/RowDataTaskWriterFactory.java       |  9 ++++++--
 .../flink/sink/UnpartitionedDeltaWriter.java       |  9 +++++---
 .../sink/dynamic/DynamicWriteResultAggregator.java |  3 ++-
 .../iceberg/flink/sink/dynamic/DynamicWriter.java  |  6 ++++++
 .../org/apache/iceberg/flink/TestFlinkUpsert.java  | 24 ++++++++++++++--------
 .../TestCommittableToTableChangeConverter.java     |  9 ++++++--
 .../iceberg/flink/sink/TestDeltaTaskWriter.java    | 16 ++++++++++-----
 .../iceberg/flink/sink/TestFlinkManifest.java      | 10 ++++++---
 .../iceberg/flink/sink/BaseDeltaTaskWriter.java    | 10 +++++----
 .../iceberg/flink/sink/FlinkManifestUtil.java      |  8 ++++++--
 .../iceberg/flink/sink/IcebergFilesCommitter.java  |  6 +++++-
 .../iceberg/flink/sink/IcebergWriteAggregator.java |  6 +++++-
 .../iceberg/flink/sink/PartitionedDeltaWriter.java |  9 +++++---
 .../flink/sink/RowDataTaskWriterFactory.java       |  9 ++++++--
 .../flink/sink/UnpartitionedDeltaWriter.java       |  9 +++++---
 .../sink/dynamic/DynamicWriteResultAggregator.java |  3 ++-
 .../iceberg/flink/sink/dynamic/DynamicWriter.java  |  6 ++++++
 .../org/apache/iceberg/flink/TestFlinkUpsert.java  | 24 ++++++++++++++--------
 .../TestCommittableToTableChangeConverter.java     |  9 ++++++--
 .../iceberg/flink/sink/TestDeltaTaskWriter.java    | 16 ++++++++++-----
 .../iceberg/flink/sink/TestFlinkManifest.java      | 10 ++++++---
 26 files changed, 178 insertions(+), 72 deletions(-)

diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
index f68eff6912..33a09705e7 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.io.BaseTaskWriter;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.FileWriterFactory;
 import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.PartitioningDVWriter;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
 
@@ -57,8 +58,9 @@ abstract class BaseDeltaTaskWriter extends 
BaseTaskWriter<RowData> {
       Schema schema,
       RowType flinkSchema,
       Set<Integer> equalityFieldIds,
-      boolean upsert) {
-    super(spec, format, fileWriterFactory, fileFactory, io, targetFileSize);
+      boolean upsert,
+      boolean useDv) {
+    super(spec, format, fileWriterFactory, fileFactory, io, targetFileSize, 
useDv);
     this.schema = schema;
     this.deleteSchema = TypeUtil.select(schema, 
Sets.newHashSet(equalityFieldIds));
     this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
@@ -109,8 +111,8 @@ abstract class BaseDeltaTaskWriter extends 
BaseTaskWriter<RowData> {
   }
 
   protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
-    RowDataDeltaWriter(PartitionKey partition) {
-      super(partition, schema, deleteSchema, DeleteGranularity.FILE);
+    RowDataDeltaWriter(PartitionKey partition, PartitioningDVWriter<RowData> 
dvFileWriter) {
+      super(partition, schema, deleteSchema, DeleteGranularity.FILE, 
dvFileWriter);
     }
 
     @Override
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
index 0eeedf2659..1736d91b1b 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
@@ -96,7 +96,10 @@ public class FlinkManifestUtil {
    *     partition spec
    */
   public static DeltaManifests writeCompletedFiles(
-      WriteResult result, Supplier<OutputFile> outputFileSupplier, 
PartitionSpec spec)
+      WriteResult result,
+      Supplier<OutputFile> outputFileSupplier,
+      PartitionSpec spec,
+      int formatVersion)
       throws IOException {
 
     ManifestFile dataManifest = null;
@@ -113,7 +116,8 @@ public class FlinkManifestUtil {
       OutputFile deleteManifestFile = outputFileSupplier.get();
 
       ManifestWriter<DeleteFile> deleteManifestWriter =
-          ManifestFiles.writeDeleteManifest(FORMAT_V2, spec, 
deleteManifestFile, DUMMY_SNAPSHOT_ID);
+          ManifestFiles.writeDeleteManifest(
+              formatVersion, spec, deleteManifestFile, DUMMY_SNAPSHOT_ID);
       try (ManifestWriter<DeleteFile> writer = deleteManifestWriter) {
         for (DeleteFile deleteFile : result.deleteFiles()) {
           writer.add(deleteFile);
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index b510dce28b..f78a705e66 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -46,6 +46,7 @@ import org.apache.iceberg.ReplacePartitions;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.io.WriteResult;
 import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
@@ -443,7 +444,10 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
     WriteResult result = WriteResult.builder().addAll(writeResults).build();
     DeltaManifests deltaManifests =
         FlinkManifestUtil.writeCompletedFiles(
-            result, () -> manifestOutputFileFactory.create(checkpointId), 
spec);
+            result,
+            () -> manifestOutputFileFactory.create(checkpointId),
+            spec,
+            TableUtil.formatVersion(table));
 
     return SimpleVersionedSerialization.writeVersionAndSerialize(
         DeltaManifestsSerializer.INSTANCE, deltaManifests);
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java
index 794ade5779..7f21c1c372 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -110,7 +111,10 @@ class IcebergWriteAggregator extends 
AbstractStreamOperator<CommittableMessage<I
     WriteResult result = WriteResult.builder().addAll(writeResults).build();
     DeltaManifests deltaManifests =
         FlinkManifestUtil.writeCompletedFiles(
-            result, () -> 
icebergManifestOutputFileFactory.create(checkpointId), table.spec());
+            result,
+            () -> icebergManifestOutputFileFactory.create(checkpointId),
+            table.spec(),
+            TableUtil.formatVersion(table));
 
     return SimpleVersionedSerialization.writeVersionAndSerialize(
         DeltaManifestsSerializer.INSTANCE, deltaManifests);
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java
index afbc14b7f1..5e597d8e71 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java
@@ -50,7 +50,8 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
       Schema schema,
       RowType flinkSchema,
       Set<Integer> equalityFieldIds,
-      boolean upsert) {
+      boolean upsert,
+      boolean useDv) {
     super(
         spec,
         format,
@@ -61,7 +62,8 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
         schema,
         flinkSchema,
         equalityFieldIds,
-        upsert);
+        upsert,
+        useDv);
     this.partitionKey = new PartitionKey(spec, schema);
   }
 
@@ -74,7 +76,7 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
       // NOTICE: we need to copy a new partition key here, in case of messing 
up the keys in
       // writers.
       PartitionKey copiedKey = partitionKey.copy();
-      writer = new RowDataDeltaWriter(copiedKey);
+      writer = new RowDataDeltaWriter(copiedKey, dvFileWriter());
       writers.put(copiedKey, writer);
     }
 
@@ -84,6 +86,7 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
   @Override
   public void close() {
     try {
+      super.close();
       Tasks.foreach(writers.values())
           .throwFailureWhenFinished()
           .noRetry()
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
index ef2c795e23..bc3bc51ced 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
 import org.apache.iceberg.flink.RowDataWrapper;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.FileWriterFactory;
@@ -52,6 +53,7 @@ public class RowDataTaskWriterFactory implements 
TaskWriterFactory<RowData> {
   private final Set<Integer> equalityFieldIds;
   private final boolean upsert;
   private final FileWriterFactory<RowData> fileWriterFactory;
+  private boolean useDv;
 
   private transient OutputFileFactory outputFileFactory;
 
@@ -170,6 +172,7 @@ public class RowDataTaskWriterFactory implements 
TaskWriterFactory<RowData> {
     }
 
     refreshTable();
+    this.useDv = TableUtil.formatVersion(table) > 2;
 
     this.outputFileFactory =
         OutputFileFactory.builderFor(table, taskId, attemptId)
@@ -221,7 +224,8 @@ public class RowDataTaskWriterFactory implements 
TaskWriterFactory<RowData> {
             schema,
             flinkSchema,
             equalityFieldIds,
-            upsert);
+            upsert,
+            useDv);
       } else {
         return new PartitionedDeltaWriter(
             spec,
@@ -233,7 +237,8 @@ public class RowDataTaskWriterFactory implements 
TaskWriterFactory<RowData> {
             schema,
             flinkSchema,
             equalityFieldIds,
-            upsert);
+            upsert,
+            useDv);
       }
     }
   }
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
index e709206c94..9d749d3062 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
@@ -42,7 +42,8 @@ class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter {
       Schema schema,
       RowType flinkSchema,
       Set<Integer> equalityFieldIds,
-      boolean upsert) {
+      boolean upsert,
+      boolean useDv) {
     super(
         spec,
         format,
@@ -53,8 +54,9 @@ class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter {
         schema,
         flinkSchema,
         equalityFieldIds,
-        upsert);
-    this.writer = new RowDataDeltaWriter(null);
+        upsert,
+        useDv);
+    this.writer = new RowDataDeltaWriter(null, dvFileWriter());
   }
 
   @Override
@@ -65,5 +67,6 @@ class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter {
   @Override
   public void close() throws IOException {
     writer.close();
+    super.close();
   }
 }
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
index b92d32fcc4..77bd2a0f97 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
@@ -141,7 +141,8 @@ class DynamicWriteResultAggregator
         FlinkManifestUtil.writeCompletedFiles(
             result,
             () -> outputFileFactory(key.tableName()).create(checkpointId),
-            spec(key.tableName(), key.specId()));
+            spec(key.tableName(), key.specId()),
+            2);
 
     return SimpleVersionedSerialization.writeVersionAndSerialize(
         DeltaManifestsSerializer.INSTANCE, deltaManifests);
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
index e99e6e72da..5ed9da8623 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
@@ -116,6 +117,11 @@ class DynamicWriter implements 
CommittingSinkWriter<DynamicRecordInternal, Dynam
                           Preconditions.checkState(
                               !equalityFieldIds.isEmpty(),
                               "Equality field columns shouldn't be empty when 
configuring to use UPSERT data.");
+
+                          Preconditions.checkArgument(
+                              !(TableUtil.formatVersion(table) > 2),
+                              "Dynamic Sink writer does not support upsert 
mode in tables (V3+)");
+
                           if (!table.spec().isUnpartitioned()) {
                             for (PartitionField partitionField : 
table.spec().fields()) {
                               Preconditions.checkState(
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
index c5a7ec4bee..e6ca9c5c74 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
@@ -48,21 +48,27 @@ public class TestFlinkUpsert extends CatalogTestBase {
   @Parameter(index = 3)
   private boolean isStreamingJob;
 
+  @Parameter(index = 4)
+  private int formatVersion;
+
   private final Map<String, String> tableUpsertProps = Maps.newHashMap();
   private TableEnvironment tEnv;
 
-  @Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}, 
isStreaming={3}")
+  @Parameters(
+      name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}, 
formatVersion={4} ")
   public static List<Object[]> parameters() {
     List<Object[]> parameters = Lists.newArrayList();
     for (FileFormat format :
         new FileFormat[] {FileFormat.PARQUET, FileFormat.AVRO, 
FileFormat.ORC}) {
-      for (Boolean isStreaming : new Boolean[] {true, false}) {
-        // Only test with one catalog as this is a file operation concern.
-        // FlinkCatalogTestBase requires the catalog name start with 
testhadoop if using hadoop
-        // catalog.
-        String catalogName = "testhadoop";
-        Namespace baseNamespace = Namespace.of("default");
-        parameters.add(new Object[] {catalogName, baseNamespace, format, 
isStreaming});
+      for (int version : org.apache.iceberg.TestHelpers.V2_AND_ABOVE) {
+        for (Boolean isStreaming : new Boolean[] {true, false}) {
+          // Only test with one catalog as this is a file operation concern.
+          // FlinkCatalogTestBase requires the catalog name start with 
testhadoop if using hadoop
+          // catalog.
+          String catalogName = "testhadoop";
+          Namespace baseNamespace = Namespace.of("default");
+          parameters.add(new Object[] {catalogName, baseNamespace, format, 
isStreaming, version});
+        }
       }
     }
     return parameters;
@@ -98,7 +104,7 @@ public class TestFlinkUpsert extends CatalogTestBase {
     sql("CREATE DATABASE IF NOT EXISTS %s", flinkDatabase);
     sql("USE CATALOG %s", catalogName);
     sql("USE %s", DATABASE);
-    tableUpsertProps.put(TableProperties.FORMAT_VERSION, "2");
+    tableUpsertProps.put(TableProperties.FORMAT_VERSION, 
String.valueOf(formatVersion));
     tableUpsertProps.put(TableProperties.UPSERT_ENABLED, "true");
     tableUpsertProps.put(TableProperties.DEFAULT_FILE_FORMAT, format.name());
   }
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestCommittableToTableChangeConverter.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestCommittableToTableChangeConverter.java
index 0c7a47c232..c39e09fd86 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestCommittableToTableChangeConverter.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestCommittableToTableChangeConverter.java
@@ -39,6 +39,7 @@ import org.apache.iceberg.FileMetadata;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.maintenance.operator.TableChange;
 import org.apache.iceberg.io.FileIO;
@@ -116,7 +117,8 @@ class TestCommittableToTableChangeConverter {
               .addDeleteFiles(posDeleteFile, eqDeleteFile)
               .build();
       DeltaManifests deltaManifests =
-          FlinkManifestUtil.writeCompletedFiles(writeResult, () -> 
factory.create(1), table.spec());
+          FlinkManifestUtil.writeCompletedFiles(
+              writeResult, () -> factory.create(1), table.spec(), 
TableUtil.formatVersion(table));
       IcebergCommittable committable =
           new IcebergCommittable(
               SimpleVersionedSerialization.writeVersionAndSerialize(
@@ -297,7 +299,10 @@ class TestCommittableToTableChangeConverter {
             .build();
     DeltaManifests deltaManifests =
         FlinkManifestUtil.writeCompletedFiles(
-            writeResult, () -> factory.create(checkpointId), table.spec());
+            writeResult,
+            () -> factory.create(checkpointId),
+            table.spec(),
+            TableUtil.formatVersion(table));
 
     IcebergCommittable committable =
         new IcebergCommittable(
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
index a21c51c378..89f642af1c 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
@@ -56,6 +56,7 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TestBase;
+import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.TestTables;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
@@ -80,11 +81,16 @@ public class TestDeltaTaskWriter extends TestBase {
   private FileFormat format;
 
   @Parameters(name = "formatVersion = {0}, fileFormat = {1}")
-  protected static List<Object> parameters() {
-    return Arrays.asList(
-        new Object[] {2, FileFormat.AVRO},
-        new Object[] {2, FileFormat.ORC},
-        new Object[] {2, FileFormat.PARQUET});
+  protected static List<Object[]> parameters() {
+    List<Object[]> parameters = Lists.newArrayList();
+    for (FileFormat format :
+        new FileFormat[] {FileFormat.AVRO, FileFormat.ORC, 
FileFormat.PARQUET}) {
+      for (int version : TestHelpers.V2_AND_ABOVE) {
+        parameters.add(new Object[] {version, format});
+      }
+    }
+
+    return parameters;
   }
 
   @Override
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
index c6dc984513..4bbd523ec0 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
@@ -39,6 +39,7 @@ import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.TestHelpers;
 import org.apache.iceberg.io.WriteResult;
@@ -105,7 +106,8 @@ public class TestFlinkManifest {
                   .addDeleteFiles(posDeleteFiles)
                   .build(),
               () -> factory.create(curCkpId),
-              table.spec());
+              table.spec(),
+              TableUtil.formatVersion(table));
 
       WriteResult result =
           FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), 
table.specs());
@@ -141,7 +143,8 @@ public class TestFlinkManifest {
         FlinkManifestUtil.writeCompletedFiles(
             WriteResult.builder().addDataFiles(dataFiles).build(),
             () -> factory.create(checkpointId),
-            table.spec());
+            table.spec(),
+            TableUtil.formatVersion(table));
 
     assertThat(deltaManifests.dataManifest()).isNotNull();
     assertThat(deltaManifests.deleteManifest()).isNull();
@@ -180,7 +183,8 @@ public class TestFlinkManifest {
                 .addDeleteFiles(posDeleteFiles)
                 .build(),
             () -> factory.create(checkpointId),
-            table.spec());
+            table.spec(),
+            TableUtil.formatVersion(table));
 
     byte[] versionedSerializeData =
         SimpleVersionedSerialization.writeVersionAndSerialize(
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
index f68eff6912..33a09705e7 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.io.BaseTaskWriter;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.FileWriterFactory;
 import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.PartitioningDVWriter;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
 
@@ -57,8 +58,9 @@ abstract class BaseDeltaTaskWriter extends 
BaseTaskWriter<RowData> {
       Schema schema,
       RowType flinkSchema,
       Set<Integer> equalityFieldIds,
-      boolean upsert) {
-    super(spec, format, fileWriterFactory, fileFactory, io, targetFileSize);
+      boolean upsert,
+      boolean useDv) {
+    super(spec, format, fileWriterFactory, fileFactory, io, targetFileSize, 
useDv);
     this.schema = schema;
     this.deleteSchema = TypeUtil.select(schema, 
Sets.newHashSet(equalityFieldIds));
     this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
@@ -109,8 +111,8 @@ abstract class BaseDeltaTaskWriter extends 
BaseTaskWriter<RowData> {
   }
 
   protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
-    RowDataDeltaWriter(PartitionKey partition) {
-      super(partition, schema, deleteSchema, DeleteGranularity.FILE);
+    RowDataDeltaWriter(PartitionKey partition, PartitioningDVWriter<RowData> 
dvFileWriter) {
+      super(partition, schema, deleteSchema, DeleteGranularity.FILE, 
dvFileWriter);
     }
 
     @Override
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
index 0eeedf2659..1736d91b1b 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
@@ -96,7 +96,10 @@ public class FlinkManifestUtil {
    *     partition spec
    */
   public static DeltaManifests writeCompletedFiles(
-      WriteResult result, Supplier<OutputFile> outputFileSupplier, 
PartitionSpec spec)
+      WriteResult result,
+      Supplier<OutputFile> outputFileSupplier,
+      PartitionSpec spec,
+      int formatVersion)
       throws IOException {
 
     ManifestFile dataManifest = null;
@@ -113,7 +116,8 @@ public class FlinkManifestUtil {
       OutputFile deleteManifestFile = outputFileSupplier.get();
 
       ManifestWriter<DeleteFile> deleteManifestWriter =
-          ManifestFiles.writeDeleteManifest(FORMAT_V2, spec, 
deleteManifestFile, DUMMY_SNAPSHOT_ID);
+          ManifestFiles.writeDeleteManifest(
+              formatVersion, spec, deleteManifestFile, DUMMY_SNAPSHOT_ID);
       try (ManifestWriter<DeleteFile> writer = deleteManifestWriter) {
         for (DeleteFile deleteFile : result.deleteFiles()) {
           writer.add(deleteFile);
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index 89432cff2b..b9ac0f9906 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -47,6 +47,7 @@ import org.apache.iceberg.ReplacePartitions;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.io.WriteResult;
 import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
@@ -446,7 +447,10 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
     WriteResult result = WriteResult.builder().addAll(writeResults).build();
     DeltaManifests deltaManifests =
         FlinkManifestUtil.writeCompletedFiles(
-            result, () -> manifestOutputFileFactory.create(checkpointId), 
spec);
+            result,
+            () -> manifestOutputFileFactory.create(checkpointId),
+            spec,
+            TableUtil.formatVersion(table));
 
     return SimpleVersionedSerialization.writeVersionAndSerialize(
         DeltaManifestsSerializer.INSTANCE, deltaManifests);
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java
index 794ade5779..7f21c1c372 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -110,7 +111,10 @@ class IcebergWriteAggregator extends 
AbstractStreamOperator<CommittableMessage<I
     WriteResult result = WriteResult.builder().addAll(writeResults).build();
     DeltaManifests deltaManifests =
         FlinkManifestUtil.writeCompletedFiles(
-            result, () -> 
icebergManifestOutputFileFactory.create(checkpointId), table.spec());
+            result,
+            () -> icebergManifestOutputFileFactory.create(checkpointId),
+            table.spec(),
+            TableUtil.formatVersion(table));
 
     return SimpleVersionedSerialization.writeVersionAndSerialize(
         DeltaManifestsSerializer.INSTANCE, deltaManifests);
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java
index afbc14b7f1..5e597d8e71 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java
@@ -50,7 +50,8 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
       Schema schema,
       RowType flinkSchema,
       Set<Integer> equalityFieldIds,
-      boolean upsert) {
+      boolean upsert,
+      boolean useDv) {
     super(
         spec,
         format,
@@ -61,7 +62,8 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
         schema,
         flinkSchema,
         equalityFieldIds,
-        upsert);
+        upsert,
+        useDv);
     this.partitionKey = new PartitionKey(spec, schema);
   }
 
@@ -74,7 +76,7 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
       // NOTICE: we need to copy a new partition key here, in case of messing 
up the keys in
       // writers.
       PartitionKey copiedKey = partitionKey.copy();
-      writer = new RowDataDeltaWriter(copiedKey);
+      writer = new RowDataDeltaWriter(copiedKey, dvFileWriter());
       writers.put(copiedKey, writer);
     }
 
@@ -84,6 +86,7 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
   @Override
   public void close() {
     try {
+      super.close();
       Tasks.foreach(writers.values())
           .throwFailureWhenFinished()
           .noRetry()
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
index ef2c795e23..bc3bc51ced 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
 import org.apache.iceberg.flink.RowDataWrapper;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.FileWriterFactory;
@@ -52,6 +53,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
   private final Set<Integer> equalityFieldIds;
   private final boolean upsert;
   private final FileWriterFactory<RowData> fileWriterFactory;
+  private boolean useDv;
 
   private transient OutputFileFactory outputFileFactory;
 
@@ -170,6 +172,7 @@ public class RowDataTaskWriterFactory implements 
TaskWriterFactory<RowData> {
     }
 
     refreshTable();
+    this.useDv = TableUtil.formatVersion(table) > 2;
 
     this.outputFileFactory =
         OutputFileFactory.builderFor(table, taskId, attemptId)
@@ -221,7 +224,8 @@ public class RowDataTaskWriterFactory implements 
TaskWriterFactory<RowData> {
             schema,
             flinkSchema,
             equalityFieldIds,
-            upsert);
+            upsert,
+            useDv);
       } else {
         return new PartitionedDeltaWriter(
             spec,
@@ -233,7 +237,8 @@ public class RowDataTaskWriterFactory implements 
TaskWriterFactory<RowData> {
             schema,
             flinkSchema,
             equalityFieldIds,
-            upsert);
+            upsert,
+            useDv);
       }
     }
   }
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
index e709206c94..9d749d3062 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
@@ -42,7 +42,8 @@ class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter {
       Schema schema,
       RowType flinkSchema,
       Set<Integer> equalityFieldIds,
-      boolean upsert) {
+      boolean upsert,
+      boolean useDv) {
     super(
         spec,
         format,
@@ -53,8 +54,9 @@ class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter {
         schema,
         flinkSchema,
         equalityFieldIds,
-        upsert);
-    this.writer = new RowDataDeltaWriter(null);
+        upsert,
+        useDv);
+    this.writer = new RowDataDeltaWriter(null, dvFileWriter());
   }
 
   @Override
@@ -65,5 +67,6 @@ class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter {
   @Override
   public void close() throws IOException {
     writer.close();
+    super.close();
   }
 }
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
index b92d32fcc4..77bd2a0f97 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
@@ -141,7 +141,8 @@ class DynamicWriteResultAggregator
         FlinkManifestUtil.writeCompletedFiles(
             result,
             () -> outputFileFactory(key.tableName()).create(checkpointId),
-            spec(key.tableName(), key.specId()));
+            spec(key.tableName(), key.specId()),
+            2);
 
     return SimpleVersionedSerialization.writeVersionAndSerialize(
         DeltaManifestsSerializer.INSTANCE, deltaManifests);
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
index e99e6e72da..5ed9da8623 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
@@ -116,6 +117,11 @@ class DynamicWriter implements CommittingSinkWriter<DynamicRecordInternal, Dynam
                           Preconditions.checkState(
                               !equalityFieldIds.isEmpty(),
                               "Equality field columns shouldn't be empty when configuring to use UPSERT data.");
+
+                          Preconditions.checkArgument(
+                              !(TableUtil.formatVersion(table) > 2),
+                              "Dynamic Sink writer does not support upsert mode in tables (V3+)");
+
                           if (!table.spec().isUnpartitioned()) {
                             for (PartitionField partitionField : 
table.spec().fields()) {
                               Preconditions.checkState(
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
index c5a7ec4bee..e6ca9c5c74 100644
--- 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
@@ -48,21 +48,27 @@ public class TestFlinkUpsert extends CatalogTestBase {
   @Parameter(index = 3)
   private boolean isStreamingJob;
 
+  @Parameter(index = 4)
+  private int formatVersion;
+
   private final Map<String, String> tableUpsertProps = Maps.newHashMap();
   private TableEnvironment tEnv;
 
-  @Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}, 
isStreaming={3}")
+  @Parameters(
+      name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}, formatVersion={4} ")
   public static List<Object[]> parameters() {
     List<Object[]> parameters = Lists.newArrayList();
     for (FileFormat format :
         new FileFormat[] {FileFormat.PARQUET, FileFormat.AVRO, FileFormat.ORC}) {
-      for (Boolean isStreaming : new Boolean[] {true, false}) {
-        // Only test with one catalog as this is a file operation concern.
-        // FlinkCatalogTestBase requires the catalog name start with 
testhadoop if using hadoop
-        // catalog.
-        String catalogName = "testhadoop";
-        Namespace baseNamespace = Namespace.of("default");
-        parameters.add(new Object[] {catalogName, baseNamespace, format, 
isStreaming});
+      for (int version : org.apache.iceberg.TestHelpers.V2_AND_ABOVE) {
+        for (Boolean isStreaming : new Boolean[] {true, false}) {
+          // Only test with one catalog as this is a file operation concern.
+          // FlinkCatalogTestBase requires the catalog name start with 
testhadoop if using hadoop
+          // catalog.
+          String catalogName = "testhadoop";
+          Namespace baseNamespace = Namespace.of("default");
+          parameters.add(new Object[] {catalogName, baseNamespace, format, 
isStreaming, version});
+        }
       }
     }
     return parameters;
@@ -98,7 +104,7 @@ public class TestFlinkUpsert extends CatalogTestBase {
     sql("CREATE DATABASE IF NOT EXISTS %s", flinkDatabase);
     sql("USE CATALOG %s", catalogName);
     sql("USE %s", DATABASE);
-    tableUpsertProps.put(TableProperties.FORMAT_VERSION, "2");
+    tableUpsertProps.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
     tableUpsertProps.put(TableProperties.UPSERT_ENABLED, "true");
     tableUpsertProps.put(TableProperties.DEFAULT_FILE_FORMAT, format.name());
   }
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestCommittableToTableChangeConverter.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestCommittableToTableChangeConverter.java
index 0c7a47c232..c39e09fd86 100644
--- 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestCommittableToTableChangeConverter.java
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestCommittableToTableChangeConverter.java
@@ -39,6 +39,7 @@ import org.apache.iceberg.FileMetadata;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.maintenance.operator.TableChange;
 import org.apache.iceberg.io.FileIO;
@@ -116,7 +117,8 @@ class TestCommittableToTableChangeConverter {
               .addDeleteFiles(posDeleteFile, eqDeleteFile)
               .build();
       DeltaManifests deltaManifests =
+          FlinkManifestUtil.writeCompletedFiles(
+              writeResult, () -> factory.create(1), table.spec(), TableUtil.formatVersion(table));
+          FlinkManifestUtil.writeCompletedFiles(
+              writeResult, () -> factory.create(1), table.spec(), 
TableUtil.formatVersion(table));
       IcebergCommittable committable =
           new IcebergCommittable(
               SimpleVersionedSerialization.writeVersionAndSerialize(
@@ -297,7 +299,10 @@ class TestCommittableToTableChangeConverter {
             .build();
     DeltaManifests deltaManifests =
         FlinkManifestUtil.writeCompletedFiles(
-            writeResult, () -> factory.create(checkpointId), table.spec());
+            writeResult,
+            () -> factory.create(checkpointId),
+            table.spec(),
+            TableUtil.formatVersion(table));
 
     IcebergCommittable committable =
         new IcebergCommittable(
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
index a21c51c378..89f642af1c 100644
--- 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
@@ -56,6 +56,7 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TestBase;
+import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.TestTables;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
@@ -80,11 +81,16 @@ public class TestDeltaTaskWriter extends TestBase {
   private FileFormat format;
 
   @Parameters(name = "formatVersion = {0}, fileFormat = {1}")
-  protected static List<Object> parameters() {
-    return Arrays.asList(
-        new Object[] {2, FileFormat.AVRO},
-        new Object[] {2, FileFormat.ORC},
-        new Object[] {2, FileFormat.PARQUET});
+  protected static List<Object[]> parameters() {
+    List<Object[]> parameters = Lists.newArrayList();
+    for (FileFormat format :
+        new FileFormat[] {FileFormat.AVRO, FileFormat.ORC, FileFormat.PARQUET}) {
+      for (int version : TestHelpers.V2_AND_ABOVE) {
+        parameters.add(new Object[] {version, format});
+      }
+    }
+
+    return parameters;
   }
 
   @Override
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
index c6dc984513..4bbd523ec0 100644
--- 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
@@ -39,6 +39,7 @@ import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.TestHelpers;
 import org.apache.iceberg.io.WriteResult;
@@ -105,7 +106,8 @@ public class TestFlinkManifest {
                   .addDeleteFiles(posDeleteFiles)
                   .build(),
               () -> factory.create(curCkpId),
-              table.spec());
+              table.spec(),
+              TableUtil.formatVersion(table));
 
       WriteResult result =
           FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), 
table.specs());
@@ -141,7 +143,8 @@ public class TestFlinkManifest {
         FlinkManifestUtil.writeCompletedFiles(
             WriteResult.builder().addDataFiles(dataFiles).build(),
             () -> factory.create(checkpointId),
-            table.spec());
+            table.spec(),
+            TableUtil.formatVersion(table));
 
     assertThat(deltaManifests.dataManifest()).isNotNull();
     assertThat(deltaManifests.deleteManifest()).isNull();
@@ -180,7 +183,8 @@ public class TestFlinkManifest {
                 .addDeleteFiles(posDeleteFiles)
                 .build(),
             () -> factory.create(checkpointId),
-            table.spec());
+            table.spec(),
+            TableUtil.formatVersion(table));
 
     byte[] versionedSerializeData =
         SimpleVersionedSerialization.writeVersionAndSerialize(


Reply via email to