This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 8a936637a [lake/iceberg] implement lake writer for iceberg pk table (#1555)
8a936637a is described below
commit 8a936637a4b023fa827a73ee6474d894b2f6ebe0
Author: xx789 <[email protected]>
AuthorDate: Tue Aug 19 11:56:10 2025 +0800
[lake/iceberg] implement lake writer for iceberg pk table (#1555)
---
.../lake/iceberg/tiering/IcebergLakeCommitter.java | 68 +++++++---
.../lake/iceberg/tiering/IcebergLakeWriter.java | 85 ++++++++++--
.../{append => writer}/AppendOnlyWriter.java | 47 ++-----
.../iceberg/tiering/writer/DeltaTaskWriter.java | 112 ++++++++++++++++
.../tiering/writer/GenericTaskDeltaWriter.java | 80 ++++++++++++
.../fluss/lake/iceberg/IcebergTieringTest.java | 145 +++++++++++++++++----
6 files changed, 443 insertions(+), 94 deletions(-)
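In outline, this change lets tiering handle Iceberg primary-key (pk) tables: the writer turns the Fluss changelog into Iceberg data files plus equality-delete files, and the committer applies both atomically. A condensed sketch of the round trip, following the test code further down (createLakeWriter, tablePath, and the record lists are the test's own helpers):

    // Condensed sketch of the flow exercised by IcebergTieringTest below.
    try (LakeWriter<IcebergWriteResult> writer = createLakeWriter(tablePath, bucket)) {
        for (LogRecord record : writtenRecords) {
            writer.write(record); // +I/+U -> data files, -U/-D -> equality deletes
        }
        IcebergWriteResult result = writer.complete();
        // per-bucket results are then serialized, gathered, and committed once
        // through IcebergLakeCommitter (newAppend or newRowDelta, see below).
    }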
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
index 6353fb06c..d3009ee0e 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
@@ -28,7 +28,10 @@ import com.alibaba.fluss.utils.json.BucketOffsetJsonSerde;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -40,6 +43,7 @@ import org.apache.iceberg.io.WriteResult;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -77,6 +81,10 @@ public class IcebergLakeCommitter implements LakeCommitter<IcebergWriteResult, I
for (DataFile dataFile : writeResult.dataFiles()) {
builder.addDataFile(dataFile);
}
+ // Add delete files
+ for (DeleteFile deleteFile : writeResult.deleteFiles()) {
+ builder.addDeleteFile(deleteFile);
+ }
}
return builder.build();
@@ -88,22 +96,36 @@ public class IcebergLakeCommitter implements LakeCommitter<IcebergWriteResult, I
try {
// Refresh table to get latest metadata
icebergTable.refresh();
- // Simple append-only case: only data files, no delete files or compaction
- AppendFiles appendFiles = icebergTable.newAppend();
- for (DataFile dataFile : committable.getDataFiles()) {
- appendFiles.appendFile(dataFile);
- }
- if (!committable.getDeleteFiles().isEmpty()) {
- throw new IllegalStateException(
- "Delete files are not supported in append-only mode. "
- + "Found "
- + committable.getDeleteFiles().size()
- + " delete files.");
- }
- addFlussProperties(appendFiles, snapshotProperties);
+ if (committable.getDeleteFiles().isEmpty()) {
+ // Simple append-only case: only data files, no delete files or compaction
+ AppendFiles appendFiles = icebergTable.newAppend();
+ for (DataFile dataFile : committable.getDataFiles()) {
+ appendFiles.appendFile(dataFile);
+ }
- appendFiles.commit();
+ addFlussProperties(appendFiles, snapshotProperties);
+
+ appendFiles.commit();
+ } else {
+ /**
+ * Row delta validations are not needed for streaming changes that write equality
+ * deletes. Equality deletes are applied to data in all previous sequence numbers,
+ * so retries may push deletes further in the future, but do not affect correctness.
+ * Position deletes committed to the table in this path are used only to delete rows
+ * from data files that are being added in this commit. There is no way for data
+ * files added along with the delete files to be concurrently removed, so there is
+ * no need to validate the files referenced by the position delete files that are
+ * being committed.
+ */
+ RowDelta rowDelta = icebergTable.newRowDelta();
+ Arrays.stream(committable.getDataFiles().stream().toArray(DataFile[]::new))
+ .forEach(rowDelta::addRows);
+ Arrays.stream(committable.getDeleteFiles().stream().toArray(DeleteFile[]::new))
+ .forEach(rowDelta::addDeletes);
+ snapshotProperties.forEach(rowDelta::set);
+ rowDelta.commit();
+ }
Long commitSnapshotId = currentCommitSnapshotId.get();
currentCommitSnapshotId.remove();
@@ -116,20 +138,26 @@ public class IcebergLakeCommitter implements LakeCommitter<IcebergWriteResult, I
}
private void addFlussProperties(
- AppendFiles appendFiles, Map<String, String> snapshotProperties) {
- appendFiles.set("commit-user", FLUSS_LAKE_TIERING_COMMIT_USER);
+ SnapshotUpdate<?> operation, Map<String, String> snapshotProperties) {
+ operation.set("commit-user", FLUSS_LAKE_TIERING_COMMIT_USER);
for (Map.Entry<String, String> entry : snapshotProperties.entrySet()) {
- appendFiles.set(entry.getKey(), entry.getValue());
+ operation.set(entry.getKey(), entry.getValue());
}
}
@Override
public void abort(IcebergCommittable committable) {
- List<String> filesToDelete =
+ List<String> dataFilesToDelete =
committable.getDataFiles().stream()
- .map(dataFile -> dataFile.path().toString())
+ .map(file -> file.path().toString())
+ .collect(Collectors.toList());
+ CatalogUtil.deleteFiles(icebergTable.io(), dataFilesToDelete, "data file", true);
+
+ List<String> deleteFilesToDelete =
+ committable.getDeleteFiles().stream()
+ .map(file -> file.path().toString())
.collect(Collectors.toList());
- CatalogUtil.deleteFiles(icebergTable.io(), filesToDelete, "data file", true);
+ CatalogUtil.deleteFiles(icebergTable.io(), deleteFilesToDelete, "delete file", true);
}
@Nullable
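Note on the committer change above: the commit path now branches on the committable's contents. Log-table tiering (no delete files) keeps the newAppend() fast path, while pk-table tiering commits data files and equality-delete files in one atomic snapshot via newRowDelta(). Condensed, the logic is (a sketch of the patch above, with the stream round-trip simplified):

    if (committable.getDeleteFiles().isEmpty()) {
        AppendFiles appendFiles = icebergTable.newAppend(); // data files only
        committable.getDataFiles().forEach(appendFiles::appendFile);
        addFlussProperties(appendFiles, snapshotProperties);
        appendFiles.commit();
    } else {
        RowDelta rowDelta = icebergTable.newRowDelta(); // one snapshot for both
        committable.getDataFiles().forEach(rowDelta::addRows);
        committable.getDeleteFiles().forEach(rowDelta::addDeletes);
        snapshotProperties.forEach(rowDelta::set);
        rowDelta.commit();
    }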
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java
index e75378941..dd8e55e37 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java
@@ -17,21 +17,32 @@
package com.alibaba.fluss.lake.iceberg.tiering;
-import com.alibaba.fluss.lake.iceberg.tiering.append.AppendOnlyWriter;
+import com.alibaba.fluss.lake.iceberg.tiering.writer.AppendOnlyWriter;
+import com.alibaba.fluss.lake.iceberg.tiering.writer.DeltaTaskWriter;
import com.alibaba.fluss.lake.writer.LakeWriter;
import com.alibaba.fluss.lake.writer.WriterInitContext;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.record.LogRecord;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.util.PropertyUtil;
import java.io.Closeable;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import static com.alibaba.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
/** Implementation of {@link LakeWriter} for Iceberg. */
public class IcebergLakeWriter implements LakeWriter<IcebergWriteResult> {
@@ -52,19 +63,51 @@ public class IcebergLakeWriter implements LakeWriter<IcebergWriteResult> {
}
private RecordWriter createRecordWriter(WriterInitContext writerInitContext) {
- if (!icebergTable.spec().isUnpartitioned()) {
- throw new UnsupportedOperationException("Partitioned tables are not yet supported");
+ Schema schema = icebergTable.schema();
+ List<Integer> equalityFieldIds = new ArrayList<>(schema.identifierFieldIds());
+ PartitionSpec spec = icebergTable.spec();
+
+ // Get target file size from table properties
+ long targetFileSize = targetFileSize(icebergTable);
+ FileFormat format = fileFormat(icebergTable);
+ OutputFileFactory outputFileFactory =
+ OutputFileFactory.builderFor(
+ icebergTable,
+ writerInitContext.tableBucket().getBucket(),
+ // task id always 0
+ 0)
+ .format(format)
+ .build();
+
+ if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
+ if (spec.isUnpartitioned()) {
+ return new AppendOnlyWriter(
+ icebergTable,
+ writerInitContext.schema().getRowType(),
+ writerInitContext.tableBucket(),
+ null, // No partition for non-partitioned table
+ Collections.emptyList(), // No partition keys
+ format,
+ outputFileFactory,
+ targetFileSize);
+ } else {
+ return null;
+ }
+ } else {
+ if (spec.isUnpartitioned()) {
+ return new DeltaTaskWriter(
+ icebergTable,
+ writerInitContext.schema().getRowType(),
+ writerInitContext.tableBucket(),
+ null, // No partition for non-partitioned table
+ Collections.emptyList(), // No partition keys
+ format,
+ outputFileFactory,
+ targetFileSize);
+ } else {
+ return null;
+ }
}
-
- // For now, assume append-only (no primary keys)
-
- return new AppendOnlyWriter(
- icebergTable,
- writerInitContext.schema().getRowType(),
- writerInitContext.tableBucket(),
- null, // No partition for non-partitioned table
- Collections.emptyList() // No partition keys
- );
}
@Override
@@ -107,4 +150,20 @@ public class IcebergLakeWriter implements LakeWriter<IcebergWriteResult> {
throw new IOException("Failed to get table " + tablePath + " in Iceberg.", e);
}
}
+
+ private static FileFormat fileFormat(Table icebergTable) {
+ String formatString =
+ PropertyUtil.propertyAsString(
+ icebergTable.properties(),
+ TableProperties.DEFAULT_FILE_FORMAT,
+ TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+ return FileFormat.fromString(formatString);
+ }
+
+ private static long targetFileSize(Table icebergTable) {
+ return PropertyUtil.propertyAsLong(
+ icebergTable.properties(),
+ WRITE_TARGET_FILE_SIZE_BYTES,
+ WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+ }
}
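Note on createRecordWriter above: the choice of writer hinges on Schema#identifierFieldIds() — an empty set means a log table (append-only), a non-empty set means a pk table (delta writes) — and the Fluss bucket id is reused as the Iceberg partitionId in OutputFileFactory, so files written for different buckets cannot collide. Condensed sketch of the dispatch (rowType, tableBucket, format, outputFileFactory, and targetFileSize stand for the values resolved in that method):

    if (!icebergTable.spec().isUnpartitioned()) {
        return null; // partitioned tables: not handled yet in either branch
    }
    boolean isPkTable = !icebergTable.schema().identifierFieldIds().isEmpty();
    return isPkTable
            ? new DeltaTaskWriter(icebergTable, rowType, tableBucket, null,
                    Collections.emptyList(), format, outputFileFactory, targetFileSize)
            : new AppendOnlyWriter(icebergTable, rowType, tableBucket, null,
                    Collections.emptyList(), format, outputFileFactory, targetFileSize);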
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/append/AppendOnlyWriter.java b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/AppendOnlyWriter.java
similarity index 60%
rename from fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/append/AppendOnlyWriter.java
rename to fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/AppendOnlyWriter.java
index 9c5607477..0b8ace18c 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/append/AppendOnlyWriter.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/AppendOnlyWriter.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package com.alibaba.fluss.lake.iceberg.tiering.append;
+package com.alibaba.fluss.lake.iceberg.tiering.writer;
import com.alibaba.fluss.lake.iceberg.tiering.RecordWriter;
import com.alibaba.fluss.metadata.TableBucket;
@@ -24,22 +24,17 @@ import com.alibaba.fluss.types.RowType;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.UnpartitionedWriter;
-import org.apache.iceberg.util.PropertyUtil;
import javax.annotation.Nullable;
import java.util.List;
-import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
-import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
-
/** A {@link RecordWriter} to write to Iceberg's append-only table. */
public class AppendOnlyWriter extends RecordWriter {
@@ -48,9 +43,12 @@ public class AppendOnlyWriter extends RecordWriter {
RowType flussRowType,
TableBucket tableBucket,
@Nullable String partition,
- List<String> partitionKeys) {
+ List<String> partitionKeys,
+ FileFormat format,
+ OutputFileFactory outputFileFactory,
+ long targetFileSize) {
super(
- createTaskWriter(icebergTable, tableBucket),
+ createTaskWriter(icebergTable, format, outputFileFactory, targetFileSize),
icebergTable.schema(),
flussRowType,
tableBucket,
@@ -59,21 +57,12 @@ public class AppendOnlyWriter extends RecordWriter {
}
private static TaskWriter<Record> createTaskWriter(
- Table icebergTable, TableBucket tableBucket) {
- // Get target file size from table properties
- long targetFileSize = targetFileSize(icebergTable);
-
+ Table icebergTable,
+ FileFormat format,
+ OutputFileFactory outputFileFactory,
+ long targetFileSize) {
FileAppenderFactory<Record> fileAppenderFactory =
new GenericAppenderFactory(icebergTable.schema());
- FileFormat format = fileFormat(icebergTable);
- OutputFileFactory outputFileFactory =
- OutputFileFactory.builderFor(
- icebergTable,
- tableBucket.getBucket(),
- // task id always 0
- 0)
- .format(format)
- .build();
return new UnpartitionedWriter<>(
icebergTable.spec(),
@@ -89,20 +78,4 @@ public class AppendOnlyWriter extends RecordWriter {
flussRecordAsIcebergRecord.setFlussRecord(record);
taskWriter.write(flussRecordAsIcebergRecord);
}
-
- private static FileFormat fileFormat(Table icebergTable) {
- String formatString =
- PropertyUtil.propertyAsString(
- icebergTable.properties(),
- TableProperties.DEFAULT_FILE_FORMAT,
- TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
- return FileFormat.fromString(formatString);
- }
-
- private static long targetFileSize(Table icebergTable) {
- return PropertyUtil.propertyAsLong(
- icebergTable.properties(),
- WRITE_TARGET_FILE_SIZE_BYTES,
- WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
- }
}
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/DeltaTaskWriter.java b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/DeltaTaskWriter.java
new file mode 100644
index 000000000..4f16a5ce5
--- /dev/null
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/DeltaTaskWriter.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.tiering.writer;
+
+import com.alibaba.fluss.lake.iceberg.tiering.RecordWriter;
+import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.types.RowType;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.TaskWriter;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** A {@link RecordWriter} to write to Iceberg's primary-key table. */
+public class DeltaTaskWriter extends RecordWriter {
+
+ public DeltaTaskWriter(
+ Table icebergTable,
+ RowType flussRowType,
+ TableBucket tableBucket,
+ @Nullable String partition,
+ List<String> partitionKeys,
+ FileFormat format,
+ OutputFileFactory outputFileFactory,
+ long targetFileSize) {
+ super(
+ createTaskWriter(icebergTable, format, outputFileFactory, targetFileSize),
+ icebergTable.schema(),
+ flussRowType,
+ tableBucket,
+ partition,
+ partitionKeys);
+ }
+
+ private static TaskWriter<Record> createTaskWriter(
+ Table icebergTable,
+ FileFormat format,
+ OutputFileFactory outputFileFactory,
+ long targetFileSize) {
+ int[] equalityFieldIds =
+ icebergTable.schema().identifierFieldIds().stream()
+ .mapToInt(Integer::intValue)
+ .toArray();
+ FileAppenderFactory<Record> appenderFactory =
+ new GenericAppenderFactory(
+ icebergTable.schema(),
+ icebergTable.spec(),
+ equalityFieldIds,
+ icebergTable.schema(),
+ null);
+
+ List<String> columns = new ArrayList<>();
+ for (Integer fieldId : icebergTable.schema().identifierFieldIds()) {
+ columns.add(icebergTable.schema().findField(fieldId).name());
+ }
+ Schema deleteSchema = icebergTable.schema().select(columns);
+ return new GenericTaskDeltaWriter(
+ icebergTable.schema(),
+ deleteSchema,
+ icebergTable.spec(),
+ format,
+ appenderFactory,
+ outputFileFactory,
+ icebergTable.io(),
+ targetFileSize);
+ }
+
+ @Override
+ public void write(LogRecord record) throws Exception {
+ GenericTaskDeltaWriter deltaWriter = (GenericTaskDeltaWriter) taskWriter;
+ flussRecordAsIcebergRecord.setFlussRecord(record);
+ switch (record.getChangeType()) {
+ case INSERT:
+ case UPDATE_AFTER:
+ deltaWriter.write(flussRecordAsIcebergRecord);
+ break;
+ case UPDATE_BEFORE:
+ case DELETE:
+ // TODO we can project the record and only write the equality delete fields
+ deltaWriter.delete(flussRecordAsIcebergRecord);
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unknown row kind: " + record.getChangeType());
+ }
+ }
+}
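Note on DeltaTaskWriter above: it maps the Fluss changelog onto Iceberg's delta-writer API — +I/+U images become rows in data files, while -U/-D images become equality deletes keyed by the table's identifier fields. A hypothetical sequence for a pk column c1 (the record(...) helper is invented for illustration and routes through write(LogRecord) above):

    writer.write(record(INSERT, 1, "v1"));        // row (1, "v1") -> data file
    writer.write(record(UPDATE_BEFORE, 1, "v1")); // equality delete for key 1
    writer.write(record(UPDATE_AFTER, 1, "v2"));  // row (1, "v2") -> data file
    // complete() later returns both data files and equality-delete files,
    // which IcebergLakeCommitter applies in a single newRowDelta() snapshot.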
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/GenericTaskDeltaWriter.java b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/GenericTaskDeltaWriter.java
new file mode 100644
index 000000000..65ab644dd
--- /dev/null
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/GenericTaskDeltaWriter.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.tiering.writer;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.BaseTaskWriter;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+
+import java.io.IOException;
+
+/** A generic task equality delta writer. */
+class GenericTaskDeltaWriter extends BaseTaskWriter<Record> {
+ private final GenericEqualityDeltaWriter deltaWriter;
+
+ public GenericTaskDeltaWriter(
+ Schema schema,
+ Schema deleteSchema,
+ PartitionSpec spec,
+ FileFormat format,
+ FileAppenderFactory<Record> appenderFactory,
+ OutputFileFactory fileFactory,
+ FileIO io,
+ long targetFileSize) {
+ super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+ this.deltaWriter = new GenericEqualityDeltaWriter(null, schema, deleteSchema);
+ }
+
+ @Override
+ public void write(Record row) throws IOException {
+ deltaWriter.write(row);
+ }
+
+ public void delete(Record row) throws IOException {
+ deltaWriter.delete(row);
+ }
+
+ @Override
+ public void close() throws IOException {
+ deltaWriter.close();
+ }
+
+ private class GenericEqualityDeltaWriter extends BaseEqualityDeltaWriter {
+ private GenericEqualityDeltaWriter(
+ PartitionKey partition, Schema schema, Schema eqDeleteSchema) {
+ super(partition, schema, eqDeleteSchema);
+ }
+
+ @Override
+ protected StructLike asStructLike(Record record) {
+ return record;
+ }
+
+ @Override
+ protected StructLike asStructLikeKey(Record record) {
+ return record;
+ }
+ }
+}
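Note on the inherited behavior: Iceberg's BaseEqualityDeltaWriter remembers the file position of every key written by the current task. A delete for a key this task already wrote becomes a position delete against the in-progress data file; a delete for an unseen key becomes an equality delete, which readers apply to all earlier sequence numbers. This is what the committer's javadoc above relies on when skipping row-delta validations. Behavioral sketch (rowA and rowB are placeholders):

    deltaWriter.write(rowA);  // key of rowA remembered with its file position
    deltaWriter.delete(rowA); // key written by this task -> position delete
    deltaWriter.delete(rowB); // key unseen in this task  -> equality delete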
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergTieringTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergTieringTest.java
index e8455f0df..7e15f41e1 100644
--- a/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergTieringTest.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergTieringTest.java
@@ -34,6 +34,7 @@ import com.alibaba.fluss.record.LogRecord;
import com.alibaba.fluss.row.BinaryString;
import com.alibaba.fluss.row.GenericRow;
import com.alibaba.fluss.types.DataTypes;
+import com.alibaba.fluss.utils.types.Tuple2;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
@@ -47,8 +48,10 @@ import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import javax.annotation.Nullable;
@@ -57,15 +60,23 @@ import java.io.IOException;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.stream.Stream;
import static com.alibaba.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
import static com.alibaba.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
import static com.alibaba.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
+import static com.alibaba.fluss.record.ChangeType.DELETE;
+import static com.alibaba.fluss.record.ChangeType.INSERT;
+import static com.alibaba.fluss.record.ChangeType.UPDATE_AFTER;
+import static com.alibaba.fluss.record.ChangeType.UPDATE_BEFORE;
import static org.assertj.core.api.Assertions.assertThat;
/** Unit test for tiering to Iceberg via {@link IcebergLakeTieringFactory}. */
@@ -87,14 +98,23 @@ class IcebergTieringTest {
icebergLakeTieringFactory = new IcebergLakeTieringFactory(configuration);
}
- @Test
- void testTieringWriteTable() throws Exception {
- TablePath tablePath = TablePath.of("iceberg", "test_table");
- createTable(tablePath);
-
- Table icebergTable = icebergCatalog.loadTable(toIceberg(tablePath));
+ private static Stream<Arguments> tieringWriteArgs() {
+ return Stream.of(Arguments.of(true), Arguments.of(false));
+ }
+ @ParameterizedTest
+ @MethodSource("tieringWriteArgs")
+ void testTieringWriteTable(boolean isPrimaryKeyTable) throws Exception {
int bucketNum = 3;
+ TablePath tablePath =
+ TablePath.of(
+ "iceberg",
+ String.format(
+ "test_tiering_table_%s",
+ isPrimaryKeyTable ? "primary_key" : "log"));
+ createTable(tablePath, isPrimaryKeyTable);
+
+ Table icebergTable = icebergCatalog.loadTable(toIceberg(tablePath));
Map<Integer, List<LogRecord>> recordsByBucket = new HashMap<>();
@@ -107,11 +127,17 @@ class IcebergTieringTest {
// first, write data
for (int bucket = 0; bucket < bucketNum; bucket++) {
try (LakeWriter<IcebergWriteResult> writer = createLakeWriter(tablePath, bucket)) {
- List<LogRecord> records = genLogTableRecords(bucket, 5);
- for (LogRecord record : records) {
+ Tuple2<List<LogRecord>, List<LogRecord>> writeAndExpectRecords =
+ isPrimaryKeyTable
+ ? genPrimaryKeyTableRecords(bucket)
+ : genLogTableRecords(bucket, 10);
+
+ List<LogRecord> writtenRecords = writeAndExpectRecords.f0;
+ List<LogRecord> expectRecords = writeAndExpectRecords.f1;
+ recordsByBucket.put(bucket, expectRecords);
+ for (LogRecord record : writtenRecords) {
writer.write(record);
}
- recordsByBucket.put(bucket, records);
IcebergWriteResult result = writer.complete();
byte[] serialized = writeResultSerializer.serialize(result);
icebergWriteResults.add(
@@ -142,7 +168,11 @@ class IcebergTieringTest {
for (int bucket = 0; bucket < 3; bucket++) {
List<LogRecord> expectRecords = recordsByBucket.get(bucket);
CloseableIterator<Record> actualRecords =
getIcebergRows(icebergTable, bucket);
- verifyLogTableRecords(actualRecords, bucket, expectRecords);
+ if (isPrimaryKeyTable) {
+ verifyTableRecords(actualRecords, expectRecords, bucket);
+ } else {
+ verifyTableRecords(actualRecords, expectRecords, bucket);
+ }
}
}
@@ -187,7 +217,8 @@ class IcebergTieringTest {
return icebergLakeTieringFactory.createLakeCommitter(() -> tablePath);
}
- private List<LogRecord> genLogTableRecords(int bucket, int numRecords) {
+ private Tuple2<List<LogRecord>, List<LogRecord>> genLogTableRecords(
+ int bucket, int numRecords) {
List<LogRecord> logRecords = new ArrayList<>();
for (int i = 0; i < numRecords; i++) {
GenericRow genericRow = new GenericRow(3);
@@ -200,10 +231,67 @@ class IcebergTieringTest {
i, System.currentTimeMillis(), ChangeType.APPEND_ONLY, genericRow);
logRecords.add(logRecord);
}
- return logRecords;
+ return Tuple2.of(logRecords, logRecords);
}
- private void createTable(TablePath tablePath) throws Exception {
+ private Tuple2<List<LogRecord>, List<LogRecord>> genPrimaryKeyTableRecords(int bucket) {
+ int offset = -1;
+ // gen +I, -U, +U, -D
+ List<GenericRow> rows = genKvRow(bucket, 0, 0, 4);
+ List<LogRecord> writtenLogRecords =
+ new ArrayList<>(
+ Arrays.asList(
+ toRecord(++offset, rows.get(0), INSERT),
+ toRecord(++offset, rows.get(1), UPDATE_BEFORE),
+ toRecord(++offset, rows.get(2), UPDATE_AFTER),
+ toRecord(++offset, rows.get(3), DELETE)));
+ List<LogRecord> expectLogRecords = new ArrayList<>();
+
+ // gen +I, -U, +U
+ rows = genKvRow(bucket, 1, 4, 7);
+ writtenLogRecords.addAll(
+ Arrays.asList(
+ toRecord(++offset, rows.get(0), INSERT),
+ toRecord(++offset, rows.get(1), UPDATE_BEFORE),
+ toRecord(++offset, rows.get(2), UPDATE_AFTER)));
+ expectLogRecords.add(writtenLogRecords.get(writtenLogRecords.size() - 1));
+
+ // gen +I, +U
+ rows = genKvRow(bucket, 2, 7, 9);
+ writtenLogRecords.addAll(
+ Arrays.asList(
+ toRecord(++offset, rows.get(0), INSERT),
+ toRecord(++offset, rows.get(1), UPDATE_AFTER)));
+ expectLogRecords.add(writtenLogRecords.get(writtenLogRecords.size() - 1));
+
+ // gen +I
+ rows = genKvRow(bucket, 3, 9, 10);
+ writtenLogRecords.add(toRecord(++offset, rows.get(0), INSERT));
+ expectLogRecords.add(writtenLogRecords.get(writtenLogRecords.size() - 1));
+
+ return Tuple2.of(writtenLogRecords, expectLogRecords);
+ }
+
+ private List<GenericRow> genKvRow(int bucket, int key, int from, int to) {
+ List<GenericRow> rows = new ArrayList<>();
+ for (int i = from; i < to; i++) {
+ GenericRow genericRow;
+ // Non-partitioned table
+ genericRow = new GenericRow(3);
+ genericRow.setField(0, key);
+ genericRow.setField(1, BinaryString.fromString("bucket" + bucket + "_" + i));
+ genericRow.setField(2, BinaryString.fromString("bucket" + bucket));
+
+ rows.add(genericRow);
+ }
+ return rows;
+ }
+
+ private GenericRecord toRecord(long offset, GenericRow row, ChangeType changeType) {
+ return new GenericRecord(offset, System.currentTimeMillis(), changeType, row);
+ }
+
+ private void createTable(TablePath tablePath, boolean isPrimaryTable) throws Exception {
Namespace namespace = Namespace.of(tablePath.getDatabaseName());
if (icebergCatalog instanceof SupportsNamespaces) {
SupportsNamespaces ns = (SupportsNamespaces) icebergCatalog;
@@ -212,15 +300,24 @@ class IcebergTieringTest {
}
}
+ Set<Integer> identifierFieldIds = new HashSet<>();
+ if (isPrimaryTable) {
+ identifierFieldIds.add(1);
+ }
+
org.apache.iceberg.Schema schema =
new org.apache.iceberg.Schema(
- Types.NestedField.optional(1, "c1", Types.IntegerType.get()),
- Types.NestedField.optional(2, "c2", Types.StringType.get()),
- Types.NestedField.optional(3, "c3", Types.StringType.get()),
- Types.NestedField.required(4, BUCKET_COLUMN_NAME, Types.IntegerType.get()),
- Types.NestedField.required(5, OFFSET_COLUMN_NAME, Types.LongType.get()),
- Types.NestedField.required(
- 6, TIMESTAMP_COLUMN_NAME, Types.TimestampType.withZone()));
+ Arrays.asList(
+ Types.NestedField.required(1, "c1", Types.IntegerType.get()),
+ Types.NestedField.optional(2, "c2", Types.StringType.get()),
+ Types.NestedField.optional(3, "c3", Types.StringType.get()),
+ Types.NestedField.required(
+ 4, BUCKET_COLUMN_NAME, Types.IntegerType.get()),
+ Types.NestedField.required(
+ 5, OFFSET_COLUMN_NAME, Types.LongType.get()),
+ Types.NestedField.required(
+ 6, TIMESTAMP_COLUMN_NAME, Types.TimestampType.withZone())),
+ identifierFieldIds);
TableIdentifier tableId =
TableIdentifier.of(tablePath.getDatabaseName(), tablePath.getTableName());
@@ -234,10 +331,10 @@ class IcebergTieringTest {
.iterator();
}
- private void verifyLogTableRecords(
+ private void verifyTableRecords(
CloseableIterator<Record> actualRecords,
- int expectBucket,
- List<LogRecord> expectRecords) {
+ List<LogRecord> expectRecords,
+ int expectBucket) {
for (LogRecord expectRecord : expectRecords) {
Record actualRecord = actualRecords.next();
// check business columns:
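For the pk-table case, the expected rows that genPrimaryKeyTableRecords returns are simply the last +I/+U image per key that is not followed by a -D. A generic way to derive the same expectation from any changelog (a sketch; it assumes LogRecord#getRow() with the key c1 at field 0, as in the rows generated above, and java.util.LinkedHashMap):

    Map<Integer, LogRecord> state = new LinkedHashMap<>();
    for (LogRecord r : writtenLogRecords) {
        int key = r.getRow().getInt(0); // c1 is the primary key
        switch (r.getChangeType()) {
            case INSERT:
            case UPDATE_AFTER:
                state.put(key, r); // latest image wins
                break;
            case UPDATE_BEFORE:
            case DELETE:
                state.remove(key); // old image retracted
                break;
            default:
                break;
        }
    }
    List<LogRecord> expectLogRecords = new ArrayList<>(state.values());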