This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new b6747f8cf6 Flink: Support writing DVs in IcebergSink (#14197)
b6747f8cf6 is described below

commit b6747f8cf6313fa4c53c5596bf75b675d721c8d2
Author: GuoYu <[email protected]>
AuthorDate: Tue Oct 21 22:25:45 2025 +0800

    Flink: Support writing DVs in IcebergSink (#14197)
---
 .../java/org/apache/iceberg/io/BaseTaskWriter.java | 106 +++++++++++++++---
 .../iceberg/io/TestTaskEqualityDeltaWriter.java    | 123 +++++++++++++++------
 .../iceberg/flink/sink/BaseDeltaTaskWriter.java    |  10 +-
 .../iceberg/flink/sink/FlinkManifestUtil.java      |   8 +-
 .../iceberg/flink/sink/IcebergFilesCommitter.java  |   6 +-
 .../iceberg/flink/sink/IcebergWriteAggregator.java |   6 +-
 .../iceberg/flink/sink/PartitionedDeltaWriter.java |   9 +-
 .../flink/sink/RowDataTaskWriterFactory.java       |   9 +-
 .../flink/sink/UnpartitionedDeltaWriter.java       |   9 +-
 .../sink/dynamic/DynamicWriteResultAggregator.java |   3 +-
 .../iceberg/flink/sink/dynamic/DynamicWriter.java  |   6 +
 .../org/apache/iceberg/flink/TestFlinkUpsert.java  |  24 ++--
 .../TestCommittableToTableChangeConverter.java     |   9 +-
 .../iceberg/flink/sink/TestDeltaTaskWriter.java    |  16 ++-
 .../iceberg/flink/sink/TestFlinkManifest.java      |  10 +-
 15 files changed, 272 insertions(+), 82 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
index 860a155dee..b3d260c91e 100644
--- a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
@@ -20,8 +20,10 @@ package org.apache.iceberg.io;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Supplier;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
@@ -61,6 +63,7 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
   private final FileIO io;
   private final long targetFileSize;
   private Throwable failure;
+  private PartitioningDVWriter<T> dvFileWriter;
 
   protected BaseTaskWriter(
       PartitionSpec spec,
@@ -94,6 +97,26 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
     this.targetFileSize = targetFileSize;
   }
 
+  protected BaseTaskWriter(
+      PartitionSpec spec,
+      FileFormat format,
+      FileWriterFactory<T> writerFactory,
+      OutputFileFactory fileFactory,
+      FileIO io,
+      long targetFileSize,
+      boolean useDv) {
+    this.spec = spec;
+    this.format = format;
+    this.appenderFactory = null;
+    this.writerFactory = writerFactory;
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.targetFileSize = targetFileSize;
+    if (useDv) {
+      this.dvFileWriter = new PartitioningDVWriter<>(fileFactory, p -> null);
+    }
+  }
+
   protected PartitionSpec spec() {
     return spec;
   }
@@ -104,6 +127,10 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
     }
   }
 
+  protected PartitioningDVWriter<T> dvFileWriter() {
+    return dvFileWriter;
+  }
+
   @Override
   public void abort() throws IOException {
     close();
@@ -129,17 +156,38 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
         .build();
   }
 
+  @Override
+  public void close() throws IOException {
+    try {
+      if (dvFileWriter != null) {
+        try {
+          // complete will call close
+          dvFileWriter.close();
+          DeleteWriteResult result = dvFileWriter.result();
+          completedDeleteFiles.addAll(result.deleteFiles());
+          referencedDataFiles.addAll(result.referencedDataFiles());
+        } finally {
+          dvFileWriter = null;
+        }
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to close dvFileWriter", e);
+    }
+  }
+
   /** Base equality delta writer to write both insert records and equality-deletes. */
   protected abstract class BaseEqualityDeltaWriter implements Closeable {
     private final StructProjection structProjection;
     private final PositionDelete<T> positionDelete;
+    private final StructLike partitionKey;
     private RollingFileWriter dataWriter;
     private RollingEqDeleteWriter eqDeleteWriter;
-    private FileWriter<PositionDelete<T>, DeleteWriteResult> posDeleteWriter;
+    private PartitioningWriter<PositionDelete<T>, DeleteWriteResult> posDeleteWriter;
     private Map<StructLike, PathOffset> insertedRowMap;
+    private boolean closePosDeleteWriter;
 
     protected BaseEqualityDeltaWriter(StructLike partition, Schema schema, Schema deleteSchema) {
-      this(partition, schema, deleteSchema, DeleteGranularity.PARTITION);
+      this(partition, schema, deleteSchema, DeleteGranularity.PARTITION, null);
     }
 
     protected BaseEqualityDeltaWriter(
@@ -147,6 +195,15 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
         Schema schema,
         Schema deleteSchema,
         DeleteGranularity deleteGranularity) {
+      this(partition, schema, deleteSchema, deleteGranularity, null);
+    }
+
+    protected BaseEqualityDeltaWriter(
+        StructLike partition,
+        Schema schema,
+        Schema deleteSchema,
+        DeleteGranularity deleteGranularity,
+        PartitioningDVWriter<T> posDeleteWriter) {
       Preconditions.checkNotNull(schema, "Iceberg table schema cannot be null.");
       Preconditions.checkNotNull(deleteSchema, "Equality-delete schema cannot be null.");
       this.structProjection = StructProjection.create(schema, deleteSchema);
@@ -155,18 +212,11 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
       this.dataWriter = new RollingFileWriter(partition);
       this.eqDeleteWriter = new RollingEqDeleteWriter(partition);
       this.posDeleteWriter =
-          new SortingPositionOnlyDeleteWriter<>(
-              () -> {
-                if (writerFactory != null) {
-                  return writerFactory.newPositionDeleteWriter(
-                      newOutputFile(partition), spec, partition);
-                } else {
-                  return appenderFactory.newPosDeleteWriter(
-                      newOutputFile(partition), format, partition);
-                }
-              },
-              deleteGranularity);
+          posDeleteWriter != null
+              ? posDeleteWriter
+              : createPosDeleteWriter(partition, deleteGranularity);
       this.insertedRowMap = StructLikeMap.create(deleteSchema.asStruct());
+      this.partitionKey = partition;
     }
 
     /** Wrap the data as a {@link StructLike}. */
@@ -191,6 +241,17 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
       dataWriter.write(row);
     }
 
+    private PartitioningWriter<PositionDelete<T>, DeleteWriteResult> createPosDeleteWriter(
+        StructLike partition, DeleteGranularity deleteGranularity) {
+      this.closePosDeleteWriter = true;
+      return new WrappedPositionDeleteWriter<>(
+          () ->
+              writerFactory != null
+                  ? writerFactory.newPositionDeleteWriter(newOutputFile(partition), spec, partition)
+                  : appenderFactory.newPosDeleteWriter(newOutputFile(partition), format, partition),
+          deleteGranularity);
+    }
+
     private EncryptedOutputFile newOutputFile(StructLike partition) {
       if (spec.isUnpartitioned() || partition == null) {
         return fileFactory.newOutputFile();
@@ -201,7 +262,7 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
 
     private void writePosDelete(PathOffset pathOffset) {
       positionDelete.set(pathOffset.path, pathOffset.rowOffset, null);
-      posDeleteWriter.write(positionDelete);
+      posDeleteWriter.write(positionDelete, spec, partitionKey);
     }
 
     /**
@@ -272,7 +333,7 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
         }
 
         // Add the completed pos-delete files.
-        if (posDeleteWriter != null) {
+        if (closePosDeleteWriter && posDeleteWriter != null) {
           try {
             // complete will call close
             posDeleteWriter.close();
@@ -466,4 +527,19 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
       completedDeleteFiles.add(closedWriter.toDeleteFile());
     }
   }
+
+  private static class WrappedPositionDeleteWriter<T> extends SortingPositionOnlyDeleteWriter<T>
+      implements PartitioningWriter<PositionDelete<T>, DeleteWriteResult> {
+
+    WrappedPositionDeleteWriter(
+        Supplier<FileWriter<PositionDelete<T>, DeleteWriteResult>> writers,
+        DeleteGranularity granularity) {
+      super(writers, granularity);
+    }
+
+    @Override
+    public void write(PositionDelete<T> positionDelete, PartitionSpec spec, StructLike partition) {
+      super.write(positionDelete);
+    }
+  }
 }
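
For orientation, the pieces added above compose as follows in a concrete task writer:
pass useDv to the new BaseTaskWriter constructor, hand dvFileWriter() to the equality
delta writer, and call super.close() so the shared DV writer is flushed. The sketch
below mirrors the GenericTaskDeltaWriter changes in the test file further down; it is
illustrative only (the class names and the unpartitioned setup are assumptions, not
part of this commit).

// Illustrative sketch, not in this commit: a minimal unpartitioned delta task writer
// that opts into DV-based position deletes when useDv is true.
import java.io.IOException;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.io.BaseTaskWriter;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileWriterFactory;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.PartitioningDVWriter;

class SketchDeltaTaskWriter extends BaseTaskWriter<Record> {
  private final BaseEqualityDeltaWriter deltaWriter;

  SketchDeltaTaskWriter(
      PartitionSpec spec,
      FileFormat format,
      Schema schema,
      Schema deleteSchema,
      FileWriterFactory<Record> writerFactory,
      OutputFileFactory fileFactory,
      FileIO io,
      long targetFileSize,
      boolean useDv) {
    // useDv = true makes the base class create a shared PartitioningDVWriter.
    super(spec, format, writerFactory, fileFactory, io, targetFileSize, useDv);
    // dvFileWriter() is null when useDv = false; the delta writer then falls back to
    // the wrapped SortingPositionOnlyDeleteWriter path shown above.
    this.deltaWriter = new SketchEqualityDeltaWriter(schema, deleteSchema, dvFileWriter());
  }

  @Override
  public void write(Record row) throws IOException {
    deltaWriter.write(row);
  }

  @Override
  public void close() throws IOException {
    deltaWriter.close();
    // Flushes the DV writer and collects its delete files (new in this commit).
    super.close();
  }

  private class SketchEqualityDeltaWriter extends BaseEqualityDeltaWriter {
    SketchEqualityDeltaWriter(
        Schema schema, Schema deleteSchema, PartitioningDVWriter<Record> dvWriter) {
      super(null, schema, deleteSchema, DeleteGranularity.FILE, dvWriter);
    }

    @Override
    protected StructLike asStructLike(Record row) {
      return row;
    }

    @Override
    protected StructLike asStructLikeKey(Record row) {
      return row;
    }
  }
}
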
diff --git a/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java b/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java
index 3388a8636a..271e3fbe82 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java
@@ -39,8 +39,11 @@ import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.TestBase;
+import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.BaseDeleteLoader;
 import org.apache.iceberg.data.GenericFileWriterFactory;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.IcebergGenerics;
@@ -49,6 +52,7 @@ import org.apache.iceberg.data.avro.PlannedDataReader;
 import org.apache.iceberg.data.orc.GenericOrcReader;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
 import org.apache.iceberg.deletes.DeleteGranularity;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -74,12 +78,17 @@ public class TestTaskEqualityDeltaWriter extends TestBase {
   @Parameter(index = 1)
   protected FileFormat format;
 
-  @Parameters(name = "formatVersion = {0}, FileFormat = {0}")
-  protected static List<Object> parameters() {
-    return Arrays.asList(
-        new Object[] {FORMAT_V2, FileFormat.AVRO},
-        new Object[] {FORMAT_V2, FileFormat.ORC},
-        new Object[] {FORMAT_V2, FileFormat.PARQUET});
+  @Parameters(name = "formatVersion = {0}, FileFormat = {1}")
+  protected static List<Object[]> parameters() {
+    List<Object[]> parameters = Lists.newArrayList();
+    for (FileFormat format :
+        new FileFormat[] {FileFormat.AVRO, FileFormat.ORC, FileFormat.PARQUET}) {
+      for (int version : TestHelpers.V2_AND_ABOVE) {
+        parameters.add(new Object[] {version, format});
+      }
+    }
+
+    return parameters;
   }
 
   @Override
@@ -190,15 +199,25 @@ public class TestTaskEqualityDeltaWriter extends TestBase {
                 createRecord(2, "ggg"),
                 createRecord(1, "hhh")));
 
-    // Check records in the pos-delete file.
-    Schema posDeleteSchema = DeleteSchemaUtil.pathPosSchema();
-    assertThat(readRecordsAsList(posDeleteSchema, posDeleteFile.location()))
-        .isEqualTo(
-            ImmutableList.of(
-                posRecord.copy("file_path", dataFile.location(), "pos", 0L),
-                posRecord.copy("file_path", dataFile.location(), "pos", 1L),
-                posRecord.copy("file_path", dataFile.location(), "pos", 2L),
-                posRecord.copy("file_path", dataFile.location(), "pos", 3L)));
+    if (formatVersion == FORMAT_V2) {
+      // Check records in the pos-delete file.
+      Schema posDeleteSchema = DeleteSchemaUtil.pathPosSchema();
+      assertThat(readRecordsAsList(posDeleteSchema, posDeleteFile.location()))
+          .isEqualTo(
+              ImmutableList.of(
+                  posRecord.copy("file_path", dataFile.location(), "pos", 0L),
+                  posRecord.copy("file_path", dataFile.location(), "pos", 1L),
+                  posRecord.copy("file_path", dataFile.location(), "pos", 2L),
+                  posRecord.copy("file_path", dataFile.location(), "pos", 3L)));
+    } else {
+      assertThat(posDeleteFile.format()).isEqualTo(FileFormat.PUFFIN);
+      PositionDeleteIndex positionDeleteIndex = readDVFile(table, posDeleteFile);
+      assertThat(positionDeleteIndex.cardinality()).isEqualTo(4);
+      assertThat(positionDeleteIndex.isDeleted(0L)).isTrue();
+      assertThat(positionDeleteIndex.isDeleted(1L)).isTrue();
+      assertThat(positionDeleteIndex.isDeleted(2L)).isTrue();
+      assertThat(positionDeleteIndex.isDeleted(3L)).isTrue();
+    }
   }
 
   @TestTemplate
@@ -229,10 +248,17 @@ public class TestTaskEqualityDeltaWriter extends TestBase {
     assertThat(readRecordsAsList(table.schema(), dataFile.location()))
         .isEqualTo(ImmutableList.of(record, record));
 
-    // Check records in the pos-delete file.
     DeleteFile posDeleteFile = result.deleteFiles()[0];
-    assertThat(readRecordsAsList(DeleteSchemaUtil.pathPosSchema(), posDeleteFile.location()))
-        .isEqualTo(ImmutableList.of(posRecord.copy("file_path", dataFile.location(), "pos", 0L)));
+    if (formatVersion == FORMAT_V2) {
+      // Check records in the pos-delete file.
+      assertThat(readRecordsAsList(DeleteSchemaUtil.pathPosSchema(), posDeleteFile.location()))
+          .isEqualTo(ImmutableList.of(posRecord.copy("file_path", dataFile.location(), "pos", 0L)));
+    } else {
+      assertThat(posDeleteFile.format()).isEqualTo(FileFormat.PUFFIN);
+      PositionDeleteIndex positionDeleteIndex = readDVFile(table, posDeleteFile);
+      assertThat(positionDeleteIndex.cardinality()).isEqualTo(1);
+      assertThat(positionDeleteIndex.isDeleted(0L)).isTrue();
+    }
 
     deltaWriter =
         createTaskWriter(eqDeleteFieldIds, eqDeleteRowSchema, DeleteGranularity.PARTITION);
@@ -328,8 +354,16 @@ public class TestTaskEqualityDeltaWriter extends TestBase {
     DeleteFile posDeleteFile = result.deleteFiles()[1];
     Schema posDeleteSchema = DeleteSchemaUtil.pathPosSchema();
     assertThat(posDeleteFile.content()).isEqualTo(FileContent.POSITION_DELETES);
-    assertThat(readRecordsAsList(posDeleteSchema, posDeleteFile.location()))
-        .isEqualTo(ImmutableList.of(posRecord.copy("file_path", dataFile.location(), "pos", 0L)));
+
+    if (formatVersion == FORMAT_V2) {
+      assertThat(readRecordsAsList(posDeleteSchema, posDeleteFile.location()))
+          .isEqualTo(ImmutableList.of(posRecord.copy("file_path", dataFile.location(), "pos", 0L)));
+    } else {
+      assertThat(posDeleteFile.format()).isEqualTo(FileFormat.PUFFIN);
+      PositionDeleteIndex positionDeleteIndex = readDVFile(table, posDeleteFile);
+      assertThat(positionDeleteIndex.cardinality()).isEqualTo(1);
+      assertThat(positionDeleteIndex.isDeleted(0L)).isTrue();
+    }
   }
 
   @TestTemplate
@@ -414,8 +448,15 @@ public class TestTaskEqualityDeltaWriter extends TestBase {
     DeleteFile posDeleteFile = result.deleteFiles()[1];
     Schema posDeleteSchema = DeleteSchemaUtil.pathPosSchema();
     assertThat(posDeleteFile.content()).isEqualTo(FileContent.POSITION_DELETES);
-    assertThat(readRecordsAsList(posDeleteSchema, posDeleteFile.location()))
-        .isEqualTo(ImmutableList.of(posRecord.copy("file_path", dataFile.location(), "pos", 0L)));
+    if (formatVersion == FORMAT_V2) {
+      assertThat(readRecordsAsList(posDeleteSchema, posDeleteFile.location()))
+          .isEqualTo(ImmutableList.of(posRecord.copy("file_path", dataFile.location(), "pos", 0L)));
+    } else {
+      assertThat(posDeleteFile.format()).isEqualTo(FileFormat.PUFFIN);
+      PositionDeleteIndex positionDeleteIndex = readDVFile(table, posDeleteFile);
+      assertThat(positionDeleteIndex.cardinality()).isEqualTo(1);
+      assertThat(positionDeleteIndex.isDeleted(0L)).isTrue();
+    }
   }
 
   @TestTemplate
@@ -462,9 +503,16 @@ public class TestTaskEqualityDeltaWriter extends TestBase {
 
     // Should have 2 files, as BaseRollingWriter checks the size on every 1000 rows (ROWS_DIVISOR)
     assertThat(result.dataFiles()).as("Should have 2 data files.").hasSize(2);
-    assertThat(result.deleteFiles())
-        .as("Should have correct number of pos-delete files")
-        .hasSize(granularity.equals(DeleteGranularity.FILE) ? 2 : 1);
+    if (formatVersion == FORMAT_V2) {
+      assertThat(result.deleteFiles())
+          .as("Should have correct number of pos-delete files")
+          .hasSize(granularity.equals(DeleteGranularity.FILE) ? 2 : 1);
+    } else {
+      assertThat(result.deleteFiles())
+          .as("Should have correct number of pos-delete files")
+          .hasSize(2);
+    }
+
     assertThat(Arrays.stream(result.deleteFiles()).mapToLong(delete -> delete.recordCount()).sum())
         .isEqualTo(expectedDeleteCount);
 
@@ -532,7 +580,8 @@ public class TestTaskEqualityDeltaWriter extends TestBase {
         fileFactory,
         table.io(),
         TARGET_FILE_SIZE,
-        deleteGranularity);
+        deleteGranularity,
+        formatVersion > 2);
   }
 
   private static class GenericTaskDeltaWriter extends BaseTaskWriter<Record> {
@@ -547,10 +596,12 @@ public class TestTaskEqualityDeltaWriter extends TestBase {
         OutputFileFactory fileFactory,
         FileIO io,
         long targetFileSize,
-        DeleteGranularity deleteGranularity) {
-      super(spec, format, fileWriterFactory, fileFactory, io, targetFileSize);
+        DeleteGranularity deleteGranularity,
+        boolean useDv) {
+      super(spec, format, fileWriterFactory, fileFactory, io, targetFileSize, useDv);
       this.deltaWriter =
-          new GenericEqualityDeltaWriter(null, schema, deleteSchema, deleteGranularity);
+          new GenericEqualityDeltaWriter(
+              null, schema, deleteSchema, deleteGranularity, dvFileWriter());
     }
 
     @Override
@@ -570,6 +621,7 @@ public class TestTaskEqualityDeltaWriter extends TestBase {
     @Override
     public void close() throws IOException {
       deltaWriter.close();
+      super.close();
     }
 
     private class GenericEqualityDeltaWriter extends BaseEqualityDeltaWriter {
@@ -577,8 +629,9 @@ public class TestTaskEqualityDeltaWriter extends TestBase {
           PartitionKey partition,
           Schema schema,
           Schema eqDeleteSchema,
-          DeleteGranularity deleteGranularity) {
-        super(partition, schema, eqDeleteSchema, deleteGranularity);
+          DeleteGranularity deleteGranularity,
+          PartitioningDVWriter<Record> dvWriter) {
+        super(partition, schema, eqDeleteSchema, deleteGranularity, dvWriter);
       }
 
       @Override
@@ -631,4 +684,12 @@ public class TestTaskEqualityDeltaWriter extends TestBase {
       return Lists.newArrayList(closeableIterable);
     }
   }
+
+  private PositionDeleteIndex readDVFile(Table table, DeleteFile dvFile) {
+    BaseDeleteLoader deleteLoader =
+        new BaseDeleteLoader(deleteFile -> table.io().newInputFile(deleteFile.location()));
+    PositionDeleteIndex index =
+        deleteLoader.loadPositionDeletes(List.of(dvFile), dvFile.referencedDataFile());
+    return index;
+  }
 }
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
index f68eff6912..33a09705e7 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.io.BaseTaskWriter;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.FileWriterFactory;
 import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.PartitioningDVWriter;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
 
@@ -57,8 +58,9 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
       Schema schema,
       RowType flinkSchema,
       Set<Integer> equalityFieldIds,
-      boolean upsert) {
-    super(spec, format, fileWriterFactory, fileFactory, io, targetFileSize);
+      boolean upsert,
+      boolean useDv) {
+    super(spec, format, fileWriterFactory, fileFactory, io, targetFileSize, useDv);
     this.schema = schema;
     this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
     this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
@@ -109,8 +111,8 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
   }
 
   protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
-    RowDataDeltaWriter(PartitionKey partition) {
-      super(partition, schema, deleteSchema, DeleteGranularity.FILE);
+    RowDataDeltaWriter(PartitionKey partition, PartitioningDVWriter<RowData> dvFileWriter) {
+      super(partition, schema, deleteSchema, DeleteGranularity.FILE, dvFileWriter);
     }
 
     @Override
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
index 0eeedf2659..1736d91b1b 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
@@ -96,7 +96,10 @@ public class FlinkManifestUtil {
    *     partition spec
    */
   public static DeltaManifests writeCompletedFiles(
-      WriteResult result, Supplier<OutputFile> outputFileSupplier, PartitionSpec spec)
+      WriteResult result,
+      Supplier<OutputFile> outputFileSupplier,
+      PartitionSpec spec,
+      int formatVersion)
       throws IOException {
 
     ManifestFile dataManifest = null;
@@ -113,7 +116,8 @@ public class FlinkManifestUtil {
       OutputFile deleteManifestFile = outputFileSupplier.get();
 
       ManifestWriter<DeleteFile> deleteManifestWriter =
-          ManifestFiles.writeDeleteManifest(FORMAT_V2, spec, deleteManifestFile, DUMMY_SNAPSHOT_ID);
+          ManifestFiles.writeDeleteManifest(
+              formatVersion, spec, deleteManifestFile, DUMMY_SNAPSHOT_ID);
       try (ManifestWriter<DeleteFile> writer = deleteManifestWriter) {
         for (DeleteFile deleteFile : result.deleteFiles()) {
           writer.add(deleteFile);
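
The callers updated below now pass the table's current format version instead of the
previously hard-coded FORMAT_V2, so the delete manifest is written with the version
that matches the table (v3 tables produce DV/Puffin delete files). A minimal sketch of
the new call shape, mirroring IcebergFilesCommitter and IcebergWriteAggregator below
(writeResult, factory, and checkpointId are placeholder names, not part of this commit):

// Sketch only: thread the table's format version into the delete-manifest writer.
DeltaManifests deltaManifests =
    FlinkManifestUtil.writeCompletedFiles(
        writeResult,                         // aggregated WriteResult for this checkpoint
        () -> factory.create(checkpointId),  // manifest OutputFile supplier
        table.spec(),
        TableUtil.formatVersion(table));     // table format version, no longer assumed V2
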
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index 89432cff2b..b9ac0f9906 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -47,6 +47,7 @@ import org.apache.iceberg.ReplacePartitions;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
@@ -446,7 +447,10 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
     WriteResult result = WriteResult.builder().addAll(writeResults).build();
     DeltaManifests deltaManifests =
         FlinkManifestUtil.writeCompletedFiles(
-            result, () -> manifestOutputFileFactory.create(checkpointId), spec);
+            result,
+            () -> manifestOutputFileFactory.create(checkpointId),
+            spec,
+            TableUtil.formatVersion(table));
 
     return SimpleVersionedSerialization.writeVersionAndSerialize(
         DeltaManifestsSerializer.INSTANCE, deltaManifests);
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java
index 1dbb62363d..4a4a789bf9 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java
@@ -30,6 +30,7 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -134,7 +135,10 @@ class IcebergWriteAggregator extends AbstractStreamOperator<CommittableMessage<I
     WriteResult result = WriteResult.builder().addAll(writeResults).build();
     DeltaManifests deltaManifests =
         FlinkManifestUtil.writeCompletedFiles(
-            result, () -> icebergManifestOutputFileFactory.create(checkpointId), table.spec());
+            result,
+            () -> icebergManifestOutputFileFactory.create(checkpointId),
+            table.spec(),
+            TableUtil.formatVersion(table));
 
     return SimpleVersionedSerialization.writeVersionAndSerialize(
         DeltaManifestsSerializer.INSTANCE, deltaManifests);
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java
index afbc14b7f1..5e597d8e71 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java
@@ -50,7 +50,8 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
       Schema schema,
       RowType flinkSchema,
       Set<Integer> equalityFieldIds,
-      boolean upsert) {
+      boolean upsert,
+      boolean useDv) {
     super(
         spec,
         format,
@@ -61,7 +62,8 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
         schema,
         flinkSchema,
         equalityFieldIds,
-        upsert);
+        upsert,
+        useDv);
     this.partitionKey = new PartitionKey(spec, schema);
   }
 
@@ -74,7 +76,7 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
       // NOTICE: we need to copy a new partition key here, in case of messing up the keys in
       // writers.
       PartitionKey copiedKey = partitionKey.copy();
-      writer = new RowDataDeltaWriter(copiedKey);
+      writer = new RowDataDeltaWriter(copiedKey, dvFileWriter());
       writers.put(copiedKey, writer);
     }
 
@@ -84,6 +86,7 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
   @Override
   public void close() {
     try {
+      super.close();
       Tasks.foreach(writers.values())
           .throwFailureWhenFinished()
           .noRetry()
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
index ef2c795e23..bc3bc51ced 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
 import org.apache.iceberg.flink.RowDataWrapper;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.FileWriterFactory;
@@ -52,6 +53,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
   private final Set<Integer> equalityFieldIds;
   private final boolean upsert;
   private final FileWriterFactory<RowData> fileWriterFactory;
+  private boolean useDv;
 
   private transient OutputFileFactory outputFileFactory;
 
@@ -170,6 +172,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
     }
 
     refreshTable();
+    this.useDv = TableUtil.formatVersion(table) > 2;
 
     this.outputFileFactory =
         OutputFileFactory.builderFor(table, taskId, attemptId)
@@ -221,7 +224,8 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
             schema,
             flinkSchema,
             equalityFieldIds,
-            upsert);
+            upsert,
+            useDv);
       } else {
         return new PartitionedDeltaWriter(
             spec,
@@ -233,7 +237,8 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
             schema,
             flinkSchema,
             equalityFieldIds,
-            upsert);
+            upsert,
+            useDv);
       }
     }
   }
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
index e709206c94..9d749d3062 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
@@ -42,7 +42,8 @@ class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter {
       Schema schema,
       RowType flinkSchema,
       Set<Integer> equalityFieldIds,
-      boolean upsert) {
+      boolean upsert,
+      boolean useDv) {
     super(
         spec,
         format,
@@ -53,8 +54,9 @@ class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter {
         schema,
         flinkSchema,
         equalityFieldIds,
-        upsert);
-    this.writer = new RowDataDeltaWriter(null);
+        upsert,
+        useDv);
+    this.writer = new RowDataDeltaWriter(null, dvFileWriter());
   }
 
   @Override
@@ -65,5 +67,6 @@ class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter {
   @Override
   public void close() throws IOException {
     writer.close();
+    super.close();
   }
 }
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
index b3262957ad..0f65f1ae52 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
@@ -164,7 +164,8 @@ class DynamicWriteResultAggregator
         FlinkManifestUtil.writeCompletedFiles(
             result,
             () -> outputFileFactory(key.tableName()).create(checkpointId),
-            spec(key.tableName(), key.specId()));
+            spec(key.tableName(), key.specId()),
+            2);
 
     return SimpleVersionedSerialization.writeVersionAndSerialize(
         DeltaManifestsSerializer.INSTANCE, deltaManifests);
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
index e99e6e72da..5ed9da8623 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
@@ -116,6 +117,11 @@ class DynamicWriter implements CommittingSinkWriter<DynamicRecordInternal, Dynam
                           Preconditions.checkState(
                               !equalityFieldIds.isEmpty(),
                               "Equality field columns shouldn't be empty when 
configuring to use UPSERT data.");
+
+                          Preconditions.checkArgument(
+                              !(TableUtil.formatVersion(table) > 2),
+                              "Dynamic Sink writer does not support upsert mode in tables (V3+)");
+
                           if (!table.spec().isUnpartitioned()) {
                             for (PartitionField partitionField : table.spec().fields()) {
                               Preconditions.checkState(
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
index c5a7ec4bee..e6ca9c5c74 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
@@ -48,21 +48,27 @@ public class TestFlinkUpsert extends CatalogTestBase {
   @Parameter(index = 3)
   private boolean isStreamingJob;
 
+  @Parameter(index = 4)
+  private int formatVersion;
+
   private final Map<String, String> tableUpsertProps = Maps.newHashMap();
   private TableEnvironment tEnv;
 
-  @Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}")
+  @Parameters(
+      name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}, formatVersion={4} ")
   public static List<Object[]> parameters() {
     List<Object[]> parameters = Lists.newArrayList();
     for (FileFormat format :
         new FileFormat[] {FileFormat.PARQUET, FileFormat.AVRO, FileFormat.ORC}) {
-      for (Boolean isStreaming : new Boolean[] {true, false}) {
-        // Only test with one catalog as this is a file operation concern.
-        // FlinkCatalogTestBase requires the catalog name start with testhadoop if using hadoop
-        // catalog.
-        String catalogName = "testhadoop";
-        Namespace baseNamespace = Namespace.of("default");
-        parameters.add(new Object[] {catalogName, baseNamespace, format, isStreaming});
+      for (int version : org.apache.iceberg.TestHelpers.V2_AND_ABOVE) {
+        for (Boolean isStreaming : new Boolean[] {true, false}) {
+          // Only test with one catalog as this is a file operation concern.
+          // FlinkCatalogTestBase requires the catalog name start with testhadoop if using hadoop
+          // catalog.
+          String catalogName = "testhadoop";
+          Namespace baseNamespace = Namespace.of("default");
+          parameters.add(new Object[] {catalogName, baseNamespace, format, isStreaming, version});
+        }
       }
     }
     return parameters;
@@ -98,7 +104,7 @@ public class TestFlinkUpsert extends CatalogTestBase {
     sql("CREATE DATABASE IF NOT EXISTS %s", flinkDatabase);
     sql("USE CATALOG %s", catalogName);
     sql("USE %s", DATABASE);
-    tableUpsertProps.put(TableProperties.FORMAT_VERSION, "2");
+    tableUpsertProps.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
     tableUpsertProps.put(TableProperties.UPSERT_ENABLED, "true");
     tableUpsertProps.put(TableProperties.DEFAULT_FILE_FORMAT, format.name());
   }
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestCommittableToTableChangeConverter.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestCommittableToTableChangeConverter.java
index 0c7a47c232..c39e09fd86 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestCommittableToTableChangeConverter.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestCommittableToTableChangeConverter.java
@@ -39,6 +39,7 @@ import org.apache.iceberg.FileMetadata;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.maintenance.operator.TableChange;
 import org.apache.iceberg.io.FileIO;
@@ -116,7 +117,8 @@ class TestCommittableToTableChangeConverter {
               .addDeleteFiles(posDeleteFile, eqDeleteFile)
               .build();
       DeltaManifests deltaManifests =
-          FlinkManifestUtil.writeCompletedFiles(writeResult, () -> factory.create(1), table.spec());
+          FlinkManifestUtil.writeCompletedFiles(
+              writeResult, () -> factory.create(1), table.spec(), TableUtil.formatVersion(table));
       IcebergCommittable committable =
           new IcebergCommittable(
               SimpleVersionedSerialization.writeVersionAndSerialize(
@@ -297,7 +299,10 @@ class TestCommittableToTableChangeConverter {
             .build();
     DeltaManifests deltaManifests =
         FlinkManifestUtil.writeCompletedFiles(
-            writeResult, () -> factory.create(checkpointId), table.spec());
+            writeResult,
+            () -> factory.create(checkpointId),
+            table.spec(),
+            TableUtil.formatVersion(table));
 
     IcebergCommittable committable =
         new IcebergCommittable(
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
index a21c51c378..89f642af1c 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
@@ -56,6 +56,7 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TestBase;
+import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.TestTables;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
@@ -80,11 +81,16 @@ public class TestDeltaTaskWriter extends TestBase {
   private FileFormat format;
 
   @Parameters(name = "formatVersion = {0}, fileFormat = {1}")
-  protected static List<Object> parameters() {
-    return Arrays.asList(
-        new Object[] {2, FileFormat.AVRO},
-        new Object[] {2, FileFormat.ORC},
-        new Object[] {2, FileFormat.PARQUET});
+  protected static List<Object[]> parameters() {
+    List<Object[]> parameters = Lists.newArrayList();
+    for (FileFormat format :
+        new FileFormat[] {FileFormat.AVRO, FileFormat.ORC, FileFormat.PARQUET}) {
+      for (int version : TestHelpers.V2_AND_ABOVE) {
+        parameters.add(new Object[] {version, format});
+      }
+    }
+
+    return parameters;
   }
 
   @Override
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
index c6dc984513..4bbd523ec0 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
@@ -39,6 +39,7 @@ import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.TestHelpers;
 import org.apache.iceberg.io.WriteResult;
@@ -105,7 +106,8 @@ public class TestFlinkManifest {
                   .addDeleteFiles(posDeleteFiles)
                   .build(),
               () -> factory.create(curCkpId),
-              table.spec());
+              table.spec(),
+              TableUtil.formatVersion(table));
 
       WriteResult result =
           FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.specs());
@@ -141,7 +143,8 @@ public class TestFlinkManifest {
         FlinkManifestUtil.writeCompletedFiles(
             WriteResult.builder().addDataFiles(dataFiles).build(),
             () -> factory.create(checkpointId),
-            table.spec());
+            table.spec(),
+            TableUtil.formatVersion(table));
 
     assertThat(deltaManifests.dataManifest()).isNotNull();
     assertThat(deltaManifests.deleteManifest()).isNull();
@@ -180,7 +183,8 @@ public class TestFlinkManifest {
                 .addDeleteFiles(posDeleteFiles)
                 .build(),
             () -> factory.create(checkpointId),
-            table.spec());
+            table.spec(),
+            TableUtil.formatVersion(table));
 
     byte[] versionedSerializeData =
         SimpleVersionedSerialization.writeVersionAndSerialize(

