This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new b6747f8cf6 Flink: Support writing DVs in IcebergSink (#14197)
b6747f8cf6 is described below
commit b6747f8cf6313fa4c53c5596bf75b675d721c8d2
Author: GuoYu <[email protected]>
AuthorDate: Tue Oct 21 22:25:45 2025 +0800
Flink: Support writing DVs in IcebergSink (#14197)
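With format version 3, Iceberg stores position deletes as deletion vectors (DVs) in Puffin files rather than as Avro/ORC/Parquet position-delete files. This change threads a useDv flag from RowDataTaskWriterFactory (true when the table's format version is above 2) into BaseTaskWriter, which then routes position deletes through a shared PartitioningDVWriter; V2 tables keep the existing SortingPositionOnlyDeleteWriter path. Below is a minimal sketch, not part of this commit, of creating a table that takes the new path; the HadoopCatalog warehouse location, namespace, and schema are illustrative assumptions.

    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.TableProperties;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.hadoop.HadoopCatalog;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
    import org.apache.iceberg.types.Types;

    public class CreateDvTableSketch {
      public static void main(String[] args) {
        Schema schema =
            new Schema(
                Types.NestedField.required(1, "id", Types.IntegerType.get()),
                Types.NestedField.optional(2, "data", Types.StringType.get()));

        Map<String, String> props =
            ImmutableMap.of(
                // format-version 3 is what makes RowDataTaskWriterFactory set useDv = true
                TableProperties.FORMAT_VERSION, "3",
                TableProperties.UPSERT_ENABLED, "true");

        HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "file:///tmp/warehouse");
        catalog.createTable(
            TableIdentifier.of("default", "dv_demo"), schema, PartitionSpec.unpartitioned(), props);
      }
    }

Equality deletes written by the sink's delta writers against such a table are then accompanied by Puffin DV files instead of position-delete data files.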
---
.../java/org/apache/iceberg/io/BaseTaskWriter.java | 106 +++++++++++++++---
.../iceberg/io/TestTaskEqualityDeltaWriter.java | 123 +++++++++++++++------
.../iceberg/flink/sink/BaseDeltaTaskWriter.java | 10 +-
.../iceberg/flink/sink/FlinkManifestUtil.java | 8 +-
.../iceberg/flink/sink/IcebergFilesCommitter.java | 6 +-
.../iceberg/flink/sink/IcebergWriteAggregator.java | 6 +-
.../iceberg/flink/sink/PartitionedDeltaWriter.java | 9 +-
.../flink/sink/RowDataTaskWriterFactory.java | 9 +-
.../flink/sink/UnpartitionedDeltaWriter.java | 9 +-
.../sink/dynamic/DynamicWriteResultAggregator.java | 3 +-
.../iceberg/flink/sink/dynamic/DynamicWriter.java | 6 +
.../org/apache/iceberg/flink/TestFlinkUpsert.java | 24 ++--
.../TestCommittableToTableChangeConverter.java | 9 +-
.../iceberg/flink/sink/TestDeltaTaskWriter.java | 16 ++-
.../iceberg/flink/sink/TestFlinkManifest.java | 10 +-
15 files changed, 272 insertions(+), 82 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
index 860a155dee..b3d260c91e 100644
--- a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
@@ -20,8 +20,10 @@ package org.apache.iceberg.io;
import java.io.Closeable;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
+import java.util.function.Supplier;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
@@ -61,6 +63,7 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
private final FileIO io;
private final long targetFileSize;
private Throwable failure;
+ private PartitioningDVWriter<T> dvFileWriter;
protected BaseTaskWriter(
PartitionSpec spec,
@@ -94,6 +97,26 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
this.targetFileSize = targetFileSize;
}
+ protected BaseTaskWriter(
+ PartitionSpec spec,
+ FileFormat format,
+ FileWriterFactory<T> writerFactory,
+ OutputFileFactory fileFactory,
+ FileIO io,
+ long targetFileSize,
+ boolean useDv) {
+ this.spec = spec;
+ this.format = format;
+ this.appenderFactory = null;
+ this.writerFactory = writerFactory;
+ this.fileFactory = fileFactory;
+ this.io = io;
+ this.targetFileSize = targetFileSize;
+ if (useDv) {
+ this.dvFileWriter = new PartitioningDVWriter<>(fileFactory, p -> null);
+ }
+ }
+
protected PartitionSpec spec() {
return spec;
}
@@ -104,6 +127,10 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
}
}
+ protected PartitioningDVWriter<T> dvFileWriter() {
+ return dvFileWriter;
+ }
+
@Override
public void abort() throws IOException {
close();
@@ -129,17 +156,38 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
.build();
}
+ @Override
+ public void close() throws IOException {
+ try {
+ if (dvFileWriter != null) {
+ try {
+ // complete will call close
+ dvFileWriter.close();
+ DeleteWriteResult result = dvFileWriter.result();
+ completedDeleteFiles.addAll(result.deleteFiles());
+ referencedDataFiles.addAll(result.referencedDataFiles());
+ } finally {
+ dvFileWriter = null;
+ }
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to close dvFileWriter", e);
+ }
+ }
+
/** Base equality delta writer to write both insert records and equality-deletes. */
protected abstract class BaseEqualityDeltaWriter implements Closeable {
private final StructProjection structProjection;
private final PositionDelete<T> positionDelete;
+ private final StructLike partitionKey;
private RollingFileWriter dataWriter;
private RollingEqDeleteWriter eqDeleteWriter;
- private FileWriter<PositionDelete<T>, DeleteWriteResult> posDeleteWriter;
+    private PartitioningWriter<PositionDelete<T>, DeleteWriteResult> posDeleteWriter;
private Map<StructLike, PathOffset> insertedRowMap;
+ private boolean closePosDeleteWriter;
protected BaseEqualityDeltaWriter(StructLike partition, Schema schema, Schema deleteSchema) {
- this(partition, schema, deleteSchema, DeleteGranularity.PARTITION);
+ this(partition, schema, deleteSchema, DeleteGranularity.PARTITION, null);
}
protected BaseEqualityDeltaWriter(
@@ -147,6 +195,15 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
Schema schema,
Schema deleteSchema,
DeleteGranularity deleteGranularity) {
+ this(partition, schema, deleteSchema, deleteGranularity, null);
+ }
+
+ protected BaseEqualityDeltaWriter(
+ StructLike partition,
+ Schema schema,
+ Schema deleteSchema,
+ DeleteGranularity deleteGranularity,
+ PartitioningDVWriter<T> posDeleteWriter) {
Preconditions.checkNotNull(schema, "Iceberg table schema cannot be null.");
Preconditions.checkNotNull(deleteSchema, "Equality-delete schema cannot be null.");
this.structProjection = StructProjection.create(schema, deleteSchema);
@@ -155,18 +212,11 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
this.dataWriter = new RollingFileWriter(partition);
this.eqDeleteWriter = new RollingEqDeleteWriter(partition);
this.posDeleteWriter =
- new SortingPositionOnlyDeleteWriter<>(
- () -> {
- if (writerFactory != null) {
- return writerFactory.newPositionDeleteWriter(
- newOutputFile(partition), spec, partition);
- } else {
- return appenderFactory.newPosDeleteWriter(
- newOutputFile(partition), format, partition);
- }
- },
- deleteGranularity);
+ posDeleteWriter != null
+ ? posDeleteWriter
+ : createPosDeleteWriter(partition, deleteGranularity);
this.insertedRowMap = StructLikeMap.create(deleteSchema.asStruct());
+ this.partitionKey = partition;
}
/** Wrap the data as a {@link StructLike}. */
@@ -191,6 +241,17 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
dataWriter.write(row);
}
+    private PartitioningWriter<PositionDelete<T>, DeleteWriteResult> createPosDeleteWriter(
+ StructLike partition, DeleteGranularity deleteGranularity) {
+ this.closePosDeleteWriter = true;
+ return new WrappedPositionDeleteWriter<>(
+ () ->
+ writerFactory != null
+                  ? writerFactory.newPositionDeleteWriter(newOutputFile(partition), spec, partition)
+                  : appenderFactory.newPosDeleteWriter(newOutputFile(partition), format, partition),
+ deleteGranularity);
+ }
+
private EncryptedOutputFile newOutputFile(StructLike partition) {
if (spec.isUnpartitioned() || partition == null) {
return fileFactory.newOutputFile();
@@ -201,7 +262,7 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
private void writePosDelete(PathOffset pathOffset) {
positionDelete.set(pathOffset.path, pathOffset.rowOffset, null);
- posDeleteWriter.write(positionDelete);
+ posDeleteWriter.write(positionDelete, spec, partitionKey);
}
/**
@@ -272,7 +333,7 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
}
// Add the completed pos-delete files.
- if (posDeleteWriter != null) {
+ if (closePosDeleteWriter && posDeleteWriter != null) {
try {
// complete will call close
posDeleteWriter.close();
@@ -466,4 +527,19 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
completedDeleteFiles.add(closedWriter.toDeleteFile());
}
}
+
+  private static class WrappedPositionDeleteWriter<T> extends SortingPositionOnlyDeleteWriter<T>
+ implements PartitioningWriter<PositionDelete<T>, DeleteWriteResult> {
+
+ WrappedPositionDeleteWriter(
+ Supplier<FileWriter<PositionDelete<T>, DeleteWriteResult>> writers,
+ DeleteGranularity granularity) {
+ super(writers, granularity);
+ }
+
+ @Override
+    public void write(PositionDelete<T> positionDelete, PartitionSpec spec, StructLike partition) {
+ super.write(positionDelete);
+ }
+ }
}
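For V3 tables, the task writer above owns a single PartitioningDVWriter, exposed to subclasses via dvFileWriter(), so every per-partition BaseEqualityDeltaWriter funnels its position deletes into one shared set of DV files; that is also why only the owning task writer closes it (guarded by closePosDeleteWriter in the delta writer). A sketch of the same writer used standalone follows; it assumes `table` is a loaded V3 table and `dataFileLocation` points at one of its committed data files (both hypothetical here).

    import java.io.IOException;
    import org.apache.iceberg.FileFormat;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.deletes.PositionDelete;
    import org.apache.iceberg.io.DeleteWriteResult;
    import org.apache.iceberg.io.OutputFileFactory;
    import org.apache.iceberg.io.PartitioningDVWriter;

    class DvWriteSketch {
      static DeleteWriteResult writeOneDv(Table table, String dataFileLocation) throws IOException {
        OutputFileFactory fileFactory =
            OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build();
        // p -> null: no previously written deletes to merge into the new DVs
        PartitioningDVWriter<Record> dvWriter = new PartitioningDVWriter<>(fileFactory, p -> null);

        PositionDelete<Record> delete = PositionDelete.create();
        delete.set(dataFileLocation, 0L, null); // mark row 0 of the data file as deleted
        dvWriter.write(delete, table.spec(), null); // null partition: unpartitioned spec

        dvWriter.close();
        return dvWriter.result(); // deleteFiles() holds one Puffin DV referencing dataFileLocation
      }
    }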
diff --git a/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java b/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java
index 3388a8636a..271e3fbe82 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java
@@ -39,8 +39,11 @@ import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
import org.apache.iceberg.TestBase;
+import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.BaseDeleteLoader;
import org.apache.iceberg.data.GenericFileWriterFactory;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.IcebergGenerics;
@@ -49,6 +52,7 @@ import org.apache.iceberg.data.avro.PlannedDataReader;
import org.apache.iceberg.data.orc.GenericOrcReader;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.deletes.DeleteGranularity;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -74,12 +78,17 @@ public class TestTaskEqualityDeltaWriter extends TestBase {
@Parameter(index = 1)
protected FileFormat format;
- @Parameters(name = "formatVersion = {0}, FileFormat = {0}")
- protected static List<Object> parameters() {
- return Arrays.asList(
- new Object[] {FORMAT_V2, FileFormat.AVRO},
- new Object[] {FORMAT_V2, FileFormat.ORC},
- new Object[] {FORMAT_V2, FileFormat.PARQUET});
+ @Parameters(name = "formatVersion = {0}, FileFormat = {1}")
+ protected static List<Object[]> parameters() {
+ List<Object[]> parameters = Lists.newArrayList();
+ for (FileFormat format :
+        new FileFormat[] {FileFormat.AVRO, FileFormat.ORC, FileFormat.PARQUET}) {
+ for (int version : TestHelpers.V2_AND_ABOVE) {
+ parameters.add(new Object[] {version, format});
+ }
+ }
+
+ return parameters;
}
@Override
@@ -190,15 +199,25 @@ public class TestTaskEqualityDeltaWriter extends TestBase {
createRecord(2, "ggg"),
createRecord(1, "hhh")));
- // Check records in the pos-delete file.
- Schema posDeleteSchema = DeleteSchemaUtil.pathPosSchema();
- assertThat(readRecordsAsList(posDeleteSchema, posDeleteFile.location()))
- .isEqualTo(
- ImmutableList.of(
- posRecord.copy("file_path", dataFile.location(), "pos", 0L),
- posRecord.copy("file_path", dataFile.location(), "pos", 1L),
- posRecord.copy("file_path", dataFile.location(), "pos", 2L),
- posRecord.copy("file_path", dataFile.location(), "pos", 3L)));
+ if (formatVersion == FORMAT_V2) {
+ // Check records in the pos-delete file.
+ Schema posDeleteSchema = DeleteSchemaUtil.pathPosSchema();
+ assertThat(readRecordsAsList(posDeleteSchema, posDeleteFile.location()))
+ .isEqualTo(
+ ImmutableList.of(
+ posRecord.copy("file_path", dataFile.location(), "pos", 0L),
+ posRecord.copy("file_path", dataFile.location(), "pos", 1L),
+ posRecord.copy("file_path", dataFile.location(), "pos", 2L),
+              posRecord.copy("file_path", dataFile.location(), "pos", 3L)));
+ } else {
+ assertThat(posDeleteFile.format()).isEqualTo(FileFormat.PUFFIN);
+      PositionDeleteIndex positionDeleteIndex = readDVFile(table, posDeleteFile);
+ assertThat(positionDeleteIndex.cardinality()).isEqualTo(4);
+ assertThat(positionDeleteIndex.isDeleted(0L)).isTrue();
+ assertThat(positionDeleteIndex.isDeleted(1L)).isTrue();
+ assertThat(positionDeleteIndex.isDeleted(2L)).isTrue();
+ assertThat(positionDeleteIndex.isDeleted(3L)).isTrue();
+ }
}
@TestTemplate
@@ -229,10 +248,17 @@ public class TestTaskEqualityDeltaWriter extends TestBase {
assertThat(readRecordsAsList(table.schema(), dataFile.location()))
.isEqualTo(ImmutableList.of(record, record));
- // Check records in the pos-delete file.
DeleteFile posDeleteFile = result.deleteFiles()[0];
-    assertThat(readRecordsAsList(DeleteSchemaUtil.pathPosSchema(), posDeleteFile.location()))
-        .isEqualTo(ImmutableList.of(posRecord.copy("file_path", dataFile.location(), "pos", 0L)));
+ if (formatVersion == FORMAT_V2) {
+ // Check records in the pos-delete file.
+      assertThat(readRecordsAsList(DeleteSchemaUtil.pathPosSchema(), posDeleteFile.location()))
+          .isEqualTo(ImmutableList.of(posRecord.copy("file_path", dataFile.location(), "pos", 0L)));
+ } else {
+ assertThat(posDeleteFile.format()).isEqualTo(FileFormat.PUFFIN);
+      PositionDeleteIndex positionDeleteIndex = readDVFile(table, posDeleteFile);
+ assertThat(positionDeleteIndex.cardinality()).isEqualTo(1);
+ assertThat(positionDeleteIndex.isDeleted(0L)).isTrue();
+ }
deltaWriter =
createTaskWriter(eqDeleteFieldIds, eqDeleteRowSchema, DeleteGranularity.PARTITION);
@@ -328,8 +354,16 @@ public class TestTaskEqualityDeltaWriter extends TestBase {
DeleteFile posDeleteFile = result.deleteFiles()[1];
Schema posDeleteSchema = DeleteSchemaUtil.pathPosSchema();
assertThat(posDeleteFile.content()).isEqualTo(FileContent.POSITION_DELETES);
- assertThat(readRecordsAsList(posDeleteSchema, posDeleteFile.location()))
-        .isEqualTo(ImmutableList.of(posRecord.copy("file_path", dataFile.location(), "pos", 0L)));
+
+ if (formatVersion == FORMAT_V2) {
+ assertThat(readRecordsAsList(posDeleteSchema, posDeleteFile.location()))
+          .isEqualTo(ImmutableList.of(posRecord.copy("file_path", dataFile.location(), "pos", 0L)));
+ } else {
+ assertThat(posDeleteFile.format()).isEqualTo(FileFormat.PUFFIN);
+      PositionDeleteIndex positionDeleteIndex = readDVFile(table, posDeleteFile);
+ assertThat(positionDeleteIndex.cardinality()).isEqualTo(1);
+ assertThat(positionDeleteIndex.isDeleted(0L)).isTrue();
+ }
}
@TestTemplate
@@ -414,8 +448,15 @@ public class TestTaskEqualityDeltaWriter extends TestBase {
DeleteFile posDeleteFile = result.deleteFiles()[1];
Schema posDeleteSchema = DeleteSchemaUtil.pathPosSchema();
assertThat(posDeleteFile.content()).isEqualTo(FileContent.POSITION_DELETES);
- assertThat(readRecordsAsList(posDeleteSchema, posDeleteFile.location()))
-        .isEqualTo(ImmutableList.of(posRecord.copy("file_path", dataFile.location(), "pos", 0L)));
+ if (formatVersion == FORMAT_V2) {
+ assertThat(readRecordsAsList(posDeleteSchema, posDeleteFile.location()))
+          .isEqualTo(ImmutableList.of(posRecord.copy("file_path", dataFile.location(), "pos", 0L)));
+ } else {
+ assertThat(posDeleteFile.format()).isEqualTo(FileFormat.PUFFIN);
+      PositionDeleteIndex positionDeleteIndex = readDVFile(table, posDeleteFile);
+ assertThat(positionDeleteIndex.cardinality()).isEqualTo(1);
+ assertThat(positionDeleteIndex.isDeleted(0L)).isTrue();
+ }
}
@TestTemplate
@@ -462,9 +503,16 @@ public class TestTaskEqualityDeltaWriter extends TestBase {
// Should have 2 files, as BaseRollingWriter checks the size on every 1000 rows (ROWS_DIVISOR)
assertThat(result.dataFiles()).as("Should have 2 data files.").hasSize(2);
- assertThat(result.deleteFiles())
- .as("Should have correct number of pos-delete files")
- .hasSize(granularity.equals(DeleteGranularity.FILE) ? 2 : 1);
+ if (formatVersion == FORMAT_V2) {
+ assertThat(result.deleteFiles())
+ .as("Should have correct number of pos-delete files")
+ .hasSize(granularity.equals(DeleteGranularity.FILE) ? 2 : 1);
+ } else {
+ assertThat(result.deleteFiles())
+ .as("Should have correct number of pos-delete files")
+ .hasSize(2);
+ }
+
assertThat(Arrays.stream(result.deleteFiles()).mapToLong(delete -> delete.recordCount()).sum())
.isEqualTo(expectedDeleteCount);
@@ -532,7 +580,8 @@ public class TestTaskEqualityDeltaWriter extends TestBase {
fileFactory,
table.io(),
TARGET_FILE_SIZE,
- deleteGranularity);
+ deleteGranularity,
+ formatVersion > 2);
}
private static class GenericTaskDeltaWriter extends BaseTaskWriter<Record> {
@@ -547,10 +596,12 @@ public class TestTaskEqualityDeltaWriter extends TestBase {
OutputFileFactory fileFactory,
FileIO io,
long targetFileSize,
- DeleteGranularity deleteGranularity) {
- super(spec, format, fileWriterFactory, fileFactory, io, targetFileSize);
+ DeleteGranularity deleteGranularity,
+ boolean useDv) {
+      super(spec, format, fileWriterFactory, fileFactory, io, targetFileSize, useDv);
this.deltaWriter =
-          new GenericEqualityDeltaWriter(null, schema, deleteSchema, deleteGranularity);
+ new GenericEqualityDeltaWriter(
+ null, schema, deleteSchema, deleteGranularity, dvFileWriter());
}
@Override
@@ -570,6 +621,7 @@ public class TestTaskEqualityDeltaWriter extends TestBase {
@Override
public void close() throws IOException {
deltaWriter.close();
+ super.close();
}
private class GenericEqualityDeltaWriter extends BaseEqualityDeltaWriter {
@@ -577,8 +629,9 @@ public class TestTaskEqualityDeltaWriter extends TestBase {
PartitionKey partition,
Schema schema,
Schema eqDeleteSchema,
- DeleteGranularity deleteGranularity) {
- super(partition, schema, eqDeleteSchema, deleteGranularity);
+ DeleteGranularity deleteGranularity,
+ PartitioningDVWriter<Record> dvWriter) {
+ super(partition, schema, eqDeleteSchema, deleteGranularity, dvWriter);
}
@Override
@@ -631,4 +684,12 @@ public class TestTaskEqualityDeltaWriter extends TestBase {
return Lists.newArrayList(closeableIterable);
}
}
+
+ private PositionDeleteIndex readDVFile(Table table, DeleteFile dvFile) {
+ BaseDeleteLoader deleteLoader =
+        new BaseDeleteLoader(deleteFile -> table.io().newInputFile(deleteFile.location()));
+ PositionDeleteIndex index =
+        deleteLoader.loadPositionDeletes(List.of(dvFile), dvFile.referencedDataFile());
+ return index;
+ }
}
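Outside the test, the same loader can be used to inspect a committed DV; a self-contained sketch, assuming `table` and `dvFile` come from a committed V3 write:

    import java.util.List;
    import org.apache.iceberg.DeleteFile;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.data.BaseDeleteLoader;
    import org.apache.iceberg.deletes.PositionDeleteIndex;

    class DvReadSketch {
      static PositionDeleteIndex loadDv(Table table, DeleteFile dvFile) {
        BaseDeleteLoader loader =
            new BaseDeleteLoader(deleteFile -> table.io().newInputFile(deleteFile.location()));
        // the index answers isDeleted(pos) for the single data file the DV references
        return loader.loadPositionDeletes(List.of(dvFile), dvFile.referencedDataFile());
      }
    }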
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
index f68eff6912..33a09705e7 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.io.BaseTaskWriter;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileWriterFactory;
import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.PartitioningDVWriter;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
@@ -57,8 +58,9 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
Schema schema,
RowType flinkSchema,
Set<Integer> equalityFieldIds,
- boolean upsert) {
- super(spec, format, fileWriterFactory, fileFactory, io, targetFileSize);
+ boolean upsert,
+ boolean useDv) {
+    super(spec, format, fileWriterFactory, fileFactory, io, targetFileSize, useDv);
this.schema = schema;
this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
@@ -109,8 +111,8 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
}
protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
- RowDataDeltaWriter(PartitionKey partition) {
- super(partition, schema, deleteSchema, DeleteGranularity.FILE);
+    RowDataDeltaWriter(PartitionKey partition, PartitioningDVWriter<RowData> dvFileWriter) {
+      super(partition, schema, deleteSchema, DeleteGranularity.FILE, dvFileWriter);
}
@Override
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
index 0eeedf2659..1736d91b1b 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
@@ -96,7 +96,10 @@ public class FlinkManifestUtil {
* partition spec
*/
public static DeltaManifests writeCompletedFiles(
-      WriteResult result, Supplier<OutputFile> outputFileSupplier, PartitionSpec spec)
+ WriteResult result,
+ Supplier<OutputFile> outputFileSupplier,
+ PartitionSpec spec,
+ int formatVersion)
throws IOException {
ManifestFile dataManifest = null;
@@ -113,7 +116,8 @@ public class FlinkManifestUtil {
OutputFile deleteManifestFile = outputFileSupplier.get();
ManifestWriter<DeleteFile> deleteManifestWriter =
-        ManifestFiles.writeDeleteManifest(FORMAT_V2, spec, deleteManifestFile, DUMMY_SNAPSHOT_ID);
+ ManifestFiles.writeDeleteManifest(
+ formatVersion, spec, deleteManifestFile, DUMMY_SNAPSHOT_ID);
try (ManifestWriter<DeleteFile> writer = deleteManifestWriter) {
for (DeleteFile deleteFile : result.deleteFiles()) {
writer.add(deleteFile);
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index 89432cff2b..b9ac0f9906 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -47,6 +47,7 @@ import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
@@ -446,7 +447,10 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
WriteResult result = WriteResult.builder().addAll(writeResults).build();
DeltaManifests deltaManifests =
FlinkManifestUtil.writeCompletedFiles(
-          result, () -> manifestOutputFileFactory.create(checkpointId), spec);
+ result,
+ () -> manifestOutputFileFactory.create(checkpointId),
+ spec,
+ TableUtil.formatVersion(table));
return SimpleVersionedSerialization.writeVersionAndSerialize(
DeltaManifestsSerializer.INSTANCE, deltaManifests);
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java
index 1dbb62363d..4a4a789bf9 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java
@@ -30,6 +30,7 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -134,7 +135,10 @@ class IcebergWriteAggregator extends AbstractStreamOperator<CommittableMessage<I
WriteResult result = WriteResult.builder().addAll(writeResults).build();
DeltaManifests deltaManifests =
FlinkManifestUtil.writeCompletedFiles(
-          result, () -> icebergManifestOutputFileFactory.create(checkpointId), table.spec());
+ result,
+ () -> icebergManifestOutputFileFactory.create(checkpointId),
+ table.spec(),
+ TableUtil.formatVersion(table));
return SimpleVersionedSerialization.writeVersionAndSerialize(
DeltaManifestsSerializer.INSTANCE, deltaManifests);
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java
index afbc14b7f1..5e597d8e71 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java
@@ -50,7 +50,8 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
Schema schema,
RowType flinkSchema,
Set<Integer> equalityFieldIds,
- boolean upsert) {
+ boolean upsert,
+ boolean useDv) {
super(
spec,
format,
@@ -61,7 +62,8 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
schema,
flinkSchema,
equalityFieldIds,
- upsert);
+ upsert,
+ useDv);
this.partitionKey = new PartitionKey(spec, schema);
}
@@ -74,7 +76,7 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
// NOTICE: we need to copy a new partition key here, in case of messing up the keys in
// writers.
PartitionKey copiedKey = partitionKey.copy();
- writer = new RowDataDeltaWriter(copiedKey);
+ writer = new RowDataDeltaWriter(copiedKey, dvFileWriter());
writers.put(copiedKey, writer);
}
@@ -84,6 +86,7 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
@Override
public void close() {
try {
+ super.close();
Tasks.foreach(writers.values())
.throwFailureWhenFinished()
.noRetry()
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
index ef2c795e23..bc3bc51ced 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
import org.apache.iceberg.flink.RowDataWrapper;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileWriterFactory;
@@ -52,6 +53,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
private final Set<Integer> equalityFieldIds;
private final boolean upsert;
private final FileWriterFactory<RowData> fileWriterFactory;
+ private boolean useDv;
private transient OutputFileFactory outputFileFactory;
@@ -170,6 +172,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
}
refreshTable();
+ this.useDv = TableUtil.formatVersion(table) > 2;
this.outputFileFactory =
OutputFileFactory.builderFor(table, taskId, attemptId)
@@ -221,7 +224,8 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
schema,
flinkSchema,
equalityFieldIds,
- upsert);
+ upsert,
+ useDv);
} else {
return new PartitionedDeltaWriter(
spec,
@@ -233,7 +237,8 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
schema,
flinkSchema,
equalityFieldIds,
- upsert);
+ upsert,
+ useDv);
}
}
}
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
index e709206c94..9d749d3062 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
@@ -42,7 +42,8 @@ class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter {
Schema schema,
RowType flinkSchema,
Set<Integer> equalityFieldIds,
- boolean upsert) {
+ boolean upsert,
+ boolean useDv) {
super(
spec,
format,
@@ -53,8 +54,9 @@ class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter {
schema,
flinkSchema,
equalityFieldIds,
- upsert);
- this.writer = new RowDataDeltaWriter(null);
+ upsert,
+ useDv);
+ this.writer = new RowDataDeltaWriter(null, dvFileWriter());
}
@Override
@@ -65,5 +67,6 @@ class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter {
@Override
public void close() throws IOException {
writer.close();
+ super.close();
}
}
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
index b3262957ad..0f65f1ae52 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
@@ -164,7 +164,8 @@ class DynamicWriteResultAggregator
FlinkManifestUtil.writeCompletedFiles(
result,
() -> outputFileFactory(key.tableName()).create(checkpointId),
- spec(key.tableName(), key.specId()));
+ spec(key.tableName(), key.specId()),
+ 2);
return SimpleVersionedSerialization.writeVersionAndSerialize(
DeltaManifestsSerializer.INSTANCE, deltaManifests);
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
index e99e6e72da..5ed9da8623 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.FlinkSchemaUtil;
@@ -116,6 +117,11 @@ class DynamicWriter implements CommittingSinkWriter<DynamicRecordInternal, Dynam
Preconditions.checkState(
!equalityFieldIds.isEmpty(),
"Equality field columns shouldn't be empty when
configuring to use UPSERT data.");
+
+ Preconditions.checkArgument(
+ !(TableUtil.formatVersion(table) > 2),
+              "Dynamic Sink writer does not support upsert mode in tables (V3+)");
+
if (!table.spec().isUnpartitioned()) {
for (PartitionField partitionField :
table.spec().fields()) {
Preconditions.checkState(
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
index c5a7ec4bee..e6ca9c5c74 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
@@ -48,21 +48,27 @@ public class TestFlinkUpsert extends CatalogTestBase {
@Parameter(index = 3)
private boolean isStreamingJob;
+ @Parameter(index = 4)
+ private int formatVersion;
+
private final Map<String, String> tableUpsertProps = Maps.newHashMap();
private TableEnvironment tEnv;
-  @Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}")
+ @Parameters(
+      name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}, formatVersion={4}")
public static List<Object[]> parameters() {
List<Object[]> parameters = Lists.newArrayList();
for (FileFormat format :
new FileFormat[] {FileFormat.PARQUET, FileFormat.AVRO, FileFormat.ORC}) {
- for (Boolean isStreaming : new Boolean[] {true, false}) {
- // Only test with one catalog as this is a file operation concern.
-      // FlinkCatalogTestBase requires the catalog name start with testhadoop if using hadoop
- // catalog.
- String catalogName = "testhadoop";
- Namespace baseNamespace = Namespace.of("default");
-      parameters.add(new Object[] {catalogName, baseNamespace, format, isStreaming});
+ for (int version : org.apache.iceberg.TestHelpers.V2_AND_ABOVE) {
+ for (Boolean isStreaming : new Boolean[] {true, false}) {
+ // Only test with one catalog as this is a file operation concern.
+        // FlinkCatalogTestBase requires the catalog name start with testhadoop if using hadoop
+ // catalog.
+ String catalogName = "testhadoop";
+ Namespace baseNamespace = Namespace.of("default");
+          parameters.add(new Object[] {catalogName, baseNamespace, format, isStreaming, version});
+ }
}
}
return parameters;
@@ -98,7 +104,7 @@ public class TestFlinkUpsert extends CatalogTestBase {
sql("CREATE DATABASE IF NOT EXISTS %s", flinkDatabase);
sql("USE CATALOG %s", catalogName);
sql("USE %s", DATABASE);
- tableUpsertProps.put(TableProperties.FORMAT_VERSION, "2");
+    tableUpsertProps.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
tableUpsertProps.put(TableProperties.UPSERT_ENABLED, "true");
tableUpsertProps.put(TableProperties.DEFAULT_FILE_FORMAT, format.name());
}
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestCommittableToTableChangeConverter.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestCommittableToTableChangeConverter.java
index 0c7a47c232..c39e09fd86 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestCommittableToTableChangeConverter.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestCommittableToTableChangeConverter.java
@@ -39,6 +39,7 @@ import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.maintenance.operator.TableChange;
import org.apache.iceberg.io.FileIO;
@@ -116,7 +117,8 @@ class TestCommittableToTableChangeConverter {
.addDeleteFiles(posDeleteFile, eqDeleteFile)
.build();
DeltaManifests deltaManifests =
-        FlinkManifestUtil.writeCompletedFiles(writeResult, () -> factory.create(1), table.spec());
+ FlinkManifestUtil.writeCompletedFiles(
+            writeResult, () -> factory.create(1), table.spec(), TableUtil.formatVersion(table));
IcebergCommittable committable =
new IcebergCommittable(
SimpleVersionedSerialization.writeVersionAndSerialize(
@@ -297,7 +299,10 @@ class TestCommittableToTableChangeConverter {
.build();
DeltaManifests deltaManifests =
FlinkManifestUtil.writeCompletedFiles(
- writeResult, () -> factory.create(checkpointId), table.spec());
+ writeResult,
+ () -> factory.create(checkpointId),
+ table.spec(),
+ TableUtil.formatVersion(table));
IcebergCommittable committable =
new IcebergCommittable(
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
index a21c51c378..89f642af1c 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
@@ -56,6 +56,7 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TestBase;
+import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.TestTables;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
@@ -80,11 +81,16 @@ public class TestDeltaTaskWriter extends TestBase {
private FileFormat format;
@Parameters(name = "formatVersion = {0}, fileFormat = {1}")
- protected static List<Object> parameters() {
- return Arrays.asList(
- new Object[] {2, FileFormat.AVRO},
- new Object[] {2, FileFormat.ORC},
- new Object[] {2, FileFormat.PARQUET});
+ protected static List<Object[]> parameters() {
+ List<Object[]> parameters = Lists.newArrayList();
+ for (FileFormat format :
+        new FileFormat[] {FileFormat.AVRO, FileFormat.ORC, FileFormat.PARQUET}) {
+ for (int version : TestHelpers.V2_AND_ABOVE) {
+ parameters.add(new Object[] {version, format});
+ }
+ }
+
+ return parameters;
}
@Override
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
index c6dc984513..4bbd523ec0 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
@@ -39,6 +39,7 @@ import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.io.WriteResult;
@@ -105,7 +106,8 @@ public class TestFlinkManifest {
.addDeleteFiles(posDeleteFiles)
.build(),
() -> factory.create(curCkpId),
- table.spec());
+ table.spec(),
+ TableUtil.formatVersion(table));
WriteResult result =
FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.specs());
@@ -141,7 +143,8 @@ public class TestFlinkManifest {
FlinkManifestUtil.writeCompletedFiles(
WriteResult.builder().addDataFiles(dataFiles).build(),
() -> factory.create(checkpointId),
- table.spec());
+ table.spec(),
+ TableUtil.formatVersion(table));
assertThat(deltaManifests.dataManifest()).isNotNull();
assertThat(deltaManifests.deleteManifest()).isNull();
@@ -180,7 +183,8 @@ public class TestFlinkManifest {
.addDeleteFiles(posDeleteFiles)
.build(),
() -> factory.create(checkpointId),
- table.spec());
+ table.spec(),
+ TableUtil.formatVersion(table));
byte[] versionedSerializeData =
SimpleVersionedSerialization.writeVersionAndSerialize(