This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 4b7286cf0f Flink: Backport Preserve row lineage in RewriteDataFiles to Flink 2.1 and 1.20 (#14520)
4b7286cf0f is described below
commit 4b7286cf0f0e8ce48104835ba747941883ac1570
Author: GuoYu <[email protected]>
AuthorDate: Thu Nov 6 21:27:51 2025 +0800
Flink: Backport Preserve row lineage in RewriteDataFiles to Flink 2.1 and 1.20 (#14520)
---
.../operator/DataFileRewritePlanner.java | 6 --
.../operator/DataFileRewriteRunner.java | 34 ++++--
.../flink/source/RowDataFileScanTaskReader.java | 9 +-
.../org/apache/iceberg/flink/SimpleDataUtil.java | 30 +++++-
.../maintenance/api/TestRewriteDataFiles.java | 115 +++++++++++++++++++++
.../maintenance/operator/OperatorTestBase.java | 76 ++++++++++++--
.../operator/TestDataFileRewritePlanner.java | 13 ---
.../operator/TestDataFileRewriteRunner.java | 20 ++++
.../operator/DataFileRewritePlanner.java | 6 --
.../operator/DataFileRewriteRunner.java | 34 ++++--
.../flink/source/RowDataFileScanTaskReader.java | 9 +-
.../org/apache/iceberg/flink/SimpleDataUtil.java | 30 +++++-
.../maintenance/api/TestRewriteDataFiles.java | 115 +++++++++++++++++++++
.../maintenance/operator/OperatorTestBase.java | 76 ++++++++++++--
.../operator/TestDataFileRewritePlanner.java | 13 ---
.../operator/TestDataFileRewriteRunner.java | 20 ++++
16 files changed, 512 insertions(+), 94 deletions(-)
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java
index 5403dfe19a..6751caeb28 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java
@@ -29,8 +29,6 @@ import org.apache.flink.util.Collector;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.SerializableTable;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.TableUtil;
import org.apache.iceberg.actions.BinPackRewriteFilePlanner;
import org.apache.iceberg.actions.FileRewritePlan;
import org.apache.iceberg.actions.RewriteDataFiles;
@@ -93,10 +91,6 @@ public class DataFileRewritePlanner
@Override
public void open(Configuration parameters) throws Exception {
tableLoader.open();
- Table table = tableLoader.loadTable();
- Preconditions.checkArgument(
- !TableUtil.supportsRowLineage(table),
- "Flink does not support compaction on row lineage enabled tables
(V3+)");
this.errorCounter =
TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, taskName, taskIndex)
.counter(TableMaintenanceMetrics.ERROR_COUNTER);
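
Note: the removed Preconditions check above was the fail-fast guard that rejected row lineage (V3) tables at planning time. With this change the planner accepts them, and the runner decides at runtime whether lineage must be preserved. A condensed sketch of the new decision point (illustrative only; the real logic is in DataFileRewriteRunner below):

    // Sketch: mirrors the runner change in this commit.
    boolean preserveRowId = TableUtil.supportsRowLineage(value.table());
    // When true, the rewrite projects and persists the _row_id and
    // _last_updated_sequence_number metadata columns.
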
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java
index c03b5cc1c8..1e8db128e9 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java
@@ -28,11 +28,15 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;
import org.apache.iceberg.BaseCombinedScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TableUtil;
import org.apache.iceberg.actions.RewriteFileGroup;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.maintenance.operator.DataFileRewritePlanner.PlannedGroup;
@@ -108,8 +112,10 @@ public class DataFileRewriteRunner
value.group().rewrittenFiles().size());
}
- try (TaskWriter<RowData> writer = writerFor(value)) {
- try (DataIterator<RowData> iterator = readerFor(value)) {
+ boolean preserveRowId = TableUtil.supportsRowLineage(value.table());
+
+ try (TaskWriter<RowData> writer = writerFor(value, preserveRowId)) {
+ try (DataIterator<RowData> iterator = readerFor(value, preserveRowId)) {
while (iterator.hasNext()) {
writer.write(iterator.next());
}
@@ -169,30 +175,42 @@ public class DataFileRewriteRunner
}
}
- private TaskWriter<RowData> writerFor(PlannedGroup value) {
+ private TaskWriter<RowData> writerFor(PlannedGroup value, boolean preserveRowId) {
String formatString =
PropertyUtil.propertyAsString(
value.table().properties(),
TableProperties.DEFAULT_FILE_FORMAT,
TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+ Schema writeSchema =
+ preserveRowId
+ ? MetadataColumns.schemaWithRowLineage(value.table().schema())
+ : value.table().schema();
+ RowType flinkWriteType = FlinkSchemaUtil.convert(writeSchema);
RowDataTaskWriterFactory factory =
new RowDataTaskWriterFactory(
- value.table(),
- FlinkSchemaUtil.convert(value.table().schema()),
+ value::table,
+ flinkWriteType,
value.group().inputSplitSize(),
FileFormat.fromString(formatString),
value.table().properties(),
null,
- false);
+ false,
+ writeSchema,
+ value.table().spec());
factory.initialize(subTaskId, attemptId);
return factory.create();
}
- private DataIterator<RowData> readerFor(PlannedGroup value) {
+ private DataIterator<RowData> readerFor(PlannedGroup value, boolean preserveRowId) {
+ Schema projectedSchema =
+ preserveRowId
+ ? MetadataColumns.schemaWithRowLineage(value.table().schema())
+ : value.table().schema();
+
RowDataFileScanTaskReader reader =
new RowDataFileScanTaskReader(
value.table().schema(),
- value.table().schema(),
+ projectedSchema,
PropertyUtil.propertyAsString(value.table().properties(), DEFAULT_NAME_MAPPING, null),
false,
Collections.emptyList());
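
Note: the helper doing the heavy lifting here is MetadataColumns.schemaWithRowLineage, which appends the row lineage metadata columns to a data schema. A minimal sketch of the resulting shape, using a hypothetical two-column base schema (field ids illustrative):

    Schema base = new Schema(
        Types.NestedField.optional(1, "id", Types.IntegerType.get()),
        Types.NestedField.optional(2, "data", Types.StringType.get()));
    Schema withLineage = MetadataColumns.schemaWithRowLineage(base);
    // withLineage additionally contains two long-typed columns:
    //   _row_id
    //   _last_updated_sequence_number
    // so the rewrite can read and write lineage values like ordinary columns.
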
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
index bf6f72cc28..b8fb1ba32e 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
@@ -46,7 +46,6 @@ import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.PartitionUtil;
@@ -84,13 +83,7 @@ public class RowDataFileScanTaskReader implements FileScanTaskReader<RowData> {
@Override
public CloseableIterator<RowData> open(
FileScanTask task, InputFilesDecryptor inputFilesDecryptor) {
- Schema partitionSchema = TypeUtil.select(projectedSchema, task.spec().identitySourceIds());
-
- Map<Integer, ?> idToConstant =
- partitionSchema.columns().isEmpty()
- ? ImmutableMap.of()
- : PartitionUtil.constantsMap(task, RowDataUtil::convertConstant);
-
+ Map<Integer, ?> idToConstant = PartitionUtil.constantsMap(task, RowDataUtil::convertConstant);
FlinkDeleteFilter deletes =
new FlinkDeleteFilter(task, tableSchema, projectedSchema, inputFilesDecryptor);
CloseableIterable<RowData> iterable =
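
Note: previously the constants map was only built when identity partition columns were projected. The short-circuit is dropped, presumably because row lineage values can also be served as constants derived from file and task metadata, even on unpartitioned tables, so the map must now be computed unconditionally:

    // After this change (unconditional; no empty-partition shortcut):
    Map<Integer, ?> idToConstant =
        PartitionUtil.constantsMap(task, RowDataUtil::convertConstant);
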
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index d3146d3f42..fc5bea17b4 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -89,6 +89,13 @@ public class SimpleDataUtil {
Types.NestedField.optional(2, "data", Types.StringType.get()),
Types.NestedField.optional(3, "extra", Types.StringType.get()));
+ public static final Schema SCHEMA3 =
+ new Schema(
+ Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+ Types.NestedField.optional(2, "data", Types.StringType.get()),
+ Types.NestedField.optional(3, "_row_id", Types.LongType.get()),
+ Types.NestedField.optional(4, "_last_updated_sequence_number", Types.LongType.get()));
+
public static final ResolvedSchema FLINK_SCHEMA =
ResolvedSchema.of(
Column.physical("id", DataTypes.INT()), Column.physical("data", DataTypes.STRING()));
@@ -100,6 +107,7 @@ public class SimpleDataUtil {
public static final Record RECORD = GenericRecord.create(SCHEMA);
public static final Record RECORD2 = GenericRecord.create(SCHEMA2);
+ public static final Record RECORD3 = GenericRecord.create(SCHEMA3);
public static Table createTable(
String path, Map<String, String> properties, boolean partitioned) {
@@ -127,6 +135,16 @@ public class SimpleDataUtil {
return record;
}
+ public static Record createRecordWithRowId(
+ Integer id, String data, Long rowId, Long lastUpdatedSequenceNumber) {
+ Record record = RECORD3.copy();
+ record.setField("id", id);
+ record.setField("data", data);
+ record.setField("_row_id", rowId);
+ record.setField("_last_updated_sequence_number", lastUpdatedSequenceNumber);
+ return record;
+ }
+
public static RowData createRowData(Integer id, String data) {
return GenericRowData.of(id, StringData.fromString(data));
}
@@ -348,6 +366,11 @@ public class SimpleDataUtil {
public static void assertTableRecords(Table table, List<Record> expected, String branch)
throws IOException {
+ assertTableRecords(table, expected, branch, table.schema());
+ }
+
+ public static void assertTableRecords(
+ Table table, List<Record> expected, String branch, Schema projectSchema) throws IOException {
table.refresh();
Snapshot snapshot = latestSnapshot(table, branch);
@@ -360,12 +383,15 @@ public class SimpleDataUtil {
return;
}
- Types.StructType type = table.schema().asStruct();
+ Types.StructType type = projectSchema.asStruct();
StructLikeSet expectedSet = StructLikeSet.create(type);
expectedSet.addAll(expected);
try (CloseableIterable<Record> iterable =
- IcebergGenerics.read(table).useSnapshot(snapshot.snapshotId()).build()) {
+ IcebergGenerics.read(table)
+ .useSnapshot(snapshot.snapshotId())
+ .project(projectSchema)
+ .build()) {
StructLikeSet actualSet = StructLikeSet.create(type);
for (Record record : iterable) {
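
Note: a short usage sketch of the new test helpers, combining createRecordWithRowId with the projecting assertTableRecords overload (values illustrative):

    Schema projection = MetadataColumns.schemaWithRowLineage(table.schema());
    SimpleDataUtil.assertTableRecords(
        table,
        ImmutableList.of(SimpleDataUtil.createRecordWithRowId(1, "a", 0L, 1L)),
        SnapshotRef.MAIN_BRANCH,
        projection);
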
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
index 795057e235..707038c925 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
@@ -33,6 +33,9 @@ import java.util.List;
import java.util.stream.StreamSupport;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.flink.SimpleDataUtil;
@@ -79,6 +82,118 @@ class TestRewriteDataFiles extends MaintenanceTaskTestBase {
createRecord(4, "d")));
}
+ @Test
+ void testRewriteUnpartitionedPreserveLineage() throws Exception {
+ Table table = createTable(3);
+ insert(table, 1, "a");
+ insert(table, 2, "b");
+ insert(table, 3, "c");
+ insert(table, 4, "d");
+
+ assertFileNum(table, 4, 0);
+
+ appendRewriteDataFiles(
+ RewriteDataFiles.builder()
+ .parallelism(2)
+ .deleteFileThreshold(10)
+ .targetFileSizeBytes(1_000_000L)
+ .maxFileGroupSizeBytes(10_000_000L)
+ .maxFileSizeBytes(2_000_000L)
+ .minFileSizeBytes(500_000L)
+ .minInputFiles(2)
+ .partialProgressEnabled(true)
+ .partialProgressMaxCommits(1)
+ .maxRewriteBytes(100_000L)
+ .rewriteAll(false));
+
+ runAndWaitForSuccess(infra.env(), infra.source(), infra.sink());
+
+ assertFileNum(table, 1, 0);
+
+ Schema schema = MetadataColumns.schemaWithRowLineage(table.schema());
+ SimpleDataUtil.assertTableRecords(
+ table,
+ ImmutableList.of(
+ SimpleDataUtil.createRecordWithRowId(1, "a", 0L, 1L),
+ SimpleDataUtil.createRecordWithRowId(2, "b", 1L, 2L),
+ SimpleDataUtil.createRecordWithRowId(3, "c", 2L, 3L),
+ SimpleDataUtil.createRecordWithRowId(4, "d", 3L, 4L)),
+ SnapshotRef.MAIN_BRANCH,
+ schema);
+ }
+
+ @Test
+ void testRewriteTheSameFilePreserveLineage() throws Exception {
+ Table table = createTable(3);
+ insert(table, 1, "a");
+ insert(table, 2, "b");
+ // Create a file with two rows of data to verify that the row ID is read correctly.
+ insert(
+ table,
+ ImmutableList.of(SimpleDataUtil.createRecord(3, "c"), SimpleDataUtil.createRecord(4, "d")));
+
+ assertFileNum(table, 3, 0);
+
+ appendRewriteDataFiles(
+ RewriteDataFiles.builder()
+ .parallelism(2)
+ .deleteFileThreshold(10)
+ .targetFileSizeBytes(1_000_000L)
+ .maxFileGroupSizeBytes(10_000_000L)
+ .maxFileSizeBytes(2_000_000L)
+ .minFileSizeBytes(500_000L)
+ .minInputFiles(2)
+ .partialProgressEnabled(true)
+ .partialProgressMaxCommits(1)
+ .maxRewriteBytes(100_000L)
+ .rewriteAll(false));
+
+ runAndWaitForSuccess(infra.env(), infra.source(), infra.sink());
+
+ assertFileNum(table, 1, 0);
+
+ Schema schema = MetadataColumns.schemaWithRowLineage(table.schema());
+ SimpleDataUtil.assertTableRecords(
+ table,
+ ImmutableList.of(
+ SimpleDataUtil.createRecordWithRowId(1, "a", 0L, 1L),
+ SimpleDataUtil.createRecordWithRowId(2, "b", 1L, 2L),
+ // The IDs 3 and 4 come from the same file, so the last updated sequence number should
+ // be the same.
+ SimpleDataUtil.createRecordWithRowId(3, "c", 2L, 3L),
+ SimpleDataUtil.createRecordWithRowId(4, "d", 3L, 3L)),
+ SnapshotRef.MAIN_BRANCH,
+ schema);
+ }
+
+ @Test
+ void testRewritePartitionedPreserveLineage() throws Exception {
+ Table table = createPartitionedTable(3);
+ insertPartitioned(table, 1, "p1");
+ insertPartitioned(table, 2, "p1");
+ insertPartitioned(table, 3, "p2");
+ insertPartitioned(table, 4, "p2");
+
+ assertFileNum(table, 4, 0);
+
+ appendRewriteDataFiles();
+
+ runAndWaitForSuccess(infra.env(), infra.source(), infra.sink());
+
+ assertFileNum(table, 2, 0);
+
+ Schema schema = MetadataColumns.schemaWithRowLineage(table.schema());
+ SimpleDataUtil.assertTableRecords(
+ table,
+ ImmutableList.of(
+ SimpleDataUtil.createRecordWithRowId(1, "p1", 0L, 1L),
+ SimpleDataUtil.createRecordWithRowId(2, "p1", 1L, 2L),
+ SimpleDataUtil.createRecordWithRowId(3, "p2", 2L, 3L),
+ SimpleDataUtil.createRecordWithRowId(4, "p2", 3L, 4L)),
+ SnapshotRef.MAIN_BRANCH,
+ schema);
+ }
+
@Test
void testRewritePartitioned() throws Exception {
Table table = createPartitionedTable();
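
Note: the expected (rowId, lastUpdatedSequenceNumber) pairs in the tests above follow from how V3 lineage is assigned (our reading; not spelled out in the diff): each insert() commits one snapshot, data sequence numbers start at 1, and _row_id values are allocated from the table's next-row-id counter starting at 0. Four single-row inserts therefore yield:

    // row      _row_id   _last_updated_sequence_number
    // (1, a)      0                  1
    // (2, b)      1                  2
    // (3, c)      2                  3
    // (4, d)      3                  4
    // A lineage-preserving compaction must emit exactly these values,
    // which is what the assertions verify.
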
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
index 8460b392e2..5eecc5a803 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
@@ -24,6 +24,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
+import java.util.List;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.core.execution.JobClient;
@@ -49,6 +50,7 @@ import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.data.FileHelpers;
import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.flink.HadoopCatalogExtension;
import org.apache.iceberg.flink.SimpleDataUtil;
@@ -122,11 +124,10 @@ public class OperatorTestBase {
}
protected static Table createTable() {
- // only test V2 tables as compaction doesn't support V3 with row lineage
- return createTable("2");
+ return createTable(2);
}
- protected static Table createTable(String formatVersion) {
+ protected static Table createTable(int formatVersion) {
return CATALOG_EXTENSION
.catalog()
.createTable(
@@ -136,12 +137,16 @@ public class OperatorTestBase {
null,
ImmutableMap.of(
TableProperties.FORMAT_VERSION,
- formatVersion,
+ String.valueOf(formatVersion),
"flink.max-continuous-empty-commits",
"100000"));
}
protected static Table createTableWithDelete() {
+ return createTableWithDelete(2);
+ }
+
+ protected static Table createTableWithDelete(int formatVersion) {
return CATALOG_EXTENSION
.catalog()
.createTable(
@@ -149,10 +154,11 @@ public class OperatorTestBase {
SCHEMA_WITH_PRIMARY_KEY,
PartitionSpec.unpartitioned(),
null,
- ImmutableMap.of("format-version", "2", "write.upsert.enabled", "true"));
+ ImmutableMap.of(
+ "format-version", String.valueOf(formatVersion),
"write.upsert.enabled", "true"));
}
- protected static Table createPartitionedTable() {
+ protected static Table createPartitionedTable(int formatVersion) {
return CATALOG_EXTENSION
.catalog()
.createTable(
@@ -160,7 +166,15 @@ public class OperatorTestBase {
SimpleDataUtil.SCHEMA,
PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build(),
null,
- ImmutableMap.of("format-version", "2", "flink.max-continuous-empty-commits", "100000"));
+ ImmutableMap.of(
+ "format-version",
+ String.valueOf(formatVersion),
+ "flink.max-continuous-empty-commits",
+ "100000"));
+ }
+
+ protected static Table createPartitionedTable() {
+ return createPartitionedTable(2);
}
protected void insert(Table table, Integer id, String data) throws IOException {
@@ -169,6 +183,11 @@ public class OperatorTestBase {
table.refresh();
}
+ protected void insert(Table table, List<Record> records) throws IOException {
+ new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir).appendToTable(records);
+ table.refresh();
+ }
+
protected void insert(Table table, Integer id, String data, String extra) throws IOException {
new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir)
.appendToTable(Lists.newArrayList(SimpleDataUtil.createRecord(id, data, extra)));
@@ -195,6 +214,35 @@ public class OperatorTestBase {
table.newRowDelta().addRows(dataFile).addDeletes(eqDelete).commit();
}
+ /**
+ * For the same identifier column id this method simulates the following row operations: <tr>
+ * <li>add an equality delete on oldData
+ * <li>insert tempData
+ * <li>add a position delete on tempData
+ * <li>insert newData </tr>
+ *
+ * @param table to modify
+ * @param id the identifier column id
+ * @param oldData the old data to be deleted
+ * @param tempData the temp data to be inserted and deleted with a position delete
+ * @param newData the new data to be inserted
+ * @param formatVersion the format version to use
+ */
+ protected void update(
+ Table table, Integer id, String oldData, String tempData, String newData, int formatVersion)
+ throws IOException {
+ DataFile dataFile =
+ new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir)
+ .writeFile(
+ Lists.newArrayList(
+ SimpleDataUtil.createRecord(id, tempData),
+ SimpleDataUtil.createRecord(id, newData)));
+ DeleteFile eqDelete = writeEqualityDelete(table, id, oldData);
+ DeleteFile posDelete = writePosDelete(table, dataFile.path(), 0, id, tempData, formatVersion);
+
+ table.newRowDelta().addRows(dataFile).addDeletes(eqDelete).addDeletes(posDelete).commit();
+ }
+
/**
* For the same identifier column id this method simulates the following row operations: <tr>
* <li>add an equality delete on oldData
@@ -217,7 +265,7 @@ public class OperatorTestBase {
SimpleDataUtil.createRecord(id, tempData),
SimpleDataUtil.createRecord(id, newData)));
DeleteFile eqDelete = writeEqualityDelete(table, id, oldData);
- DeleteFile posDelete = writePosDelete(table, dataFile.path(), 0, id, tempData);
+ DeleteFile posDelete = writePosDelete(table, dataFile.path(), 0, id, tempData, 2);
table.newRowDelta().addRows(dataFile).addDeletes(eqDelete).addDeletes(posDelete).commit();
}
@@ -237,6 +285,13 @@ public class OperatorTestBase {
table.refresh();
}
+ protected void insertPartitioned(Table table, List<Record> records, String partition)
+ throws IOException {
+ new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir)
+ .appendToTable(TestHelpers.Row.of(partition), records);
+ table.refresh();
+ }
+
protected void dropTable() {
CATALOG_EXTENSION.catalogLoader().loadCatalog().dropTable(TestFixtures.TABLE_IDENTIFIER);
}
@@ -332,7 +387,8 @@ public class OperatorTestBase {
}
private DeleteFile writePosDelete(
- Table table, CharSequence path, Integer pos, Integer id, String oldData) throws IOException {
+ Table table, CharSequence path, Integer pos, Integer id, String oldData, int formatVersion)
+ throws IOException {
File file = File.createTempFile("junit", null, warehouseDir.toFile());
assertThat(file.delete()).isTrue();
PositionDelete<GenericRecord> posDelete = PositionDelete.create();
@@ -341,7 +397,7 @@ public class OperatorTestBase {
nested.set(1, oldData);
posDelete.set(path, pos, nested);
return FileHelpers.writePosDeleteFile(
- table, Files.localOutput(file), null, Lists.newArrayList(posDelete));
+ table, Files.localOutput(file), null, Lists.newArrayList(posDelete), formatVersion);
}
static void trigger(OneInputStreamOperatorTestHarness<Trigger, ?> harness) throws Exception {
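
Note: writePosDelete now threads a format version through to FileHelpers.writePosDeleteFile. A hedged usage sketch: legacy call sites pin version 2, while V3 tests pass 3 so the produced position deletes are valid for row lineage tables ("tmp" is a placeholder value):

    DeleteFile v2Delete = writePosDelete(table, dataFile.path(), 0, id, "tmp", 2);
    DeleteFile v3Delete = writePosDelete(table, dataFile.path(), 0, id, "tmp", 3);
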
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
index 2d83f553e5..9f4f96e106 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
@@ -22,7 +22,6 @@ import static org.apache.iceberg.actions.SizeBasedFileRewritePlanner.MIN_INPUT_F
import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.newDataFiles;
import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.planDataFileRewrite;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.util.List;
import java.util.Set;
@@ -41,18 +40,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.junit.jupiter.api.Test;
class TestDataFileRewritePlanner extends OperatorTestBase {
- @Test
- void testFailsOnV3Table() throws Exception {
- Table table = createTable("3");
- Set<DataFile> expected = Sets.newHashSetWithExpectedSize(3);
- insert(table, 1, "a");
- expected.addAll(newDataFiles(table));
-
- assertThatThrownBy(() -> planDataFileRewrite(tableLoader()))
- .hasMessageContaining(
- "Flink does not support compaction on row lineage enabled tables
(V3+)")
- .isInstanceOf(IllegalArgumentException.class);
- }
@Test
void testUnpartitioned() throws Exception {
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java
index 3c5a103287..4e21c7a956 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java
@@ -248,6 +248,26 @@ class TestDataFileRewriteRunner extends OperatorTestBase {
ImmutableSet.of(new PartitionData(PartitionSpec.unpartitioned().partitionType())));
}
+ @Test
+ void testV3Table() throws Exception {
+ Table table = createTableWithDelete(3);
+ update(table, 1, null, "a", "b", 3);
+ update(table, 1, "b", "c");
+
+ List<DataFileRewritePlanner.PlannedGroup> planned = planDataFileRewrite(tableLoader());
+ assertThat(planned).hasSize(1);
+
+ List<DataFileRewriteRunner.ExecutedGroup> actual = executeRewrite(planned);
+ assertThat(actual).hasSize(1);
+
+ assertRewriteFileGroup(
+ actual.get(0),
+ table,
+ records(table.schema(), ImmutableSet.of(ImmutableList.of(1, "c"))),
+ 1,
+ ImmutableSet.of(new PartitionData(PartitionSpec.unpartitioned().partitionType())));
+ }
+
@Test
void testSplitSize() throws Exception {
Table table = createTable();
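
Note: testV3Table above drives both update() overloads. A commented walk-through, based on our interpretation of the helper javadoc:

    // eq-delete oldData (null), insert "a", position-delete "a" (version 3),
    // then insert "b"
    update(table, 1, null, "a", "b", 3);
    // eq-delete "b", insert "c": only row (1, "c") survives for the rewrite
    update(table, 1, "b", "c");
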
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java
index 81db62e8bf..c50060e16a 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java
@@ -29,8 +29,6 @@ import org.apache.flink.util.Collector;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.SerializableTable;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.TableUtil;
import org.apache.iceberg.actions.BinPackRewriteFilePlanner;
import org.apache.iceberg.actions.FileRewritePlan;
import org.apache.iceberg.actions.RewriteDataFiles;
@@ -93,10 +91,6 @@ public class DataFileRewritePlanner
@Override
public void open(OpenContext context) throws Exception {
tableLoader.open();
- Table table = tableLoader.loadTable();
- Preconditions.checkArgument(
- !TableUtil.supportsRowLineage(table),
- "Flink does not support compaction on row lineage enabled tables
(V3+)");
this.errorCounter =
TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, taskName, taskIndex)
.counter(TableMaintenanceMetrics.ERROR_COUNTER);
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java
index ad3b045400..57b0e53d86 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java
@@ -28,11 +28,15 @@ import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;
import org.apache.iceberg.BaseCombinedScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TableUtil;
import org.apache.iceberg.actions.RewriteFileGroup;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.maintenance.operator.DataFileRewritePlanner.PlannedGroup;
@@ -108,8 +112,10 @@ public class DataFileRewriteRunner
value.group().rewrittenFiles().size());
}
- try (TaskWriter<RowData> writer = writerFor(value)) {
- try (DataIterator<RowData> iterator = readerFor(value)) {
+ boolean preserveRowId = TableUtil.supportsRowLineage(value.table());
+
+ try (TaskWriter<RowData> writer = writerFor(value, preserveRowId)) {
+ try (DataIterator<RowData> iterator = readerFor(value, preserveRowId)) {
while (iterator.hasNext()) {
writer.write(iterator.next());
}
@@ -169,30 +175,42 @@ public class DataFileRewriteRunner
}
}
- private TaskWriter<RowData> writerFor(PlannedGroup value) {
+ private TaskWriter<RowData> writerFor(PlannedGroup value, boolean preserveRowId) {
String formatString =
PropertyUtil.propertyAsString(
value.table().properties(),
TableProperties.DEFAULT_FILE_FORMAT,
TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+ Schema writeSchema =
+ preserveRowId
+ ? MetadataColumns.schemaWithRowLineage(value.table().schema())
+ : value.table().schema();
+ RowType flinkWriteType = FlinkSchemaUtil.convert(writeSchema);
RowDataTaskWriterFactory factory =
new RowDataTaskWriterFactory(
- value.table(),
- FlinkSchemaUtil.convert(value.table().schema()),
+ value::table,
+ flinkWriteType,
value.group().inputSplitSize(),
FileFormat.fromString(formatString),
value.table().properties(),
null,
- false);
+ false,
+ writeSchema,
+ value.table().spec());
factory.initialize(subTaskId, attemptId);
return factory.create();
}
- private DataIterator<RowData> readerFor(PlannedGroup value) {
+ private DataIterator<RowData> readerFor(PlannedGroup value, boolean preserveRowId) {
+ Schema projectedSchema =
+ preserveRowId
+ ? MetadataColumns.schemaWithRowLineage(value.table().schema())
+ : value.table().schema();
+
RowDataFileScanTaskReader reader =
new RowDataFileScanTaskReader(
value.table().schema(),
- value.table().schema(),
+ projectedSchema,
PropertyUtil.propertyAsString(value.table().properties(), DEFAULT_NAME_MAPPING, null),
false,
Collections.emptyList());
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
index bf6f72cc28..b8fb1ba32e 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
@@ -46,7 +46,6 @@ import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.PartitionUtil;
@@ -84,13 +83,7 @@ public class RowDataFileScanTaskReader implements FileScanTaskReader<RowData> {
@Override
public CloseableIterator<RowData> open(
FileScanTask task, InputFilesDecryptor inputFilesDecryptor) {
- Schema partitionSchema = TypeUtil.select(projectedSchema, task.spec().identitySourceIds());
-
- Map<Integer, ?> idToConstant =
- partitionSchema.columns().isEmpty()
- ? ImmutableMap.of()
- : PartitionUtil.constantsMap(task, RowDataUtil::convertConstant);
-
+ Map<Integer, ?> idToConstant = PartitionUtil.constantsMap(task, RowDataUtil::convertConstant);
FlinkDeleteFilter deletes =
new FlinkDeleteFilter(task, tableSchema, projectedSchema, inputFilesDecryptor);
CloseableIterable<RowData> iterable =
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index 6db2b79f77..542376b06c 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -89,6 +89,13 @@ public class SimpleDataUtil {
Types.NestedField.optional(2, "data", Types.StringType.get()),
Types.NestedField.optional(3, "extra", Types.StringType.get()));
+ public static final Schema SCHEMA3 =
+ new Schema(
+ Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+ Types.NestedField.optional(2, "data", Types.StringType.get()),
+ Types.NestedField.optional(3, "_row_id", Types.LongType.get()),
+ Types.NestedField.optional(4, "_last_updated_sequence_number", Types.LongType.get()));
+
public static final ResolvedSchema FLINK_SCHEMA =
ResolvedSchema.of(
Column.physical("id", DataTypes.INT()), Column.physical("data", DataTypes.STRING()));
@@ -100,6 +107,7 @@ public class SimpleDataUtil {
public static final Record RECORD = GenericRecord.create(SCHEMA);
public static final Record RECORD2 = GenericRecord.create(SCHEMA2);
+ public static final Record RECORD3 = GenericRecord.create(SCHEMA3);
public static Table createTable(
String path, Map<String, String> properties, boolean partitioned) {
@@ -127,6 +135,16 @@ public class SimpleDataUtil {
return record;
}
+ public static Record createRecordWithRowId(
+ Integer id, String data, Long rowId, Long lastUpdatedSequenceNumber) {
+ Record record = RECORD3.copy();
+ record.setField("id", id);
+ record.setField("data", data);
+ record.setField("_row_id", rowId);
+ record.setField("_last_updated_sequence_number", lastUpdatedSequenceNumber);
+ return record;
+ }
+
public static RowData createRowData(Integer id, String data) {
return GenericRowData.of(id, StringData.fromString(data));
}
@@ -348,6 +366,11 @@ public class SimpleDataUtil {
public static void assertTableRecords(Table table, List<Record> expected, String branch)
throws IOException {
+ assertTableRecords(table, expected, branch, table.schema());
+ }
+
+ public static void assertTableRecords(
+ Table table, List<Record> expected, String branch, Schema projectSchema) throws IOException {
table.refresh();
Snapshot snapshot = latestSnapshot(table, branch);
@@ -360,12 +383,15 @@ public class SimpleDataUtil {
return;
}
- Types.StructType type = table.schema().asStruct();
+ Types.StructType type = projectSchema.asStruct();
StructLikeSet expectedSet = StructLikeSet.create(type);
expectedSet.addAll(expected);
try (CloseableIterable<Record> iterable =
- IcebergGenerics.read(table).useSnapshot(snapshot.snapshotId()).build()) {
+ IcebergGenerics.read(table)
+ .useSnapshot(snapshot.snapshotId())
+ .project(projectSchema)
+ .build()) {
StructLikeSet actualSet = StructLikeSet.create(type);
for (Record record : iterable) {
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
index 795057e235..707038c925 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
@@ -33,6 +33,9 @@ import java.util.List;
import java.util.stream.StreamSupport;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.flink.SimpleDataUtil;
@@ -79,6 +82,118 @@ class TestRewriteDataFiles extends MaintenanceTaskTestBase {
createRecord(4, "d")));
}
+ @Test
+ void testRewriteUnpartitionedPreserveLineage() throws Exception {
+ Table table = createTable(3);
+ insert(table, 1, "a");
+ insert(table, 2, "b");
+ insert(table, 3, "c");
+ insert(table, 4, "d");
+
+ assertFileNum(table, 4, 0);
+
+ appendRewriteDataFiles(
+ RewriteDataFiles.builder()
+ .parallelism(2)
+ .deleteFileThreshold(10)
+ .targetFileSizeBytes(1_000_000L)
+ .maxFileGroupSizeBytes(10_000_000L)
+ .maxFileSizeBytes(2_000_000L)
+ .minFileSizeBytes(500_000L)
+ .minInputFiles(2)
+ .partialProgressEnabled(true)
+ .partialProgressMaxCommits(1)
+ .maxRewriteBytes(100_000L)
+ .rewriteAll(false));
+
+ runAndWaitForSuccess(infra.env(), infra.source(), infra.sink());
+
+ assertFileNum(table, 1, 0);
+
+ Schema schema = MetadataColumns.schemaWithRowLineage(table.schema());
+ SimpleDataUtil.assertTableRecords(
+ table,
+ ImmutableList.of(
+ SimpleDataUtil.createRecordWithRowId(1, "a", 0L, 1L),
+ SimpleDataUtil.createRecordWithRowId(2, "b", 1L, 2L),
+ SimpleDataUtil.createRecordWithRowId(3, "c", 2L, 3L),
+ SimpleDataUtil.createRecordWithRowId(4, "d", 3L, 4L)),
+ SnapshotRef.MAIN_BRANCH,
+ schema);
+ }
+
+ @Test
+ void testRewriteTheSameFilePreserveLineage() throws Exception {
+ Table table = createTable(3);
+ insert(table, 1, "a");
+ insert(table, 2, "b");
+ // Create a file with two rows of data to verify that the row ID is read correctly.
+ insert(
+ table,
+ ImmutableList.of(SimpleDataUtil.createRecord(3, "c"), SimpleDataUtil.createRecord(4, "d")));
+
+ assertFileNum(table, 3, 0);
+
+ appendRewriteDataFiles(
+ RewriteDataFiles.builder()
+ .parallelism(2)
+ .deleteFileThreshold(10)
+ .targetFileSizeBytes(1_000_000L)
+ .maxFileGroupSizeBytes(10_000_000L)
+ .maxFileSizeBytes(2_000_000L)
+ .minFileSizeBytes(500_000L)
+ .minInputFiles(2)
+ .partialProgressEnabled(true)
+ .partialProgressMaxCommits(1)
+ .maxRewriteBytes(100_000L)
+ .rewriteAll(false));
+
+ runAndWaitForSuccess(infra.env(), infra.source(), infra.sink());
+
+ assertFileNum(table, 1, 0);
+
+ Schema schema = MetadataColumns.schemaWithRowLineage(table.schema());
+ SimpleDataUtil.assertTableRecords(
+ table,
+ ImmutableList.of(
+ SimpleDataUtil.createRecordWithRowId(1, "a", 0L, 1L),
+ SimpleDataUtil.createRecordWithRowId(2, "b", 1L, 2L),
+ // The IDs 3 and 4 come from the same file, so the last updated sequence number should
+ // be the same.
+ SimpleDataUtil.createRecordWithRowId(3, "c", 2L, 3L),
+ SimpleDataUtil.createRecordWithRowId(4, "d", 3L, 3L)),
+ SnapshotRef.MAIN_BRANCH,
+ schema);
+ }
+
+ @Test
+ void testRewritePartitionedPreserveLineage() throws Exception {
+ Table table = createPartitionedTable(3);
+ insertPartitioned(table, 1, "p1");
+ insertPartitioned(table, 2, "p1");
+ insertPartitioned(table, 3, "p2");
+ insertPartitioned(table, 4, "p2");
+
+ assertFileNum(table, 4, 0);
+
+ appendRewriteDataFiles();
+
+ runAndWaitForSuccess(infra.env(), infra.source(), infra.sink());
+
+ assertFileNum(table, 2, 0);
+
+ Schema schema = MetadataColumns.schemaWithRowLineage(table.schema());
+ SimpleDataUtil.assertTableRecords(
+ table,
+ ImmutableList.of(
+ SimpleDataUtil.createRecordWithRowId(1, "p1", 0L, 1L),
+ SimpleDataUtil.createRecordWithRowId(2, "p1", 1L, 2L),
+ SimpleDataUtil.createRecordWithRowId(3, "p2", 2L, 3L),
+ SimpleDataUtil.createRecordWithRowId(4, "p2", 3L, 4L)),
+ SnapshotRef.MAIN_BRANCH,
+ schema);
+ }
+
@Test
void testRewritePartitioned() throws Exception {
Table table = createPartitionedTable();
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
index f9cbc9715c..b9422a63d6 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
@@ -24,6 +24,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
+import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
@@ -49,6 +50,7 @@ import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.data.FileHelpers;
import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.flink.HadoopCatalogExtension;
import org.apache.iceberg.flink.SimpleDataUtil;
@@ -122,11 +124,10 @@ public class OperatorTestBase {
}
protected static Table createTable() {
- // only test V2 tables as compaction doesn't support V3 with row lineage
- return createTable("2");
+ return createTable(2);
}
- protected static Table createTable(String formatVersion) {
+ protected static Table createTable(int formatVersion) {
return CATALOG_EXTENSION
.catalog()
.createTable(
@@ -136,12 +137,16 @@ public class OperatorTestBase {
null,
ImmutableMap.of(
TableProperties.FORMAT_VERSION,
- formatVersion,
+ String.valueOf(formatVersion),
"flink.max-continuous-empty-commits",
"100000"));
}
protected static Table createTableWithDelete() {
+ return createTableWithDelete(2);
+ }
+
+ protected static Table createTableWithDelete(int formatVersion) {
return CATALOG_EXTENSION
.catalog()
.createTable(
@@ -149,10 +154,11 @@ public class OperatorTestBase {
SCHEMA_WITH_PRIMARY_KEY,
PartitionSpec.unpartitioned(),
null,
- ImmutableMap.of("format-version", "2", "write.upsert.enabled", "true"));
+ ImmutableMap.of(
+ "format-version", String.valueOf(formatVersion),
"write.upsert.enabled", "true"));
}
- protected static Table createPartitionedTable() {
+ protected static Table createPartitionedTable(int formatVersion) {
return CATALOG_EXTENSION
.catalog()
.createTable(
@@ -160,7 +166,15 @@ public class OperatorTestBase {
SimpleDataUtil.SCHEMA,
PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build(),
null,
- ImmutableMap.of("format-version", "2", "flink.max-continuous-empty-commits", "100000"));
+ ImmutableMap.of(
+ "format-version",
+ String.valueOf(formatVersion),
+ "flink.max-continuous-empty-commits",
+ "100000"));
+ }
+
+ protected static Table createPartitionedTable() {
+ return createPartitionedTable(2);
}
protected void insert(Table table, Integer id, String data) throws IOException {
@@ -169,6 +183,11 @@ public class OperatorTestBase {
table.refresh();
}
+ protected void insert(Table table, List<Record> records) throws IOException {
+ new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir).appendToTable(records);
+ table.refresh();
+ }
+
protected void insert(Table table, Integer id, String data, String extra) throws IOException {
new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir)
.appendToTable(Lists.newArrayList(SimpleDataUtil.createRecord(id, data, extra)));
@@ -195,6 +214,35 @@ public class OperatorTestBase {
table.newRowDelta().addRows(dataFile).addDeletes(eqDelete).commit();
}
+ /**
+ * For the same identifier column id this method simulates the following row operations: <tr>
+ * <li>add an equality delete on oldData
+ * <li>insert tempData
+ * <li>add a position delete on tempData
+ * <li>insert newData </tr>
+ *
+ * @param table to modify
+ * @param id the identifier column id
+ * @param oldData the old data to be deleted
+ * @param tempData the temp data to be inserted and deleted with a position delete
+ * @param newData the new data to be inserted
+ * @param formatVersion the format version to use
+ */
+ protected void update(
+ Table table, Integer id, String oldData, String tempData, String newData, int formatVersion)
+ throws IOException {
+ DataFile dataFile =
+ new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir)
+ .writeFile(
+ Lists.newArrayList(
+ SimpleDataUtil.createRecord(id, tempData),
+ SimpleDataUtil.createRecord(id, newData)));
+ DeleteFile eqDelete = writeEqualityDelete(table, id, oldData);
+ DeleteFile posDelete = writePosDelete(table, dataFile.path(), 0, id, tempData, formatVersion);
+
+ table.newRowDelta().addRows(dataFile).addDeletes(eqDelete).addDeletes(posDelete).commit();
+ }
+
/**
* For the same identifier column id this method simulates the following row operations: <tr>
* <li>add an equality delete on oldData
@@ -217,7 +265,7 @@ public class OperatorTestBase {
SimpleDataUtil.createRecord(id, tempData),
SimpleDataUtil.createRecord(id, newData)));
DeleteFile eqDelete = writeEqualityDelete(table, id, oldData);
- DeleteFile posDelete = writePosDelete(table, dataFile.path(), 0, id, tempData);
+ DeleteFile posDelete = writePosDelete(table, dataFile.path(), 0, id, tempData, 2);
table.newRowDelta().addRows(dataFile).addDeletes(eqDelete).addDeletes(posDelete).commit();
}
@@ -237,6 +285,13 @@ public class OperatorTestBase {
table.refresh();
}
+ protected void insertPartitioned(Table table, List<Record> records, String partition)
+ throws IOException {
+ new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir)
+ .appendToTable(TestHelpers.Row.of(partition), records);
+ table.refresh();
+ }
+
protected void dropTable() {
CATALOG_EXTENSION.catalogLoader().loadCatalog().dropTable(TestFixtures.TABLE_IDENTIFIER);
}
@@ -329,7 +384,8 @@ public class OperatorTestBase {
}
private DeleteFile writePosDelete(
- Table table, CharSequence path, Integer pos, Integer id, String oldData) throws IOException {
+ Table table, CharSequence path, Integer pos, Integer id, String oldData, int formatVersion)
+ throws IOException {
File file = File.createTempFile("junit", null, warehouseDir.toFile());
assertThat(file.delete()).isTrue();
PositionDelete<GenericRecord> posDelete = PositionDelete.create();
@@ -338,7 +394,7 @@ public class OperatorTestBase {
nested.set(1, oldData);
posDelete.set(path, pos, nested);
return FileHelpers.writePosDeleteFile(
- table, Files.localOutput(file), null, Lists.newArrayList(posDelete));
+ table, Files.localOutput(file), null, Lists.newArrayList(posDelete), formatVersion);
}
static void trigger(OneInputStreamOperatorTestHarness<Trigger, ?> harness) throws Exception {
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
index 2d83f553e5..9f4f96e106 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
@@ -22,7 +22,6 @@ import static org.apache.iceberg.actions.SizeBasedFileRewritePlanner.MIN_INPUT_F
import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.newDataFiles;
import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.planDataFileRewrite;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.util.List;
import java.util.Set;
@@ -41,18 +40,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.junit.jupiter.api.Test;
class TestDataFileRewritePlanner extends OperatorTestBase {
- @Test
- void testFailsOnV3Table() throws Exception {
- Table table = createTable("3");
- Set<DataFile> expected = Sets.newHashSetWithExpectedSize(3);
- insert(table, 1, "a");
- expected.addAll(newDataFiles(table));
-
- assertThatThrownBy(() -> planDataFileRewrite(tableLoader()))
- .hasMessageContaining(
- "Flink does not support compaction on row lineage enabled tables
(V3+)")
- .isInstanceOf(IllegalArgumentException.class);
- }
@Test
void testUnpartitioned() throws Exception {
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java
index 3c5a103287..4e21c7a956 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java
@@ -248,6 +248,26 @@ class TestDataFileRewriteRunner extends OperatorTestBase {
ImmutableSet.of(new PartitionData(PartitionSpec.unpartitioned().partitionType())));
}
+ @Test
+ void testV3Table() throws Exception {
+ Table table = createTableWithDelete(3);
+ update(table, 1, null, "a", "b", 3);
+ update(table, 1, "b", "c");
+
+ List<DataFileRewritePlanner.PlannedGroup> planned = planDataFileRewrite(tableLoader());
+ assertThat(planned).hasSize(1);
+
+ List<DataFileRewriteRunner.ExecutedGroup> actual = executeRewrite(planned);
+ assertThat(actual).hasSize(1);
+
+ assertRewriteFileGroup(
+ actual.get(0),
+ table,
+ records(table.schema(), ImmutableSet.of(ImmutableList.of(1, "c"))),
+ 1,
+ ImmutableSet.of(new PartitionData(PartitionSpec.unpartitioned().partitionType())));
+ }
+
@Test
void testSplitSize() throws Exception {
Table table = createTable();